Tushare接口数据请求:分当天、历史两种情况。通过flag标签控制
由于部分接口请求限制,采用offset限制每次请求数量,time.sleep控制请求间隔时间
部分数据才用drop_duplicates(keep=False)去除所有重复数据
数据利用pandas写入mysql数据库。
数据表字段设置,详见:
Tushare接口,详见:https://tushare.pro/document/2
# -*- coding: utf-8 -*- import pandas as pd import tushare as ts import time import datetime from sqlalchemy import create_engine import requests import json import datetime # ====================股票列表:stock_basic====================================================================================================================== def stockList(engine): print("-------------------------------------------") print("开始从Tushare接口获取股票列表数据") l_list = pro.stock_basic(list_status='L', fields='ts_code,symbol,name,area,list_status,list_date') p_list = pro.stock_basic(list_status='P', fields='ts_code,symbol,name,area,list_status,list_date') exist_list = pd.read_sql('select * from 股票列表', engine) total_list = pd.concat([l_list, p_list, exist_list], axis=0, sort=False) stock_list = total_list.drop_duplicates(subset=None, keep=False) stock_list.to_sql('all_stock', engine, if_exists='append', index=False) print(stock_list) print("本次存储股票列表数据%s条" % stock_list.shape[0]) # ====================指数日线行情:index_daily=================================================================================================================== def indexTrade(flag, engine, *args): # 单次最多请求8000条 print("-------------------------------------------") print("开始从Tushare接口获取指数行情数据") if flag == "Y": # 获取交易日数据 cur_df1 = pro.index_daily(ts_code='000001.SH', trade_date=current, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount') # 上证指数 cur_df2 = pro.index_daily(ts_code='399006.SZ', trade_date=current, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount') # 创业板指数 cur_df3 = pro.index_daily(ts_code='399106.SZ', trade_date=current, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount') # 深证综指 idx_daily = pd.concat([cur_df1, cur_df2, cur_df3], axis=0) elif flag == "N": # 获取历史交易数据 his_df1 = pro.index_daily(ts_code='000001.SH', start_date=start, end_date=end, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount') # 上证指数 his_df2 = pro.index_daily(ts_code='399006.SZ', start_date=start, end_date=end, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount') # 创业板指数 his_df3 = pro.index_daily(ts_code='399106.SZ', start_date=start, end_date=end, fields='ts_code,trade_date,close,open,high,low,pre_close,pct_chg,amount') # 深证综指 idx_daily = pd.concat([his_df1, his_df2, his_df3], axis=0) idx_daily['amount'] = round(idx_daily['amount'].map(lambda x: x / 10), 2) idx_daily.to_sql('market_index', engine, if_exists='append', index=False) print(idx_daily) print("本次存储指数行情数据%s条" % idx_daily.shape[0]) # ====================日线行情:daily============================================================================================================================ def dailyData(flag, engine, *args): # 本接口是未复权行情,停牌期间不提供数据,每分钟内最多调取200次,每次4000条数据,交易日当天15点~16点之间更新 print("-------------------------------------------") print("开始从Tushare接口获取个股行情数据") # 创建空DataFrame daily = pd.DataFrame() if flag == "Y": # 获取交易日数据 for offset in range(0, 9000, 3000): cur_daily = pro.daily(trade_date=current, fields='ts_code,trade_date,open,high,low,close,pre_close,pct_chg,amount', offset=offset, limit=3000) daily = daily.append(cur_daily, ignore_index=True) elif flag == "N": # 获取历史交易数据 for offset in range(0, 360000, 3000): his_daily = pro.daily(start_date=start, end_date=end, fields='ts_code,trade_date,open,high,low,close,pre_close,pct_chg,amount', offset=offset, limit=3000) daily = daily.append(his_daily, ignore_index=True) daily['amount'] = daily['amount'].map(lambda x: round(x / 10, 2)) daily.to_sql('stock_tradeing_data', engine, if_exists='append', index=False) print(daily) print("本次存储个股行情数据%s条" % daily.shape[0]) # ====================每日指标:daily_basic====================================================================================================================== def dailyBasic(flag, engine, *args): # 交易日每日15点~17点之间更新 print("-------------------------------------------") print("开始从Tushare接口获取个股基础行情指标数据") # 创建空的dataframe daily_basic = pd.DataFrame() if flag == "Y": # 获取交易日数据 for offset in range(0, 9000, 3000): cur_basic = pro.daily_basic(trade_date=current, fields='ts_code,trade_date,turnover_rate_f,volume_ratio,pe_ttm,pb,ps_ttm,dv_ttm,total_share,float_share,total_mv,circ_mv', offset=offset, limit=3000) daily_basic = daily_basic.append(cur_basic, ignore_index=True) elif flag == "N": # 获取历史交易数据 for offset in range(0, 360000, 3000): his_basic = pro.daily_basic(start_date=start, end_date=end, fields='ts_code,trade_date,turnover_rate_f,volume_ratio,pe_ttm,pb,ps_ttm,dv_ttm,total_share,float_share,total_mv,circ_mv', offset=offset, limit=3000) daily_basic = daily_basic.append(his_basic, ignore_index=True) daily_basic.to_sql('basic_stock_index', engine, if_exists='append', index=False) print(daily_basic) print("本次存储个股基础行情指标数据%s条" % daily_basic.shape[0]) # ====================沪深港通资金流向:moneyflow_hsgt========================================================================================================== def hsgtMoneyflow(flag, engine, *args): # 每次最多返回300条记录,总量不限制 print("-------------------------------------------") print("开始从Tushare接口获取沪深港通资金数据") # 创建空DataFrame hsgt = pd.DataFrame() if flag == "Y": # 获取交易日数据 cur_hsgt = pro.moneyflow_hsgt(trade_date=current, fields='trade_date,hgt,sgt,north_money') hsgt = hsgt.append(cur_hsgt, ignore_index=True) elif flag == "N": # 获取历史数据 for offset in range(0, 900, 300): his_hsgt = pro.moneyflow_hsgt(start_date=start, end_date=end, fields='trade_date,hgt,sgt,north_money', offset=offset, limit=300) hsgt = hsgt.append(his_hsgt, ignore_index=True) hsgt['hgt'] = hsgt['hgt'].map(lambda x: round(x * 100, 1)) hsgt['sgt'] = hsgt['sgt'].map(lambda x: round(x * 100, 1)) hsgt['north_money'] = hsgt['north_money'].map(lambda x: round(x * 100, 1)) hsgt.to_sql('north_fund', engine, if_exists='append', index=False) print(hsgt) print("本次存储沪深港股通资金数据%s条" % hsgt.shape[0]) # ====================个股资金流向:moneyflow=================================================================================================================== def stcokMoneyflow(flag, engine, *args): # 单次最大提取4000行记录,总量不限制 print("-------------------------------------------") print("开始从Tushare接口获取个股大单资金数据") # 创建空的dataframe dd_money = pd.DataFrame() if flag == "Y": # 获取交易日数据 for offset in range(0, 9000, 3000): cur_money = pro.moneyflow(trade_date=current, offset=offset, limit=3000) dd_money = dd_money.append(cur_money, ignore_index=True) elif flag == "N": # 获取历史交易数据 for offset in range(0, 360000, 3000): his_money = pro.moneyflow(start_date=start, end_date=end, offset=offset, limit=3000) dd_money = dd_money.append(his_money, ignore_index=True) dd_money.to_sql('big_stock_fund', engine, if_exists='append', index=False) print(dd_money) print("本次存储个股大单资金数据%s条" % dd_money.shape[0]) # ====================大宗交易:block_trade===================================================================================================================== def blockTrade(flag, engine, *args): # 单次最大1000条,总量不限制 print("-------------------------------------------") print("开始从Tushare接口获取大宗交易数据") block_trade = pd.DataFrame() if flag == "Y": # 获取交易日数据 cur_block = pro.block_trade(trade_date=current, fields='ts_code,trade_date,price,amount,buyer,seller') block_trade = block_trade.append(cur_block, ignore_index=True) elif flag == "N": # 获取历史交易数据 for offset in range(0, 9000, 1000): his_block = pro.block_trade(start_date=start, end_date=end, fields='ts_code,trade_date,price,amount,buyer,seller', offset=offset, limit=1000) block_trade = block_trade.append(his_block, ignore_index=True) block_trade.to_sql('bulk_trading', engine, if_exists='append', index=False) print(block_trade) print("本次存储大宗交易数据%s条" % block_trade.shape[0]) # ====================每日涨跌停统计:limit_list================================================================================================================= def zdCount(flag, engine, *args): # 单次最大1000,总量不限制 print("-------------------------------------------") print("开始从Tushare接口获取涨跌停统计数据") limit = pd.DataFrame() if flag == "Y": # 获取交易日数据 for offset in range(0, 3000, 1000): cur_limit = pro.limit_list(trade_date=current, fields='trade_date,ts_code,fc_ratio,fd_amount,first_time,last_time,open_times,strth,limit', offset=offset, limit=1000) limit = limit.append(cur_limit, ignore_index=True) elif flag == "N": # 获取历史交易数据 for offset in range(0, 90000, 1000): his_limit = pro.limit_list(start_date=start, end_date=end, fields='trade_date,ts_code,trade_date,fc_ratio,fd_amount,first_time,last_time,open_times,strth,limit', offset=offset, limit=1000) limit = limit.append(his_limit, ignore_index=True) limit.to_sql('up_to_down_detail', engine, if_exists='append', index=False) print(limit) print("本次存储涨跌停统计数据%s条" % limit.shape[0]) # ====================Shibor利率数据:shibor==================================================================================================================== def shiborRate(flag, engine, *args): # 单次最大2000,总量不限制 print("-------------------------------------------") print("开始从Tushare接口获取shibor利率数据") shibor = pd.DataFrame() if flag == "Y": # 获取交易日数据 cur_shibor = pro.shibor(date=current, fields='date,on,1w,3m') shibor = shibor.append(cur_shibor, ignore_index=True) elif flag == "N": # 获取历史交易数据 his_shibor = pro.shibor(start_date=start, end_date=end, fields='date,on,1w,3m') shibor = shibor.append(his_shibor, ignore_index=True) shibor.to_sql('shibor_rate', engine, if_exists='append', index=False) print(shibor) print("本次存储shibor利率数据%s条" % shibor.shape[0]) # ====================Tushare股东人数====================================================================================================================== def holderNumber(flag, engine, *args): # 单次最大3000,总量不限制,每分钟调取100次 print("-------------------------------------------") print("开始从Tushare接口获取股东人数数据") count = 0 holdernum = pd.DataFrame() if flag == "Y": # 获取交易日数据 for offset in range(0, 0000, 3000): hdrm = pro.stk_holdernumber(start_date=current, end_date=current, fields='ts_code,end_date,holder_num', offset=offset, limit=3000) holdernum = holdernum.append(hdrm, ignore_index=True) elif flag == "N": # 获取历史交易数据 for offset in range(0, 90000, 3000): hdrm = pro.stk_holdernumber(start_date=start, end_date=end, fields='ts_code,end_date,holder_num', offset=offset, limit=3000) holdernum = holdernum.append(hdrm, ignore_index=True) holdernum.to_sql('shareholder_number', engine, if_exists='append', index=False) print(holdernum) print("获取股东人数数据%s条" % holdernum.shape[0]) # ====================主函数==================================================================================================================================== if __name__ == '__main__': print("Tushare接口程序开始执行") print("-------------------------------------------") begin = time.time() # 初始化tushare.pro接口,创建mysql数据库引擎 pro = ts.pro_api('ac16b470869c5d82db5033ae9288f77b282d2b5519507d6d2c72fdd7') engine = create_engine('mysql://root:123456@127.0.0.1/quant?charset=utf8') # 判断是否要获取交易日数据 flag = input("是否获取交易日数据,选择Y或N:") if flag == "Y": print("开始从Tushare接口获取当前交易日数据") # 获取当天日期 current = time.strftime("%Y%m%d", time.localtime()) stockList(engine) # 获取股票列表 indexTrade(flag, engine, current) # 指数行情数据 dailyData(flag, engine, current) # 个股行情数据 dailyBasic(flag, engine, current) # 个股基础行情指标 hsgtMoneyflow(flag, engine, current) # 北向资金 stcokMoneyflow(flag, engine, current) # 个股大单资金 blockTrade(flag, engine, current) # 大宗交易 zdCount(flag, engine, current) # 涨跌停统计 shiborRate(flag, engine, current) # shibor利率 holderNumber(flag, engine, current) # 股东人数 elif flag == "N": print("开始从Tushare接口获取历史数据") # 获取历史起始日期 start = input("时间格式如:19491001,请输入起始日期:") end = input("时间格式如:19491001,请输入截止日期:") stockList(engine) # 获取股票列表 indexTrade(flag, engine, start, end) # 指数行情数据 dailyData(flag, engine, start, end) # 个股行情数据 dailyBasic(flag, engine, start, end) # 个股基础行情指标 hsgtMoneyflow(flag, engine, start, end) # 北向资金 stcokMoneyflow(flag, engine, start, end) # 个股大单资金 blockTrade(flag, engine, start, end) # 大宗交易 zdCount(flag, engine, start, end) # 涨跌停统计 shiborRate(flag, engine, start, end) # shibor利率 holderNumber(flag, engine, start, end) # 股东人数 ed = time.time() print('本次程序共执行%0.2f秒.' % ((ed - begin))) print("Tushare接口程序执行完成")