zoukankan      html  css  js  c++  java
  • tushare获取股票每日重要的基本面指标数据,并存入Elasticsearch

    tushare是一个开放的,免费的金融数据平台,包含沪深股票数据,指数数据,基金数据,期货数据,期权数据,债券数据,外汇数据,港股数据,行业经济数据,宏观经济数据以及新闻快讯等特色数据。其中以沪深股票数据最为丰富,包含了有:


    基本包含了沪深股票全部常用数据。


    tushare 目前提供了四种获取数据的方式,分别为 http, Python SDK, Matlab SDK, R SDK。

    这里介绍如何用Python SDK获取股票的每日指标数据。


    (1)注册tushare用户,获取 token

        注册网页链接为 https://tushare.pro/register?reg=369571

        注册完成后可以在个人主页的接口TOKEN下看到自己的token

    image


    (2)安装 tushare

    个人使用的python开发的IDE为 pycharm

    pip install tushare -i https://pypi.tuna.tsinghua.edu.cn/simple

    tushare依赖了numpy,pandas等一些库,安装完之后可能需要根据报错提示安装对应的库


    (4)安装Elasticsearch

    pip3 install elasticsearch -i https://pypi.tuna.tsinghua.edu.cn/simple

     

    (5)调用tushare

    这里把调用tushare的函数都封装在了一个文件里面,代码如下

    import datetime
    import time
    import numpy as np
    import tushare as ts
    
    ts.set_token('b15148f5ca285bd0e85bbc3f659daefff549ade3bba06fae6a037f03')
    pro = ts.pro_api()
    
    
    # 股票列表
    def get_all_stock():
        stocks = pro.stock_basic(exchange='', list_status='L', fields='ts_code,symbol,name,fullname,area,industry,list_date')
        return stocks
        
    
    # 每日指标
    def get_daily_basic(share_code, start_date, end_date):
        while 1:
            try:
                df = pro.daily_basic(ts_code=share_code, start_date=start_date, end_date=end_date, timeout=60)
                return df
            except:
                print("get_daily_basic 获取失败,参数为:", share_code, start_date, end_date)
                time.sleep(0.5)

    stock_basic接口用于获取股票列表,本接口文档网址:https://tushare.pro/document/2?doc_id=25

    daily_basic接口用于获取每日指标,网址:https://tushare.pro/document/2?doc_id=32。 这里用一个循环来获取,因为tushare对每分钟调用次数有限制(这也是为啥我要把数据保存到本地),超过次数限制时会报错,所以我这里用一个except获取异常,等待0.5s后重新再试。


    (6)保存到elasticsearch

    保存到elasticsearch之前当然需要本机已经启动了elasticsearch。

    关于elasticsearch的安装配置见我的另一篇博客https://www.cnblogs.com/betterwgo/p/11240821.html

    python 调用 tushare,并将数据保存到elasticsearch的代码如下:

    # 每日指标
    import configparser
    import logging
    
    import numpy as np
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    
    import stock_parser as parser
    
    logger = logging.getLogger(__name__)
    logger.setLevel(level=logging.INFO)
    handler = logging.FileHandler("log_daily_basic.txt")
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.info("Start print log")
    
    config = configparser.ConfigParser()
    config.read("config.ini")
    latest_daily_basic_tscode = config.get("daily", "latest_daily_basic_tscode")
    
    es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])
    
    # ts_code                str    TS股票代码
    # trade_date            str    交易日期
    # close                    float    当日收盘价
    # turnover_rate            float    换手率(%)
    # turnover_rate_f        float    换手率(自由流通股)
    # volume_ratio            float    量比
    # pe                    float    市盈率(总市值/净利润)
    # pe_ttm                float    市盈率(TTM)
    # pb                    float    市净率(总市值/净资产)
    # ps                    float    市销率
    # ps_ttm                float    市销率(TTM)
    # total_share            float    总股本 (万股)
    # float_share            float    流通股本 (万股)
    # free_share            float    自由流通股本 (万)
    # total_mv                float    总市值 (万元)
    # circ_mv                float    流通市值(万元)
    body = {
        "mappings": {
            "properties": {
                "ts_code": {
                    "type": "keyword"
                },
                "trade_date": {
                    "type": "integer"
                },
                "close": {
                    "type": "float"
                },
                "turnover_rate": {
                    "type": "float"
                },
                "turnover_rate_f": {
                    "type": "float"
                },
                "volume_ratio": {
                    "type": "float"
                },
                "pe": {
                    "type": "float"
                },
                "pe_ttm": {
                    "type": "float"
                },
                "pb": {
                    "type": "float"
                },
                "ps": {
                    "type": "float"
                },
                "ps_ttm": {
                    "type": "float"
                },
                "total_share": {
                    "type": "float"
                },
                "float_share": {
                    "type": "float"
                },
                "free_share": {
                    "type": "float"
                },
                "total_mv": {
                    "type": "float"
                },
                "circ_mv": {
                    "type": "float"
                }
            }
        }
    }
    index = 'index_daily_basic'
    es.indices.create(index=index, body=body, ignore=400)
    
    
    def check_float(item, x_name):
        x = item[x_name]
        if x is None or np.isnan(x):
            x = 0.0
            logger.info("%s %s %s is None or nan" % (item['ts_code'], item['trade_date'], x_name))
        return x
    
    
    def es_insert_daily_basic(df):
        actions = []
        for i in range(len(df)):
            df_item = df.iloc[i]
            tscode = df_item['ts_code']
            trade_date = int(df_item['trade_date'])
            x = tscode.split('.', 1)
            col_name = x[1] + x[0]
            _id = col_name + df_item['trade_date']
    
            close = check_float(df_item, 'close')
            turnover_rate = check_float(df_item, 'turnover_rate')
            turnover_rate_f = check_float(df_item, 'turnover_rate_f')
            volume_ratio = check_float(df_item, 'volume_ratio')
            pe = check_float(df_item, 'pe')
            pe_ttm = check_float(df_item, 'pe_ttm')
            pb = check_float(df_item, 'pb')
            ps = check_float(df_item, 'ps')
            ps_ttm = check_float(df_item, 'ps_ttm')
            total_share = check_float(df_item, 'total_share')
            float_share = check_float(df_item, 'float_share')
            free_share = check_float(df_item, 'free_share')
            total_mv = check_float(df_item, 'total_mv')
            circ_mv = check_float(df_item, 'circ_mv')
            action = {
                "_index": index,
                "_type": "_doc",
                "_id": _id,
                "_source": {
                    "ts_code": ts_code,
                    "trade_date": trade_date,
                    "close": close,
                    "turnover_rate": turnover_rate,
                    "turnover_rate_f": turnover_rate_f,
                    "volume_ratio": volume_ratio,
                    "pe": pe,
                    "pe_ttm": pe_ttm,
                    "pb": pb,
                    "ps": ps,
                    "ps_ttm": ps_ttm,
                    "total_share": total_share,
                    "float_share": float_share,
                    "free_share": free_share,
                    "total_mv": total_mv,
                    "circ_mv": circ_mv
                }
            }
            # 形成一个长度与查询结果数量相等的列表
            actions.append(action)
            if i % 1000 == 0 or i == (len(df) - 1):
                helpers.bulk(client=es, actions=actions)
                actions.clear()
        actions.clear()
    
    
    def update_latest_daily_basic_tscode(tscode):
        config.set("daily", "latest_daily_basic_tscode", tscode)
        # write to file
        with open("config.ini", "w+") as f:
            config.write(f)
    
    
    # 更新单只股票
    def update_daily_basic(tscode, start_date, end_date):
        df = parser.get_daily_basic(tscode, start_date, end_date)
        es_insert_daily_basic(df)
        return len(df)
    
    
    if __name__ == "__main__":
        # 获取全部上市股票代码
        stocks = parser.get_all_stock()
        bIn = True
        for i in range(len(stocks)):
            stock = stocks.iloc[i]
            ts_code = stock['ts_code']
            if latest_daily_basic_tscode == ts_code:
                bIn = False
            if not bIn:
                count = update_daily_basic(ts_code, '20000101', '')
                print(i, ts_code, count)
                update_latest_daily_basic_tscode(ts_code)
            else:
                print(i, ts_code)

    这里日志用的logging,没具体研究一股脑全搬上来了,反正我只需要打印个错误日志就行。

    然后还用了一个  configparser 来解析 ini 配置文件,config.ini文件中配置如下信息:

    [daily]
    latest_daily_basic_tscode = 000001.SZ

    配置文件的目的是再程序中断后重新启动不用从第一个开始,直接从配置文件中的开始。获取股票列表的接口的第一条是 000001.SZ,所以这里初始配置为它,这里其实可以优化一下。

    数据保存到elasticsearch用的是 helps中的bulk函数,做批量索引

    看一下保存的结果情况:

    image






    tushare注册: https://tushare.pro/register?reg=369571





  • 相关阅读:
    表的数据类型
    SQLYog Enterprise注册码分享
    MySQL下载安装、基本配置、问题处理
    windows下命令行模式中cd命令无效的原因
    TCP协议三次握手与四次挥手通俗解析
    使用concurrent.futures模块并发,实现进程池、线程池
    进程池、线程池、回调函数
    浅谈web网站架构演变过程
    memcached单点故障与负载均衡
    memcached性能监控
  • 原文地址:https://www.cnblogs.com/betterwgo/p/11602872.html
Copyright © 2011-2022 走看看