zoukankan      html  css  js  c++  java
  • 补充停牌的日K数据

    问题

    从TuShare获取的数据,停牌日是没有数据的,这将会在回测时,不能直接参与账户的净值计算,导致账户的净值以及收益计算不准确。

    停盘

    股票由于某种消息或进行某种活动引起股价的连续上涨或下跌,由证券交易所暂停其在股票市场上进行交易。待情况澄清或企业恢复正常后,再复牌在交易所挂牌交易。

    解决方法

    1、 增加is_trading字段,用于区分停牌日还是交易日
    2、补充停牌日的日k数据,更加当前数据现状,填充一个交易日的close、volume、high、low为停牌前最后一个交易日的close、volume为0,is_trading为false

    填充指定时间段的is_trading

    流程图

    代码实现

    1、获取所有交易日列表
    由于指数比如上证指数(000001)是不会停牌的,因此可通过指数来获得交易日期

    def get_trading_date(begin_date= None,end_date=None):
        """
        获取指定日期范围的按照正序排列的交易日列表
        如果没有指定日期范围,则获取从当期日期向前365个自然日内的所有交易日
    
        :param begin_date: 开始日期
        :param end_date: 结束日期
        :return: 交易日期列表
        """
        #当前日期
        now = datetime.now()
        #开始日期,默认当前日期向前365个自然日
        if begin_date is None:
            #当前日期减去365天
            one_year_ago = now - timedelta(days=365)
            #转换成str类型
            begin_date = one_year_ago.strftime("%Y-%m-%d")
        #结束日期默认为今天
        if end_date is None:
            end_date = now.strftime("%Y-%m-%d")
        
        #用上证指数000001作为查询条件,因为指数是不会停盘的,所以可以查询到所有的交易日期
        daily_cursor = DB_CONN.daily.find(
            {"code":"000001",'date':{'$gte':begin_date,'$lte':end_date},'index':True},
            sort=[('date',ASCENDING)],
            projection={'date':True,'_id':False}
        )
        #转换日期列表
        dates = [x['date'] for x in daily_cursor]
        return dates
    

    2、填充某个日行情数据的is_trading,并更新数据库(同样采用bulk_write将更新数据写入数据集)

    def fill_is_trading_between(begin_date=None,end_date=None):
        """
        填充指定时间段内的is_trading字段
        :param begin_date :开始日期
        :param end_date :结束日期
        """
        #获取指定日期范围的左右交易日子列表,按日期正序排列
        all_dates = get_trading_date(begin_date,end_date)
        #循环填充所有交易日的is_trading字段
        for date in all_dates:
            #填充daily数据集
            fill_single_date_is_trading(date,'daily')
            #填充daily_hfq数据集
            fill_single_date_is_trading(date,'daily_hfq')
    
    def fill_single_date_is_trading(date,collection_name):
        """
        填充某一个日行情的数据集的is_trading
        :param date: 日期
        :param collection_name: 集合名称
        """
        print('填充字段,字段名:is_trading,日期:%s,数据集:%s' %(date,collection_name))
        daily_cursor = DB_CONN[collection_name].find(
            {'date':date},
            projection={'code':True,'volume':True,'_id':False},
            batch_size = 1000
            })
    
        update_requests = []
        for daily in daily_cursor:
            #当日成交量大于0,则为交易状态
            is_trading = daily['volume']>0
            update_requests.append(
                UpdateOne(
                    {'code':daily['code'],'date':date},
                    {'$set':{'is_trading':is_trading}}
                )
            )
            if len(update_requests):
                update_result = DB_CONN[collection_name].bulk_write(update_requests,ordered=False)
                print("填充字段,字段名:is_trading,日期:%s,数据集:%s,更新:%4d"%
                (date,collection_name,update_result.modified_count),flush=True)
    

    获取股票基本信息

    用途

    • 获取每日的股票列表

    主要字段

    • 股票代码
    • 股票名称
    • 股本(总股本、流通股本)
    • 上市日期
    • 日期

    通过采用TuShare中get_stock_basics接口获取股票的基本信息
    get_stock_basics()接口信息:

    def get_stock_basics(date=None):
        """
            获取沪深上市公司基本情况
        Parameters
        date:日期YYYY-MM-DD,默认为上一个交易日,目前只能提供2016-08-09之后的历史数据
    
        Return
        --------
        DataFrame
                   code,代码
                   name,名称
                   industry,细分行业
                   area,地区
                   pe,市盈率
                   outstanding,流通股本
                   totals,总股本(万)
                   totalAssets,总资产(万)
                   liquidAssets,流动资产
                   fixedAssets,固定资产
                   reserved,公积金
                   reservedPerShare,每股公积金
                   eps,每股收益
                   bvps,每股净资
                   pb,市净率
                   timeToMarket,上市日期
        """
    

    详细代码实现

    from database import DB_CONN
    from datetime import datetime,timedelta
    import tushare as ts
    from pymongo import UpdateOne
    from stock_util import get_trading_date
    
    """
    从tushare中获取股票的基础数据,保存在本地MongoDB中
    """
    def crawl_basic(begin_date=None,end_date=None):
        """
        抓取指定时间范围的股票基础信息
        :param begin_date:开始时间
        :param end_date:结束时间
        """
        #如果没有指定日期,则默认为前一日
        if begin_date is None :
            begin_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
        if end_date is None:
            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    
        #获取指定日期范围的所有交易日列表
        all_dates = get_trading_date(begin_date,end_date)
        #按每个交易日抓取
        for date in all_dates:
            try:
                #抓取当日的基本信息
                crawl_basic_at_date(date)
            except:
                print("抓取股票基本信息出错,日期:%s" %date,flush=True)
    
    def crawl_basic_at_date(date):
        """
        从Tushare抓取指定日期的股票基本信息
        :param date: 日期
        """
        #从tushare获取基本信息,index是股票代码列表
        df_basics = ts.get_stock_basics(date)
        #如果当日没有基本信息,不做操作
        if df_basics is None:
            return
        
        #初始化更新列表
        update_requests=[]
        codes = set(df_basics.index)
        #按照股票代码提取所有数据
        for code in codes:
            #获取一只股票的数据
            doc = df_basics.loc[code]
            try:
                #将上市日期,19971113转换成199-11-13
                time_to_market = datetime.strptime(str(doc['timeToMarket']),"%Y%m%d").strftime('%Y-%m-%d')
                #将总股本和流通股本转为数字
                totals = float(doc['totals'])
                outstanding = float(doc['outstanding'])
    
                #组合基本信息文档
                doc.update(
                    {
                        #股票代码
                        'code':code,
                        #日期
                        'date':date,
                        #上市时间
                        'timeToMarket':time_to_market,
                        #流通股本
                        'outstanding':outstanding,
                        #总股本
                        'totals':totals
                    }
                )
                #生成更新请求,按照code、date创建索引
                update_requests.append(
                    UpdateOne(
                    {'code':code,'date':date},
                    {'$set':doc},
                    upsert=True
                    )
                )
            except:
                print('发生异常,股票代码:%s,日期:%s' %(code,date),flush=True)
                print(doc,flush=True)
            if len(update_requests)>0:
                update_result = DB_CONN['basic'].bulk_write(update_requests,ordered=False)
                print("抓取股票基本信息,日期:%s,插入:%4d条,更新:%4d条" %(date,update_result.upserted_count,update_result.modified_count),flush=True)
    

    执行上述程序后,会将股票基本信息(包括,股票代码,日期,上市时间,流通股本和总股本)保存在MongoDB中名为basic的集合(表)中

    填充停牌日的行情数据

    代码实现:

    ef fill_daily_k_at_suspension_days(begin_date=None,end_date=None):
        """
        填充指定日期范围内,股票停牌日的行情数据
        填充时,停牌的开盘价、最高价、最低价和收盘价都为最近一个交易日的收盘价,成交量为0
        is_trading为False
        """
        #当前日期的前一天
        before = datetime.now() - timedelta(days=1)
        #找到据当前最近一个交易日的所有股票的基本信息
        basics =[]
        while 1:
            #转化成str
            last_trading_date = before.strftime("%Y-%m-%d")
            #因为Tushare的基本信息从2016-08-09开始,如果早于这个时间就结束查找
            if last_trading_date < '2016-08-09':
                break
            #找当日的基本信息
            basic_cursor = DB_CONN['basic'].find(
                {'date':last_trading_date},
                #填充时需要用到两个子段:股票代码code和上市日期timeToMarket,上市日期用来判断是否上市
                projection={'code':True,'timeToMarket':True,'_id':False},
                #一次性返回5000条数据,可以降低网络IO开销,提高速度
                batch_size=5000
            )
            #将数据放到basics列表中
            basics = [basic for basic in basic_cursor]
            #如果查到数据,跳出循环
            if len(basics)>0:
                break
            #没有找到数据,则继续向前一天
            before = before-timedelta(days=1)
        
        #获取指定日期范围内所有交易日列表
        all_dates = get_trading_date(begin_date,end_date)
    
        #填充daily数据集中的停牌日数据
        fill_daily_k_at_suspension_days_at_date_at_one_collections(basics,all_dates,'daily')
        #填充daily_hfq数据集中的停牌日数据
        fill_daily_k_at_suspension_days_at_date_at_one_collections(basics,all_dates,'daily_hfq')
    
    def fill_daily_k_at_suspension_days_at_date_at_one_collections(basics,all_dates,collection):
        """
        更新单个数据集的单个日期的数据
        :param basic:基本信息
        :param all_dates:日期列表
        :param collection:集合名
        """
        code_last_trading_daily_dict = dict()
        for date in all_dates:
            update_requests = []
            last_daily_code_set = set(code_last_trading_daily_dict.keys())
            for basic in basics:
                code = basic['code']
                #如果循环日期小于上市日期
                if date < basic['timeToMarket']:
                    print('日期:%s,%s还没上市,上市日期为%s'%(date,code,basic['timeToMarket']),flush=True)
                else:
                    #找到当日数据
                    daily = DB_CONN[collection].find(
                        {'code':code,'date':date}
                    )
                    if daily is not None:
                        code_last_trading_daily_dict[code] = daily
                        last_daily_code_set.add(code)
                    else:
                        if code in last_daily_code_set:
                            last_trading_daily = code_last_trading_daily_dict[code]
                            suspension_daily_doc = {
                                'code':code,
                                'date':date,
                                'close':last_trading_daily['close'],
                                'open':last_trading_daily['close'],
                                'high':last_trading_daily['close'],
                                'low':last_trading_daily['close'],
                                'volume':0,
                                'is_trading':False
                            }
                            update_requests.append(
                                UpdateOne(
                                    {'code':code,'date':date},
                                    {'$set':suspension_daily_doc},
                                    upsert=True
                                )
                            )
            if len(update_requests)>0:
                update_result = DB_CONN[collection].bulk_write(update_requests,ordered=False)
                print('填充停牌数据,日期:%s,数据集:%s,插入:%4d条,更新:%4d条'%(date,collection,update_result.upserted_count,update_result.modified_count),flush=True)
    
  • 相关阅读:
    JVM优化系列之一(-Xss调整Stack Space的大小)
    被kill问题之2:Docker环境下Java应用的JVM设置(容器中的JVM资源该如何被安全的限制)
    docker的坑
    docker容器内存占用 之 系统cache,docker下java的内存该如何配置--temp
    查看docker容器的内存占用
    使用Java监控工具出现 Can't attach to the process
    使用docker查看jvm状态,在docker中使用jmap,jstat
    在Docker中监控Java应用程序的5个方法
    HDU2552 三足鼎立 【数学推理】
    C++面向对象模型
  • 原文地址:https://www.cnblogs.com/whiteBear/p/12744621.html
Copyright © 2011-2022 走看看