量化投资之多因子选股(一):数据准备与单因子检验

前言

本菜狗现在是哈工大威海校区计算机学院的大四本科生,也是量化投资的初学者。因为本科学校中做量化的前辈喝同伴极少,缺少与业界的交流,在很长一段时间,本菜狗一直认为量化就是MACD等技术指标的或者是用一些炫酷的DL模型来预测(因为国内很多量化书籍都是“Python基础语法+技术指标+机器学习”)。
好在没有放弃尝试,终于在经过相当一段时间的焦虑和摸索中,开始对量化行业有了比较符合客观但并不全面的认识。于是开启量化方法论系列博客的创作,把自己的学习总结、分享出来,以和需要的同伴交流。
因为本人能力和精力有限,所创作的内容难免有瑕疵乃至纰漏,欢迎批评指正。
若您对我所做的工作感兴趣,欢迎联系我:[email protected]

系列文章

本文是量化投资方法论之多因子选股系列文章的第一篇,分享数据准备(基于Tushare)和单因子检验模块。

矢量化选股回测概述

提示:若看不太懂,可结合后面的代码理解

要点1:数据格式

把数据处理成特定的DataFrame格式:(di,ii)格式,即columns为股票代码、index为日期,value为数据(如收盘价、成交量、各种因子等),每一个指标是单独的一个df。
但也不是所有数据都要DataFrame,一些只有一维的数据使用Series即可,如特定指数收益率序列。
如下图,是10年到21年A股复权后的close数据。

在这里插入图片描述

要点2:股票池

股票池一般取常用指数的成分股矩阵(或者其组合,如沪深300+中证500),一般命名为univ_auniv_data。univ_a的列名只有现在或历史上曾经是该指数成分股的股票代码。
univ_a[stk_code][trade_date]=1即股票stk_code在trade_date这一天是该指数的成分股。否则univ_a[stk_code][trade_date]=NaN
我们一般在计算因子时计算全部的A股数据,在做选股回测时再考虑股票池。方法为:
factor_df = factor_df.reindex_like(univ_a)*univ_a
其中factor_df为因子数据。reindex_like把factor_df的行、列与univ_a统一。

沪深300的股票池数据示例

在这里插入图片描述

要点3:剔除ST股、停盘股、涨跌停书评

构造矩阵ST_valid,ST_valid[stk_code][trade_date]==1即stk_code在trade_date这一天不是ST股,通过了ST股筛选,否则是NaN
构造矩阵suspend_valid、limit_valid,同ST。
forbid_days = suspend_valid*limit_valid 只有股票在当天同时通过三种筛选,其数值才为1

在做剔除操作时,只需要将仓位矩阵*forbid_days,没有通过筛选的股票的数据就成了NaN

要点4: 仓位构建

step1:横截面排序,按照比例筛选股票,再进行仓位权重归一化,得到初始仓位pos_1
step2:若考虑调仓周期,假设是月频率调仓,就是隔20个交易日调仓。
pos_2 = pos_1.reindex(pos_1.index[::20]).fillna(0).reindex(pos_1.index).ffill()
即截取调仓日的仓位,扩充其他日期的仓位先设置为nan,再ffill().
这里有一个fillna(0),因为如果不在扩展之前fillna,则可能会出现 某一期一只股票被选上,后面这只股票的仓位原本都应该是NaN,但经过ffill之后却都继承了这个仓位、
step3:考虑调仓日不可交易股票(见to_final_position)

这里有一点要强调,即我们的是factor_df转换成仓位pos_df时,需要shift(1).因为factor是当天收盘后;利用了当天和之前的数据计算的,而交易最早发生在后一个交易日。

要点5:回测

处理好的仓位fin_pos.shift(1)*rtn_df再横向求和就是仓位的日收益,其中rtn_df是股票收益矩阵.
shift()的原因是:rtn_df是当天收盘价(开盘价)比上前一天的收盘价(开盘价),而我们是当天非前一天下的单,所以我们吃不到下单第一天的rtn。

数据准备

本人尝试基于tushare(需要积分)获取研究所需要的常用数据(如股票行情数据、指数成分股数据、st股、涨跌停等)并处理成前文所示的(di,ii)格式。

与数据下载、存储、读取相关的代码如下。
但tushare的接口对调取频率、单次调取数据量有限制。因精力有限,并没有对代码进行完全的优化,故部分代码比较啰嗦,运行效率低。
另,因为部分方法使用了多进程并行下载,故无法在jupyter notebook等交互式环境下运行,请在pycharm等IDE下执行main()函数。

数据准备部分主要有三个类。
数据下载的DataDownloader,从tushare接口获取数据并整理成特定格式。
数据更新存储的DataWriter,调用DataDownloader将数据下载、更新、存储到本地。
数据读取的DataReader,读取数据。

import tushare as ts
import numpy as np
import pandas as pd
from multiprocessing import Manager, Pool
import datetime
import os
import pickle
import warnings
warnings.filterwarnings('ignore')


ts.set_token('')
pro = ts.pro_api(timeout=5)

global dataBase
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = curPath[:curPath.find("多因子框架\")+len("多因子框架\")]
dataBase = rootPath+'\data\'


def read_pickle(path):
    with open(path, 'rb') as handle:
        return pickle.load(handle)

def update_pickle(text, path):
    with open(path, 'wb') as handle:
        pickle.dump(text, handle)

class DataDownloader:
    def __init__(self,start_date='20100101',end_date = None):
        self.start_date = start_date
        self.end_date = end_date
        self.trade_dates = self.get_trade_dates()
        self.stk_codes = self.get_stks()
        #self.template_df = pd.DataFrame(index=self.trade_dates,columns=self.stk_codes)

    def get_trade_dates(self,start_date = None,end_date = None):
        if start_date == None:
            start_date = self.start_date
        end_date = datetime.datetime.now().strftime('%Y%m%d') if end_date == None else self.end_date
        df = pro.trade_cal(exchange='SSE', start_date=start_date,end_date=end_date)
        df[df['is_open']==1]['cal_date'].drop_duplicates()
        return df[df['is_open']==1]['cal_date'].to_list()

    def get_stks(self):
        stk_set = set()
        for list_status in ['L','D','P']:
            stk_set |= set(pro.stock_basic(list_status=list_status,fileds='ts_code')['ts_code'].to_list())

        return sorted(list(stk_set))

    def get_IdxWeight(self,idx_code):
        '''
        指数成分股
        '''
        start_date = pd.to_datetime(self.trade_dates[0]) - datetime.timedelta(days=32)
        start_date = start_date.strftime('%Y%m%d')
        trade_dates = self.get_trade_dates(start_date)
        df_ls = []
        while start_date < trade_dates[-1]:
            end_date = pd.to_datetime(start_date) + datetime.timedelta(days=32)
            end_date = end_date.strftime('%Y%m%d')
            raw_df = pro.index_weight(index_code=idx_code, start_date=start_date,end_date=end_date)
            df_ls.append(raw_df.pivot(index = 'trade_date',columns = 'con_code',values='weight'))
            start_date = end_date

        res_df = pd.concat(df_ls)
        res_df = res_df[~res_df.index.duplicated(keep='first')]
        res_df = res_df.reindex(trade_dates)
        res_df = res_df.ffill().reindex(self.trade_dates)
        return res_df.sort_index()

    def get_ST_valid(self):
        '''
        ST股
        '''
        res_df = pd.DataFrame(index=self.trade_dates,columns=self.stk_codes).fillna(1)
        df = pro.namechange(fields='ts_code,name,start_date,end_date')
        df = df[df.name.str.contains('ST')]
        for i in range(df.shape[0]):
            ts_code = df.iloc[i,0]
            if ts_code not in self.stk_codes:
                continue
            s_date = df.iloc[i, 2]
            e_date = df.iloc[i, 3]
            if e_date == None:
                res_df[ts_code].loc[s_date:]=np.nan
            else:
                res_df[ts_code].loc[s_date:e_date]=np.nan
        return res_df.sort_index()

    def get_suspend_oneDate(self,trade_date,m_ls):
        '''
        tushare的接口一次最多返回5000条数据,所以按天调取。用并行加速。
        '''
        try:
            df = pro.suspend_d(suspend_type='S',trade_date=trade_date)
            m_ls.append([trade_date,df])
        except:
            df = pro.suspend_d(suspend_type='S',trade_date=trade_date)
            m_ls.append([trade_date,df])

    def get_suspend_valid(self):
        '''
        停牌股
        '''
        res_df = pd.DataFrame(index=self.trade_dates,columns=self.stk_codes).fillna(1)

        m_ls = Manager().list()
        pools = Pool(4)
        for date in self.trade_dates:
            pools.apply_async(self.get_suspend_oneDate,
                              args=(date,m_ls)
                             )
        pools.close()
        pools.join()
        m_ls = list(m_ls)
        for date,df in m_ls:
            print(date,df)
            res_df.loc[date,df['ts_code'].to_list()] = np.nan
        return res_df.sort_index()


    def get_limit_oneDate(self,trade_date,m_ls):
        '''
        tushare的接口一次最多返回5000条数据,所以按天调取。用并行加速。
        '''
        try:
            df = pro.limit_list(trade_date=trade_date)
            m_ls.append([trade_date,df])
        except:
            df = pro.suspend_d(trade_date=trade_date)
            m_ls.append([trade_date,df])


    def get_limit_valid(self):
        '''
        停牌股
        '''
        res_df = pd.DataFrame(index=self.trade_dates,columns=self.stk_codes).fillna(1)
        m_ls = Manager().list()
        pools = Pool(3)
        for date in self.trade_dates:
            pools.apply_async(self.get_limit_oneDate,
                              args=(date,m_ls)
                              )
        pools.close()
        pools.join()
        m_ls = list(m_ls)
        for date,df in m_ls:
            res_df.loc[date,df['ts_code'].to_list()]=np.nan
        return res_df.sort_index()
    
    def get_dailyMkt_oneStock(self,ts_code,m_ls):
        '''
        前复权的行情数据

        因为tushare下载复权行情接口一次只能获取一只股票
        所以使用多进行并行
        '''

        try:
            #偶尔会因为网络问题请求失败,报错重新请求
            df = ts.pro_bar(ts_code=ts_code, adj='qfq', start_date=self.start_date,end_date=self.end_date)
            m_ls.append(df)
        except:
            df = ts.pro_bar(ts_code=ts_code, adj='qfq', start_date=self.start_date,end_date=self.end_date)
            m_ls.append(df)

    def get_dailyMkt_mulP(self):
        m_ls = Manager().list()
        pools = Pool(3)#开太多会有访问频率限制
        for ts_code in self.stk_codes:
            pools.apply_async(self.get_dailyMkt_oneStock,
                              args=(ts_code,m_ls))
        pools.close()
        pools.join()
        m_ls = list(m_ls)
        raw_df = pd.concat(m_ls)
        res_dict = {}
        for data_name in ['open','close','high','low','vol','amount']:
            res_df = raw_df.pivot(index='trade_date',columns='ts_code',values=data_name)
            res_dict[data_name] = res_df.sort_index()
        return res_dict

class DataWriter:
    @staticmethod
    def commonFunc(data_path,getFunc,cover,*args,**kwds):
        if not os.path.exists(data_path) or cover:
            t1 = datetime.datetime.now()
            print(f'--------{data_path},第一次下载该数据,可能耗时较长')
            newData_df = eval(f'DataDownloader().{getFunc}(*args,**kwds)')
            newData_df.to_pickle(data_path)
            t2 = datetime.datetime.now()
            print(f'--------下载完成,耗时{t2-t1}')
        else:
            savedData_df = pd.read_pickle(data_path)
            savedLastDate = savedData_df.index[-1]
            print(f'---------{data_path}上次更新至{savedLastDate},正在更新至最新交易日')

            lastData_df = eval(f'DataDownloader(savedLastDate).{getFunc}(*args,**kwds)')
            newData_df = pd.concat([savedData_df,lastData_df]).sort_index()
            newData_df = newData_df[~newData_df.index.duplicated(keep='first')]
            newData_df.to_pickle(data_path)
            print(f'---------已更新至最新日期{newData_df.index[-1]}')
        newData_df.index = pd.to_datetime(newData_df.index)
        return newData_df

    @staticmethod
    def update_IdxWeight(stk_code,cover=False):
        data_path = dataBase+f'daily/idx_cons/{stk_code}.pkl'
        return DataWriter.commonFunc(data_path,'get_IdxWeight',cover,stk_code)

    @staticmethod
    def update_ST_valid(cover=False):
        data_path = dataBase+f'daily/valid/ST_valid.pkl'
        return DataWriter.commonFunc(data_path,'get_ST_valid',cover)

    @staticmethod
    def update_suspend_valid(cover=False):
        data_path = dataBase+'daily/valid/suspend_valid.pkl'
        return DataWriter.commonFunc(data_path,'get_suspend_valid',cover)

    @staticmethod
    def update_limit_valid(cover=False):
        data_path = dataBase+'daily/valid/limit_valid.pkl'
        return DataWriter.commonFunc(data_path,'get_limit_valid',cover)

    @staticmethod
    def update_dailyMkt(cover=False):
        '''
            需要保证已存储的ochlv数据的日期一致
        '''
        if not os.path.exists(dataBase+f'daily/mkt/open.pkl') or cover:
            print(f'--------Mkt,第一次下载该数据,可能耗时较长')
            res_dict = DataDownloader().get_dailyMkt_mulP()
            for data_name,df in res_dict.items():
                data_path = dataBase+f'daily/mkt//{data_name}.pkl'
                df.to_pickle(data_path)
        else:
            savedData_df = pd.read_pickle(dataBase+f'daily/mkt/open.pkl')
            savedLastDate = savedData_df.index[-1]
            print(f'---------Mkt,上次更新至{savedLastDate},正在更新至最新交易日')

            res_dict = DataDownloader(savedLastDate).get_dailyMkt_mulP()
            new_df = pd.DataFrame()
            for data_name,last_df in res_dict.items():
                data_path = dataBase+f'daily/mkt//{data_name}.pkl'
                new_df = pd.concat([savedData_df,last_df]).sort_index()
                new_df = new_df[~new_df.index.duplicated(keep='first')]
                new_df.to_pickle(data_path)
            print(f'---------已更新至最新日期{new_df.index[-1]}')

class DataReader:
    @staticmethod
    def commonFunc(data_path):
        if not os.path.exists(data_path):
            print(f'{data_path}不存在,请先调用DataWriter().update_xx')
            return
        df = pd.read_pickle(data_path)
        df.index = pd.to_datetime(df.index)
        return df

    @staticmethod
    def read_IdxWeight(stk_code):
        data_path = dataBase+f'daily/idx_cons/{stk_code}.pkl'
        return DataReader.commonFunc(data_path)

    @staticmethod
    def read_ST_valid():
        data_path = dataBase+f'daily/valid/ST_valid.pkl'
        return DataReader.commonFunc(data_path)

    @staticmethod
    def read_suspend_valid():
        data_path = dataBase+'daily/valid/suspend_valid.pkl'
        return DataReader.commonFunc(data_path)

    @staticmethod
    def read_limit_valid():
        data_path = dataBase + 'daily/valid/limit_valid.pkl'
        return DataReader.commonFunc(data_path)

    @staticmethod
    def read_dailyMkt(data_name):
        data_path = dataBase+f'daily/mkt/{data_name}.pkl'
        return DataReader.commonFunc(data_path)

    @staticmethod
    def read_index_dailyRtn(index_code,start_date = '20100101'):
        df = pro.index_daily(ts_code=index_code, start_date= start_date).set_index('trade_date').sort_index()
        df.index = pd.to_datetime(df.index)
        return df['pct_chg']/100

    @staticmethod
    def read_dailyRtn():
        df = DataReader.read_dailyMkt('close')
        return df.pct_change()


if __name__ == '__main__':
    DataWriter.update_ST_valid(cover=True)
    DataWriter.update_suspend_valid(cover=True)
    DataWriter.update_IdxWeight('399300.SZ',cover=True)
    DataWriter.update_dailyMkt(cover=True)
    DataWriter.update_limit_valid(cover=True)

单因子检测

主要有
1.夏普、年化、最大回撤等指标的实现。
2.to_finnal_position函数,将选出的股票进行股票池、st股、limit等处理,见注释。
3.factor_group分组回测,默认进行十等分,画出净值图和收益率柱状图。
4.ICIR相关,计算IC、IR,画出累计IC图。
5.calc_daily_pnl计算仓位收益。
6.factor_stats综合,最终直接调用该函数。

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

def Col_zscore(df, n, cap=None, min_periods=1, check_std=False):
    df_mean = df.rolling(window=n,min_periods=min_periods).mean()
    df_std = df.rolling(window=n, min_periods=min_periods).std()
    if check_std:
        df_std = df_std[df_std >= 0.00001]
    target = (df - df_mean) / df_std
    if cap is not None:
        target[target > cap] = cap
        target[target < -cap] = -cap
    return target

def Row_zscore(df, cap=None, check_std=False):
    df_mean = df.mean(axis=1)
    df_std = df.std(axis=1)
    if check_std:
        df_std = df_std[df_std >= 0.00001]
    target = df.sub(df_mean, axis=0).div(df_std, axis=0)
    if cap is not None:
        target[target > cap] = cap
        target[target < -cap] = -cap
    return target

def MaxDrawdown(asset_series):
    return asset_series - np.maximum.accumulate(asset_series)

def Sharpe_yearly(pnl_series):
    return (np.sqrt(250) * pnl_series.mean()) / pnl_series.std()

def AnnualReturn(pos_df, pnl_series, alpha_type):
    temp_pnl = (1+pnl_series).prod()
    if alpha_type == 'ls_alpha':
        temp_pos = pos_df.abs().sum().sum() / 2
    else:
        temp_pos = pos_df.abs().sum().sum()
    if temp_pos == 0:
        return .0
    else:
        return round(temp_pnl ** (250 / temp_pos) - 1,2)

def IC(signal, pct_n, min_valids=None, lag=0):
    signal = signal.shift(lag)
    corr_df = signal.corrwith(pct_n, axis=1,method='spearman').dropna()
    if min_valids is not None:
        signal_valid = signal.count(axis=1)
        signal_valid[signal_valid < min_valids] = np.nan
        signal_valid[signal_valid >= min_valids] = 1
        corr_signal = corr_df * signal_valid
    else:
        corr_signal = corr_df
    return corr_signal

def IR(signal, pct_n, min_valids=None, lag=0):
    corr_signal = IC(signal, pct_n, min_valids, lag)
    ic_mean = corr_signal.mean()
    ic_std = corr_signal.std()
    ir = ic_mean / ic_std
    return ir, corr_signal

# def to_weighted_position(selected_df,weights_df = None):
#     if weights_df is None:
#         weights_df = pd.DataFrame().reindex_like(selected_df).fillna(1)
#
#     weights_df = weights_df.reindex_like(selected_df)
#     selected_weights_df = selected_df * weights_df
#     weighted_position_df = selected_weights_df.div(selected_weights_df.sum(axis=1), axis=0)
#     return weighted_position_df

def to_final_position(factor_score, forbid_day):
    '''
    factor_score:DataFrame,可以是因子值,也可以是根据因子值排序选出来的初始仓位矩阵
    forbid_day:DataFrame,是否可交易(由ST股、停盘相乘得到),1代表该股票该日可以交易,不可交易则是NaN

    return:
        pos_fin:DataFrame,最终仓位
    '''

    #因为因子df中,index为x的数值是用x日收盘后更新的数据计算的,所以x日不能交易,需要等到下一天交易,所以要shift(1)
    pos_fin = factor_score.shift(1).replace(np.nan, 0) * forbid_day
    #这里的ffill的效果是,若某只需要交易的股票在调仓日无法交易,那么经过上一个步骤后,其对应位置是nan,而ffill就是让其先继承前一个交易日的仓位
    pos_fin = pos_fin.ffill()
    return pos_fin

def calc_daily_pnl(factor_df, univ_data, rtn_df, idx_rtn,forbid_days,method):
    '''
    :param factor_df:   因子/仓位矩阵
    :param univ_data:   股票池矩阵(如沪深300成分股、中证500成分股等等)
    :param idx_rtn:  指数rtn序列
    :param forbid_days:  合法交易矩阵
    :param rtn_df:    股票rtn矩阵
    :param method_func:   feature/factor/ls_alpha/hg_alpha

    :return:  仓位矩阵+每日仓位收益率序列
    '''
    factor_sel = factor_df.copy()
    factor_sel = factor_sel.reindex_like(univ_data)*univ_data
    forbid_days = forbid_days.reindex_like(factor_sel)
    return_df = rtn_df.reindex_like(factor_sel)
    if method == 'feature' or method == 'factor':
        factor_z = Row_zscore(factor_sel, cap=4.5)
        pos_final = to_final_position(factor_z, forbid_days)
        daily_pnl_final = (pos_final.shift(1) * return_df).sum(axis=1)
        return pos_final,daily_pnl_final
    elif method == 'ls_alpha':
        pos_final = to_final_position(factor_sel, forbid_days)
        daily_pnl_final = (pos_final.shift(1) * return_df).sum(axis=1)
        return pos_final,daily_pnl_final
    elif method == 'hg_alpha':
        pos_final = to_final_position(factor_sel, forbid_days)
        daily_pnl_final = (pos_final.shift(1) * return_df).sum(axis=1) - idx_rtn
        return pos_final,daily_pnl_final

def factor_group(factor_df,forb_day,rtn_df,idx_rtn,univ_data,split_pct_ls):
    '''
    分组回测
    '''
    factor_df = factor_df.reindex_like(univ_data)*univ_data
    factor_score = factor_df
    factor_rank_pct = factor_score.rank(ascending=False, pct=True, axis=1)

    annual_rtn_ls = list()

    plt.figure(figsize=(12, 6))
    for split_pct in split_pct_ls:
        pos_selected = factor_score[(factor_rank_pct > split_pct[0])&(factor_rank_pct <= split_pct[1])]
        pos_selected = pos_selected.where(pd.isnull(pos_selected), 1)
        pos = pos_selected.div(pos_selected.sum(axis=1), axis=0)

        pos = to_final_position(pos, forb_day).reindex(factor_df.index)
        daily_rtn = (pos.shift(1) * rtn_df).sum(axis=1).reindex(factor_df.index)
        annual_rtn = AnnualReturn(pos,daily_rtn,'factor')
        annual_rtn_ls.append(annual_rtn)
        plt.plot((daily_rtn+1).cumprod(), label=str(split_pct))

    plt.title('all factor group backtest return',fontsize = 14)
    plt.legend()
    plt.grid()
    plt.show()

    xticks = range(len(split_pct_ls))
    plt.figure(figsize=(12, 6))
    p = plt.subplot(111)
    p.bar(x = xticks,height = annual_rtn_ls)
    p.set_xticks(xticks)
    p.set_xticklabels([x[1]*10 for x in split_pct_ls])
    plt.title('factor group annual return',fontsize = 14)
    plt.grid()
    plt.show()



def factor_stats(
        factor_df=None,
        chg_n=1,#调仓时间间隔
        univ_data=None,
        rtn_df=None,
        idx_rtn=None,
        forbid_days = None,
        method='factor',#factorls_alphahg_alpha
        group_split_ls=[(0,0.1),(0.1,0.2),(0.2,0.3),(0.3,0.4),(0.4,0.5),(0.5,0.6),(0.6,0.7),(0.7,0.8),(0.8,0.9),(0.9,1.0)]
):


    if method=='factor':
        #         plt.figure(figsize=(16, 12))
        plt.figure(figsize=(12, 6))


        pos_final,daily_pnl = calc_daily_pnl(factor_df, univ_data, rtn_df, idx_rtn,forbid_days,method)
        plt.plot(daily_pnl.cumsum())
        plt.title('all factor row_Zscore position return',fontsize = 14)
        plt.grid(1)
        plt.show()

        factor_group(
            factor_df,
            forbid_days,
            rtn_df,
            idx_rtn,
            univ_data,
            split_pct_ls=group_split_ls
        )

        pct_n = rtn_df.rolling(window=chg_n).sum()
        ir,IC_series = IR(factor_df, pct_n, lag=chg_n)
        plt.figure(figsize=(12, 6))
        plt.plot(IC_series.cumsum(),label=f'IR:{round(ir,2)},IC_mean:{round(IC_series.mean(),2)}')
        plt.title('IC cumsum',fontsize = 14)
        plt.legend()
        plt.grid(1)
        plt.show()


    else:
        plt.figure(figsize=(16, 6))
        p1 = plt.subplot(111)
        pos = factor_df.reindex(factor_df.index[::chg_n])#初步仓位,没有剔除St股票 也没有shift
        pos = pos.reindex(factor_df.index).ffill()
        pos_final,daily_pnl = calc_daily_pnl(pos, univ_data, rtn_df, idx_rtn,forbid_days,method)
        sharpe = round(Sharpe_yearly(daily_pnl),2)
        max_drawdown = round(MaxDrawdown((daily_pnl+1).cumprod()),2)
        annual_return = round(AnnualReturn(pos_final,daily_pnl,method),2)
        p1.plot(daily_pnl.cumsum(),label=f'SP:{sharpe},MD:{max_drawdown.min()},AR:{annual_return}')
        p1.set_title('selected position return')
        p1.grid(1)
        p1.legend()
    plt.show()

样例

20日收益率因子为例,展示单因子检测内容。
本人本地的目录如下图所示。
在这里插入图片描述

from my_lib.data_download.data_io import DataReader
from my_lib.factor_evaluate.factor_evaluate import factor_stats
import pandas as pd
import numpy as np

def calc_factor():
	#这里是计算因子的,把因子处理成特定的(di,ii)格式。
    close_df = DataReader.read_dailyMkt('close')
    return close_df.pct_change(20)

计算因子

factor_df = calc_factor()
factor_df.tail(5)

在这里插入图片描述
股票池

univ_a = DataReader.read_IdxWeight('399300.SZ')#沪深300
univ_a = univ_a.where(pd.isnull(univ_a),1)
univ_a

在这里插入图片描述

st股、停牌、涨跌停

ST_valid = DataReader.read_ST_valid()
suspend_valid = DataReader.read_suspend_valid()
limit_valid = DataReader.read_limit_valid()
forb_days = ST_valid*suspend_valid*limit_valid
forb_days.tail(5)

在这里插入图片描述

每日收益率矩阵

rtn_df = DataReader.read_dailyRtn()
rtn_df.tail(5)

因子测试与回测
原始因子

idx_rtn = DataReader.read_index_dailyRtn('399300.SZ')#指数收益序列
factor_stats(
    factor_df=factor_df,#因子或者仓位矩阵
    chg_n=20,#调仓周期
    univ_data=univ_a,#股票池
    rtn_df=rtn_df,#股票每日收益矩阵
    idx_rtn=idx_rtn,#因子收益序列(用来hg对冲)
    forbid_days=forb_days,#st*suspend*limit
    method='factor',#factorls_alphahg_alpha(原始因子、指数对冲、多空对冲)
    group_split_ls=[(0,0.1),(0.1,0.2),(0.2,0.3),(0.3,0.4),(0.4,0.5),(0.5,0.6),(0.6,0.7),(0.7,0.8),(0.8,0.9),(0.9,1.0)]#分组回测参数
)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

指数对冲

#排序选股,构建仓位
factor_rank_pct = factor_df.rank(ascending=False, pct=True, axis=1)
factor_selected = factor_df[factor_rank_pct>0.8]
factor_selected = factor_selected.where(pd.isnull(factor_selected), 1)
pos = factor_selected.div(factor_selected.sum(axis=1), axis=0)
pos = pos.fillna(0)#重要,否则ffill会出错


factor_stats(
    factor_df = pos,
    chg_n=20,
    univ_data=univ_a,
    rtn_df=rtn_df,
    idx_rtn=idx_rtn.replace(np.inf,np.nan).replace(-np.inf,np.nan),
    forbid_days=forb_days,
    method='hg_alpha',#factorls_alphahg_alpha
)

在这里插入图片描述

多空对冲

factor_df = factor_df.reindex_like(univ_a)*univ_a
factor_rank_pct = factor_df.rank(ascending=False, pct=True, axis=1)

#多头仓位
factor_selected = factor_df[factor_rank_pct>0.8]
factor_selected = factor_selected.where(pd.isnull(factor_selected), 1)
pos_long = factor_selected.div(factor_selected.sum(axis=1), axis=0).fillna(0)
#空头仓位
factor_selected = factor_df[factor_rank_pct<0.2]
factor_selected = factor_selected.where(pd.isnull(factor_selected), 1)
pos_short = factor_selected.div(factor_selected.sum(axis=1), axis=0).fillna(0)

factor_stats(
    factor_df = pos_long.fillna(0) - pos_short.fillna(0),
    chg_n=20,
    univ_data=univ_a,
    rtn_df=rtn_df,
    idx_rtn=idx_rtn.replace(np.inf,np.nan).replace(-np.inf,np.nan),
    forbid_days=forb_days,
    method='ls_alpha',#factorls_alphahg_alpha
)

在这里插入图片描述

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>