tushare是一个开放的,免费的金融数据平台,包含沪深股票数据,指数数据,基金数据,期货数据,期权数据,债券数据,外汇数据,港股数据,行业经济数据,宏观经济数据以及新闻快讯等特色数据。其中以沪深股票数据最为丰富,包含了有:
基本包含了沪深股票全部常用数据。
tushare 目前提供了四种获取数据的方式,分别为 http, Python SDK, Matlab SDK, R SDK。
这里介绍如何用Python SDK获取股票的每日指标数据。
(1)注册tushare用户,获取 token
注册网页链接为 https://tushare.pro/register?reg=369571
注册完成后可以在个人主页的接口TOKEN下看到自己的token
(2)安装 tushare
个人使用的python开发的IDE为 pycharm
pip install tushare -i https://pypi.tuna.tsinghua.edu.cn/simple
tushare依赖了numpy,pandas等一些库,安装完之后可能需要根据报错提示安装对应的库
(4)安装Elasticsearch
pip3 install elasticsearch -i https://pypi.tuna.tsinghua.edu.cn/simple
(5)调用tushare
这里把调用tushare的函数都封装在了一个文件里面,代码如下
import datetime import time import numpy as np import tushare as ts ts.set_token('b15148f5ca285bd0e85bbc3f659daefff549ade3bba06fae6a037f03') pro = ts.pro_api() # 股票列表 def get_all_stock(): stocks = pro.stock_basic(exchange='', list_status='L', fields='ts_code,symbol,name,fullname,area,industry,list_date') return stocks # 每日指标 def get_daily_basic(share_code, start_date, end_date): while 1: try: df = pro.daily_basic(ts_code=share_code, start_date=start_date, end_date=end_date, timeout=60) return df except: print("get_daily_basic 获取失败,参数为:", share_code, start_date, end_date) time.sleep(0.5)
stock_basic接口用于获取股票列表,本接口文档网址:https://tushare.pro/document/2?doc_id=25
daily_basic接口用于获取每日指标,网址:https://tushare.pro/document/2?doc_id=32。 这里用一个循环来获取,因为tushare对每分钟调用次数有限制(这也是为啥我要把数据保存到本地),超过次数限制时会报错,所以我这里用一个except获取异常,等待0.5s后重新再试。
(6)保存到elasticsearch
保存到elasticsearch之前当然需要本机已经启动了elasticsearch。
关于elasticsearch的安装配置见我的另一篇博客https://www.cnblogs.com/betterwgo/p/11240821.html
python 调用 tushare,并将数据保存到elasticsearch的代码如下:
# 每日指标 import configparser import logging import numpy as np from elasticsearch import Elasticsearch from elasticsearch import helpers import stock_parser as parser logger = logging.getLogger(__name__) logger.setLevel(level=logging.INFO) handler = logging.FileHandler("log_daily_basic.txt") handler.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.info("Start print log") config = configparser.ConfigParser() config.read("config.ini") latest_daily_basic_tscode = config.get("daily", "latest_daily_basic_tscode") es = Elasticsearch([{'host': '127.0.0.1', 'port': 9200}]) # ts_code str TS股票代码 # trade_date str 交易日期 # close float 当日收盘价 # turnover_rate float 换手率(%) # turnover_rate_f float 换手率(自由流通股) # volume_ratio float 量比 # pe float 市盈率(总市值/净利润) # pe_ttm float 市盈率(TTM) # pb float 市净率(总市值/净资产) # ps float 市销率 # ps_ttm float 市销率(TTM) # total_share float 总股本 (万股) # float_share float 流通股本 (万股) # free_share float 自由流通股本 (万) # total_mv float 总市值 (万元) # circ_mv float 流通市值(万元) body = { "mappings": { "properties": { "ts_code": { "type": "keyword" }, "trade_date": { "type": "integer" }, "close": { "type": "float" }, "turnover_rate": { "type": "float" }, "turnover_rate_f": { "type": "float" }, "volume_ratio": { "type": "float" }, "pe": { "type": "float" }, "pe_ttm": { "type": "float" }, "pb": { "type": "float" }, "ps": { "type": "float" }, "ps_ttm": { "type": "float" }, "total_share": { "type": "float" }, "float_share": { "type": "float" }, "free_share": { "type": "float" }, "total_mv": { "type": "float" }, "circ_mv": { "type": "float" } } } } index = 'index_daily_basic' es.indices.create(index=index, body=body, ignore=400) def check_float(item, x_name): x = item[x_name] if x is None or np.isnan(x): x = 0.0 logger.info("%s %s %s is None or nan" % (item['ts_code'], item['trade_date'], x_name)) return x def es_insert_daily_basic(df): actions = [] for i in range(len(df)): df_item = df.iloc[i] tscode = df_item['ts_code'] trade_date = int(df_item['trade_date']) x = tscode.split('.', 1) col_name = x[1] + x[0] _id = col_name + df_item['trade_date'] close = check_float(df_item, 'close') turnover_rate = check_float(df_item, 'turnover_rate') turnover_rate_f = check_float(df_item, 'turnover_rate_f') volume_ratio = check_float(df_item, 'volume_ratio') pe = check_float(df_item, 'pe') pe_ttm = check_float(df_item, 'pe_ttm') pb = check_float(df_item, 'pb') ps = check_float(df_item, 'ps') ps_ttm = check_float(df_item, 'ps_ttm') total_share = check_float(df_item, 'total_share') float_share = check_float(df_item, 'float_share') free_share = check_float(df_item, 'free_share') total_mv = check_float(df_item, 'total_mv') circ_mv = check_float(df_item, 'circ_mv') action = { "_index": index, "_type": "_doc", "_id": _id, "_source": { "ts_code": ts_code, "trade_date": trade_date, "close": close, "turnover_rate": turnover_rate, "turnover_rate_f": turnover_rate_f, "volume_ratio": volume_ratio, "pe": pe, "pe_ttm": pe_ttm, "pb": pb, "ps": ps, "ps_ttm": ps_ttm, "total_share": total_share, "float_share": float_share, "free_share": free_share, "total_mv": total_mv, "circ_mv": circ_mv } } # 形成一个长度与查询结果数量相等的列表 actions.append(action) if i % 1000 == 0 or i == (len(df) - 1): helpers.bulk(client=es, actions=actions) actions.clear() actions.clear() def update_latest_daily_basic_tscode(tscode): config.set("daily", "latest_daily_basic_tscode", tscode) # write to file with open("config.ini", "w+") as f: config.write(f) # 更新单只股票 def update_daily_basic(tscode, start_date, end_date): df = parser.get_daily_basic(tscode, start_date, end_date) es_insert_daily_basic(df) return len(df) if __name__ == "__main__": # 获取全部上市股票代码 stocks = parser.get_all_stock() bIn = True for i in range(len(stocks)): stock = stocks.iloc[i] ts_code = stock['ts_code'] if latest_daily_basic_tscode == ts_code: bIn = False if not bIn: count = update_daily_basic(ts_code, '20000101', '') print(i, ts_code, count) update_latest_daily_basic_tscode(ts_code) else: print(i, ts_code)
这里日志用的logging,没具体研究一股脑全搬上来了,反正我只需要打印个错误日志就行。
然后还用了一个 configparser 来解析 ini 配置文件,config.ini文件中配置如下信息:
[daily]
latest_daily_basic_tscode = 000001.SZ
配置文件的目的是再程序中断后重新启动不用从第一个开始,直接从配置文件中的开始。获取股票列表的接口的第一条是 000001.SZ,所以这里初始配置为它,这里其实可以优化一下。
数据保存到elasticsearch用的是 helps中的bulk函数,做批量索引
看一下保存的结果情况:
tushare注册: https://tushare.pro/register?reg=369571