# 量化投资之多因子选股（一）：数据准备与单因子检验

## 矢量化选股回测概述

### 要点2：股票池

factor_df = factor_df.reindex_like(univ_a)*univ_a

### 要点3：剔除ST股、停盘股、涨跌停书评

forbid_days = suspend_valid*limit_valid 只有股票在当天同时通过三种筛选，其数值才为1

### 要点4: 仓位构建

step1:横截面排序，按照比例筛选股票，再进行仓位权重归一化，得到初始仓位pos_1
step2:若考虑调仓周期,假设是月频率调仓，就是隔20个交易日调仓。
pos_2 = pos_1.reindex(pos_1.index[::20]).fillna(0).reindex(pos_1.index).ffill()

step3:考虑调仓日不可交易股票（见to_final_position)

### 要点5：回测

shift()的原因是：rtn_df是当天收盘价（开盘价）比上前一天的收盘价（开盘价），而我们是当天非前一天下的单，所以我们吃不到下单第一天的rtn。

## 数据准备

``````import tushare as ts
import numpy as np
import pandas as pd
from multiprocessing import Manager, Pool
import datetime
import os
import pickle
import warnings
warnings.filterwarnings('ignore')

ts.set_token('')
pro = ts.pro_api(timeout=5)

global dataBase
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = curPath[:curPath.find("多因子框架\")+len("多因子框架\")]
dataBase = rootPath+'\data\'

with open(path, 'rb') as handle:

def update_pickle(text, path):
with open(path, 'wb') as handle:
pickle.dump(text, handle)

def __init__(self,start_date='20100101',end_date = None):
self.start_date = start_date
self.end_date = end_date
self.stk_codes = self.get_stks()

def get_trade_dates(self,start_date = None,end_date = None):
if start_date == None:
start_date = self.start_date
end_date = datetime.datetime.now().strftime('%Y%m%d') if end_date == None else self.end_date
df[df['is_open']==1]['cal_date'].drop_duplicates()
return df[df['is_open']==1]['cal_date'].to_list()

def get_stks(self):
stk_set = set()
for list_status in ['L','D','P']:
stk_set |= set(pro.stock_basic(list_status=list_status,fileds='ts_code')['ts_code'].to_list())

return sorted(list(stk_set))

def get_IdxWeight(self,idx_code):
'''
指数成分股
'''
start_date = start_date.strftime('%Y%m%d')
df_ls = []
end_date = pd.to_datetime(start_date) + datetime.timedelta(days=32)
end_date = end_date.strftime('%Y%m%d')
raw_df = pro.index_weight(index_code=idx_code, start_date=start_date,end_date=end_date)
start_date = end_date

res_df = pd.concat(df_ls)
res_df = res_df[~res_df.index.duplicated(keep='first')]
return res_df.sort_index()

def get_ST_valid(self):
'''
ST股
'''
df = pro.namechange(fields='ts_code,name,start_date,end_date')
df = df[df.name.str.contains('ST')]
for i in range(df.shape[0]):
ts_code = df.iloc[i,0]
if ts_code not in self.stk_codes:
continue
s_date = df.iloc[i, 2]
e_date = df.iloc[i, 3]
if e_date == None:
res_df[ts_code].loc[s_date:]=np.nan
else:
res_df[ts_code].loc[s_date:e_date]=np.nan
return res_df.sort_index()

'''
tushare的接口一次最多返回5000条数据，所以按天调取。用并行加速。
'''
try:
except:

def get_suspend_valid(self):
'''
停牌股
'''

m_ls = Manager().list()
pools = Pool(4)
pools.apply_async(self.get_suspend_oneDate,
args=(date,m_ls)
)
pools.close()
pools.join()
m_ls = list(m_ls)
for date,df in m_ls:
print(date,df)
res_df.loc[date,df['ts_code'].to_list()] = np.nan
return res_df.sort_index()

'''
tushare的接口一次最多返回5000条数据，所以按天调取。用并行加速。
'''
try:
except:

def get_limit_valid(self):
'''
停牌股
'''
m_ls = Manager().list()
pools = Pool(3)
pools.apply_async(self.get_limit_oneDate,
args=(date,m_ls)
)
pools.close()
pools.join()
m_ls = list(m_ls)
for date,df in m_ls:
res_df.loc[date,df['ts_code'].to_list()]=np.nan
return res_df.sort_index()

def get_dailyMkt_oneStock(self,ts_code,m_ls):
'''
前复权的行情数据

因为tushare下载复权行情接口一次只能获取一只股票
所以使用多进行并行
'''

try:
#偶尔会因为网络问题请求失败，报错重新请求
m_ls.append(df)
except:
m_ls.append(df)

def get_dailyMkt_mulP(self):
m_ls = Manager().list()
pools = Pool(3)#开太多会有访问频率限制
for ts_code in self.stk_codes:
pools.apply_async(self.get_dailyMkt_oneStock,
args=(ts_code,m_ls))
pools.close()
pools.join()
m_ls = list(m_ls)
raw_df = pd.concat(m_ls)
res_dict = {}
for data_name in ['open','close','high','low','vol','amount']:
res_dict[data_name] = res_df.sort_index()
return res_dict

class DataWriter:
@staticmethod
def commonFunc(data_path,getFunc,cover,*args,**kwds):
if not os.path.exists(data_path) or cover:
t1 = datetime.datetime.now()
print(f'--------{data_path},第一次下载该数据，可能耗时较长')
newData_df.to_pickle(data_path)
t2 = datetime.datetime.now()
print(f'--------下载完成,耗时{t2-t1}')
else:
savedLastDate = savedData_df.index[-1]
print(f'---------{data_path}上次更新至{savedLastDate}，正在更新至最新交易日')

newData_df = pd.concat([savedData_df,lastData_df]).sort_index()
newData_df = newData_df[~newData_df.index.duplicated(keep='first')]
newData_df.to_pickle(data_path)
print(f'---------已更新至最新日期{newData_df.index[-1]}')
newData_df.index = pd.to_datetime(newData_df.index)
return newData_df

@staticmethod
def update_IdxWeight(stk_code,cover=False):
data_path = dataBase+f'daily/idx_cons/{stk_code}.pkl'
return DataWriter.commonFunc(data_path,'get_IdxWeight',cover,stk_code)

@staticmethod
def update_ST_valid(cover=False):
data_path = dataBase+f'daily/valid/ST_valid.pkl'
return DataWriter.commonFunc(data_path,'get_ST_valid',cover)

@staticmethod
def update_suspend_valid(cover=False):
data_path = dataBase+'daily/valid/suspend_valid.pkl'
return DataWriter.commonFunc(data_path,'get_suspend_valid',cover)

@staticmethod
def update_limit_valid(cover=False):
data_path = dataBase+'daily/valid/limit_valid.pkl'
return DataWriter.commonFunc(data_path,'get_limit_valid',cover)

@staticmethod
def update_dailyMkt(cover=False):
'''
需要保证已存储的ochlv数据的日期一致
'''
if not os.path.exists(dataBase+f'daily/mkt/open.pkl') or cover:
print(f'--------Mkt,第一次下载该数据，可能耗时较长')
for data_name,df in res_dict.items():
data_path = dataBase+f'daily/mkt//{data_name}.pkl'
df.to_pickle(data_path)
else:
savedLastDate = savedData_df.index[-1]
print(f'---------Mkt,上次更新至{savedLastDate}，正在更新至最新交易日')

new_df = pd.DataFrame()
for data_name,last_df in res_dict.items():
data_path = dataBase+f'daily/mkt//{data_name}.pkl'
new_df = pd.concat([savedData_df,last_df]).sort_index()
new_df = new_df[~new_df.index.duplicated(keep='first')]
new_df.to_pickle(data_path)
print(f'---------已更新至最新日期{new_df.index[-1]}')

@staticmethod
def commonFunc(data_path):
if not os.path.exists(data_path):
print(f'{data_path}不存在，请先调用DataWriter().update_xx')
return
df.index = pd.to_datetime(df.index)
return df

@staticmethod
data_path = dataBase+f'daily/idx_cons/{stk_code}.pkl'

@staticmethod
data_path = dataBase+f'daily/valid/ST_valid.pkl'

@staticmethod
data_path = dataBase+'daily/valid/suspend_valid.pkl'

@staticmethod
data_path = dataBase + 'daily/valid/limit_valid.pkl'

@staticmethod
data_path = dataBase+f'daily/mkt/{data_name}.pkl'

@staticmethod
df.index = pd.to_datetime(df.index)
return df['pct_chg']/100

@staticmethod
return df.pct_change()

if __name__ == '__main__':
DataWriter.update_ST_valid(cover=True)
DataWriter.update_suspend_valid(cover=True)
DataWriter.update_IdxWeight('399300.SZ',cover=True)
DataWriter.update_dailyMkt(cover=True)
DataWriter.update_limit_valid(cover=True)
``````

## 单因子检测

1.夏普、年化、最大回撤等指标的实现。
2.to_finnal_position函数，将选出的股票进行股票池、st股、limit等处理，见注释。
3.factor_group分组回测，默认进行十等分，画出净值图和收益率柱状图。
4.ICIR相关，计算IC、IR，画出累计IC图。
5.calc_daily_pnl计算仓位收益。
6.factor_stats综合，最终直接调用该函数。

``````import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def Col_zscore(df, n, cap=None, min_periods=1, check_std=False):
df_mean = df.rolling(window=n,min_periods=min_periods).mean()
df_std = df.rolling(window=n, min_periods=min_periods).std()
if check_std:
df_std = df_std[df_std >= 0.00001]
target = (df - df_mean) / df_std
if cap is not None:
target[target > cap] = cap
target[target < -cap] = -cap
return target

def Row_zscore(df, cap=None, check_std=False):
df_mean = df.mean(axis=1)
df_std = df.std(axis=1)
if check_std:
df_std = df_std[df_std >= 0.00001]
target = df.sub(df_mean, axis=0).div(df_std, axis=0)
if cap is not None:
target[target > cap] = cap
target[target < -cap] = -cap
return target

def MaxDrawdown(asset_series):
return asset_series - np.maximum.accumulate(asset_series)

def Sharpe_yearly(pnl_series):
return (np.sqrt(250) * pnl_series.mean()) / pnl_series.std()

def AnnualReturn(pos_df, pnl_series, alpha_type):
temp_pnl = (1+pnl_series).prod()
if alpha_type == 'ls_alpha':
temp_pos = pos_df.abs().sum().sum() / 2
else:
temp_pos = pos_df.abs().sum().sum()
if temp_pos == 0:
return .0
else:
return round(temp_pnl ** (250 / temp_pos) - 1,2)

def IC(signal, pct_n, min_valids=None, lag=0):
signal = signal.shift(lag)
corr_df = signal.corrwith(pct_n, axis=1,method='spearman').dropna()
if min_valids is not None:
signal_valid = signal.count(axis=1)
signal_valid[signal_valid < min_valids] = np.nan
signal_valid[signal_valid >= min_valids] = 1
corr_signal = corr_df * signal_valid
else:
corr_signal = corr_df
return corr_signal

def IR(signal, pct_n, min_valids=None, lag=0):
corr_signal = IC(signal, pct_n, min_valids, lag)
ic_mean = corr_signal.mean()
ic_std = corr_signal.std()
ir = ic_mean / ic_std
return ir, corr_signal

# def to_weighted_position(selected_df,weights_df = None):
#     if weights_df is None:
#         weights_df = pd.DataFrame().reindex_like(selected_df).fillna(1)
#
#     weights_df = weights_df.reindex_like(selected_df)
#     selected_weights_df = selected_df * weights_df
#     weighted_position_df = selected_weights_df.div(selected_weights_df.sum(axis=1), axis=0)
#     return weighted_position_df

def to_final_position(factor_score, forbid_day):
'''
factor_score:DataFrame,可以是因子值，也可以是根据因子值排序选出来的初始仓位矩阵
forbid_day:DataFrame,是否可交易（由ST股、停盘相乘得到），1代表该股票该日可以交易，不可交易则是NaN

return:
pos_fin：DataFrame,最终仓位
'''

#因为因子df中，index为x的数值是用x日收盘后更新的数据计算的，所以x日不能交易，需要等到下一天交易，所以要shift（1）
pos_fin = factor_score.shift(1).replace(np.nan, 0) * forbid_day
#这里的ffill的效果是，若某只需要交易的股票在调仓日无法交易，那么经过上一个步骤后，其对应位置是nan，而ffill就是让其先继承前一个交易日的仓位
pos_fin = pos_fin.ffill()
return pos_fin

def calc_daily_pnl(factor_df, univ_data, rtn_df, idx_rtn,forbid_days,method):
'''
:param factor_df:   因子/仓位矩阵
:param univ_data:   股票池矩阵（如沪深300成分股、中证500成分股等等)
:param idx_rtn:  指数rtn序列
:param forbid_days:  合法交易矩阵
:param rtn_df:    股票rtn矩阵
:param method_func:   feature/factor/ls_alpha/hg_alpha

:return:  仓位矩阵+每日仓位收益率序列
'''
factor_sel = factor_df.copy()
factor_sel = factor_sel.reindex_like(univ_data)*univ_data
forbid_days = forbid_days.reindex_like(factor_sel)
return_df = rtn_df.reindex_like(factor_sel)
if method == 'feature' or method == 'factor':
factor_z = Row_zscore(factor_sel, cap=4.5)
pos_final = to_final_position(factor_z, forbid_days)
daily_pnl_final = (pos_final.shift(1) * return_df).sum(axis=1)
return pos_final,daily_pnl_final
elif method == 'ls_alpha':
pos_final = to_final_position(factor_sel, forbid_days)
daily_pnl_final = (pos_final.shift(1) * return_df).sum(axis=1)
return pos_final,daily_pnl_final
elif method == 'hg_alpha':
pos_final = to_final_position(factor_sel, forbid_days)
daily_pnl_final = (pos_final.shift(1) * return_df).sum(axis=1) - idx_rtn
return pos_final,daily_pnl_final

def factor_group(factor_df,forb_day,rtn_df,idx_rtn,univ_data,split_pct_ls):
'''
分组回测
'''
factor_df = factor_df.reindex_like(univ_data)*univ_data
factor_score = factor_df
factor_rank_pct = factor_score.rank(ascending=False, pct=True, axis=1)

annual_rtn_ls = list()

plt.figure(figsize=(12, 6))
for split_pct in split_pct_ls:
pos_selected = factor_score[(factor_rank_pct > split_pct[0])&(factor_rank_pct <= split_pct[1])]
pos_selected = pos_selected.where(pd.isnull(pos_selected), 1)
pos = pos_selected.div(pos_selected.sum(axis=1), axis=0)

pos = to_final_position(pos, forb_day).reindex(factor_df.index)
daily_rtn = (pos.shift(1) * rtn_df).sum(axis=1).reindex(factor_df.index)
annual_rtn = AnnualReturn(pos,daily_rtn,'factor')
annual_rtn_ls.append(annual_rtn)
plt.plot((daily_rtn+1).cumprod(), label=str(split_pct))

plt.title('all factor group backtest return',fontsize = 14)
plt.legend()
plt.grid()
plt.show()

xticks = range(len(split_pct_ls))
plt.figure(figsize=(12, 6))
p = plt.subplot(111)
p.bar(x = xticks,height = annual_rtn_ls)
p.set_xticks(xticks)
p.set_xticklabels([x[1]*10 for x in split_pct_ls])
plt.title('factor group annual return',fontsize = 14)
plt.grid()
plt.show()

def factor_stats(
factor_df=None,
chg_n=1,#调仓时间间隔
univ_data=None,
rtn_df=None,
idx_rtn=None,
forbid_days = None,
method='factor',#factorls_alphahg_alpha
group_split_ls=[(0,0.1),(0.1,0.2),(0.2,0.3),(0.3,0.4),(0.4,0.5),(0.5,0.6),(0.6,0.7),(0.7,0.8),(0.8,0.9),(0.9,1.0)]
):

if method=='factor':
#         plt.figure(figsize=(16, 12))
plt.figure(figsize=(12, 6))

pos_final,daily_pnl = calc_daily_pnl(factor_df, univ_data, rtn_df, idx_rtn,forbid_days,method)
plt.plot(daily_pnl.cumsum())
plt.title('all factor row_Zscore position return',fontsize = 14)
plt.grid(1)
plt.show()

factor_group(
factor_df,
forbid_days,
rtn_df,
idx_rtn,
univ_data,
split_pct_ls=group_split_ls
)

pct_n = rtn_df.rolling(window=chg_n).sum()
ir,IC_series = IR(factor_df, pct_n, lag=chg_n)
plt.figure(figsize=(12, 6))
plt.plot(IC_series.cumsum(),label=f'IR:{round(ir,2)},IC_mean:{round(IC_series.mean(),2)}')
plt.title('IC cumsum',fontsize = 14)
plt.legend()
plt.grid(1)
plt.show()

else:
plt.figure(figsize=(16, 6))
p1 = plt.subplot(111)
pos = factor_df.reindex(factor_df.index[::chg_n])#初步仓位，没有剔除St股票 也没有shift
pos = pos.reindex(factor_df.index).ffill()
pos_final,daily_pnl = calc_daily_pnl(pos, univ_data, rtn_df, idx_rtn,forbid_days,method)
sharpe = round(Sharpe_yearly(daily_pnl),2)
max_drawdown = round(MaxDrawdown((daily_pnl+1).cumprod()),2)
annual_return = round(AnnualReturn(pos_final,daily_pnl,method),2)
p1.plot(daily_pnl.cumsum(),label=f'SP:{sharpe},MD:{max_drawdown.min()},AR:{annual_return}')
p1.set_title('selected position return')
p1.grid(1)
p1.legend()
plt.show()
``````

## 样例

20日收益率因子为例，展示单因子检测内容。

``````from my_lib.data_download.data_io import DataReader
from my_lib.factor_evaluate.factor_evaluate import factor_stats
import pandas as pd
import numpy as np

def calc_factor():
#这里是计算因子的，把因子处理成特定的（di,ii)格式。
return close_df.pct_change(20)
``````

``````factor_df = calc_factor()
factor_df.tail(5)
``````

``````univ_a = DataReader.read_IdxWeight('399300.SZ')#沪深300
univ_a = univ_a.where(pd.isnull(univ_a),1)
univ_a
``````

st股、停牌、涨跌停

``````ST_valid = DataReader.read_ST_valid()
forb_days = ST_valid*suspend_valid*limit_valid
forb_days.tail(5)
``````

``````rtn_df = DataReader.read_dailyRtn()
rtn_df.tail(5)
``````

``````idx_rtn = DataReader.read_index_dailyRtn('399300.SZ')#指数收益序列
factor_stats(
factor_df=factor_df,#因子或者仓位矩阵
chg_n=20,#调仓周期
univ_data=univ_a,#股票池
rtn_df=rtn_df,#股票每日收益矩阵
idx_rtn=idx_rtn,#因子收益序列（用来hg对冲）
forbid_days=forb_days,#st*suspend*limit
method='factor',#factorls_alphahg_alpha（原始因子、指数对冲、多空对冲）
group_split_ls=[(0,0.1),(0.1,0.2),(0.2,0.3),(0.3,0.4),(0.4,0.5),(0.5,0.6),(0.6,0.7),(0.7,0.8),(0.8,0.9),(0.9,1.0)]#分组回测参数
)
``````

``````#排序选股，构建仓位
factor_rank_pct = factor_df.rank(ascending=False, pct=True, axis=1)
factor_selected = factor_df[factor_rank_pct>0.8]
factor_selected = factor_selected.where(pd.isnull(factor_selected), 1)
pos = factor_selected.div(factor_selected.sum(axis=1), axis=0)
pos = pos.fillna(0)#重要，否则ffill会出错

factor_stats(
factor_df = pos,
chg_n=20,
univ_data=univ_a,
rtn_df=rtn_df,
idx_rtn=idx_rtn.replace(np.inf,np.nan).replace(-np.inf,np.nan),
forbid_days=forb_days,
method='hg_alpha',#factorls_alphahg_alpha
)
``````

``````factor_df = factor_df.reindex_like(univ_a)*univ_a
factor_rank_pct = factor_df.rank(ascending=False, pct=True, axis=1)

#多头仓位
factor_selected = factor_df[factor_rank_pct>0.8]
factor_selected = factor_selected.where(pd.isnull(factor_selected), 1)
pos_long = factor_selected.div(factor_selected.sum(axis=1), axis=0).fillna(0)
#空头仓位
factor_selected = factor_df[factor_rank_pct<0.2]
factor_selected = factor_selected.where(pd.isnull(factor_selected), 1)
pos_short = factor_selected.div(factor_selected.sum(axis=1), axis=0).fillna(0)

factor_stats(
factor_df = pos_long.fillna(0) - pos_short.fillna(0),
chg_n=20,
univ_data=univ_a,
rtn_df=rtn_df,
idx_rtn=idx_rtn.replace(np.inf,np.nan).replace(-np.inf,np.nan),
forbid_days=forb_days,
method='ls_alpha',#factorls_alphahg_alpha
)
``````

THE END