这里给出的模型是我没有进行参数调优的,所以效果不是最终的理想状态。
#%%
! pip install tushare
#%% md
https://mp.weixin.qq.com/s?__biz=Mzg4MDE3OTA5NA==&mid=2247491137&idx=1&sn=9506137b0ba2f1b59fadae117aaa97dd&source=41#wechat_redirect
x = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(1, activation= 'sigmoid'))(x)
#%%
# 导入包
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import tushare as ts
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
#根据你的特征输入的大小调节
#参数设置/parameter setting
timesteps = seq_length = 20 #时间窗/window length
data_dim = 7 #输入数据维度/dimension of input data
output_dim = 1 #输出数据维度/dimension of output data
ts_code = '000001.SZ'
start_date = '20120101'
end_date = '20200301'
# A1
# 设置显示的最大列、宽等参数,消掉打印不完全中间的省略号
# pd.set_option('display.max_columns', 1000)
# pd.set_option('display.width', 1000)
# pd.set_option('display.max_colwidth', 1000)
# pd.set_option('display.height', 1000)
#显示所有列
pd.set_option('display.max_columns', None)
#显示所有行
pd.set_option('display.max_rows', None)
#数据准备/data preparation
#变量选取Open,High,Low,Close,Volume等
pro = ts.pro_api('XXXXXXXXXXXXXXXXXXXXXXXXXXXX') #token可以在新版tushare的网站上找到
stock_data = pro.query('daily',ts_code = '000001.SZ', start_date = '20120101', end_date = '20200301')
stock_data = stock_data[::-1] #倒序,使日期靠前的排在前面
print(stock_data.head(5))
stock_data.reset_index(drop=True, inplace=True) #把每行的索引改为“0、1、2……”
xy = stock_data[['open','close','high','low','vol','pct_chg','amount']] #选取需要的features
xy = np.array(xy.values) #转为array
#%% md
开始切分
#%%
#切分训练集合测试集/split to train and testing
train_size = int(len(xy) * 0.7) #训练集长度
test_size = len(xy) - train_size #测试集长度
xy_train, xy_test = np.array(xy[0:train_size]),np.array(xy[train_size:len(xy)]) #划分训练集、测试集
scaler = MinMaxScaler()
xy_train_new = scaler.fit_transform(xy_train) #预处理,按列操作,每列最小值为0,最大值为1
x_new = xy_train_new[:,0:] #features
y_new = xy_train_new[:,1] * 10 #labels,适当放大方便训练
x = x_new
y = y_new
dataX = []
dataY = []
for i in range(0, len(y) - seq_length):
_x = x[i:i + seq_length]
_y = y[i + seq_length] # Next close price
# print(_x, "->", _y)
dataX.append(_x)
dataY.append(_y)
#处理数据shape,准备进入神经网络层
x_real = np.vstack(dataX).reshape(-1,seq_length,data_dim)
y_real= np.vstack(dataY).reshape(-1,output_dim)
print(x_real.shape)
print(y_real.shape)
dataX = x_real
dataY = y_real
trainX, trainY = dataX, dataY
xy_test_new = scaler.transform(xy_test) #使用训练集的scaler预处理测试集的数据
x_new = xy_test_new[:,0:]
y_new = xy_test_new[:,1] * 10
x = x_new
y = y_new
dataX = []
dataY = []
for i in range(0, len(y) - seq_length):
_x = x[i:i + seq_length]
_y = y[i + seq_length] # Next price change
# print(_x, "->", _y)
dataX.append(_x)
dataY.append(_y)
#处理数据shape,准备进入神经网络层
x_real = np.vstack(dataX).reshape(-1,seq_length,data_dim)
y_real= np.vstack(dataY).reshape(-1,output_dim)
print(x_real.shape)
print(y_real.shape)
dataX = x_real
dataY = y_real
testX, testY = dataX, dataY
# print('训练集第一个窗口数据')
# print(trainX[0])
# print('训练集第一个回归数据')
# print(trainY[0])
# print('验证集第一个窗口数据')
# print(testX[0])
# print('验证集第一个回归数据')
# print(testY[0])
#%% md
建立模型
#%%
# B1
import os
import tensorflow as tf
inputs = tf.keras.Input(shape=(seq_length, data_dim))
# x = tf.keras.layers.Dense(units=10,activation='relu')(i)
x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=50,return_sequences=True))(inputs)
# x = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(1, activation= 'sigmoid'))
x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=50))(x)
# x = tf.keras.layers.Dropout(0.2)(x)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(20)(x)
output = tf.keras.layers.Dense(1)(x)
model = tf.keras.Model(inputs=inputs, outputs=output)
#%% md
模型训练
#%%
import os
import time
log_dir= './drive/My Drive/callsbacks_b1_chun'
if not os.path.exists(log_dir):
os.mkdir(log_dir)
output_model_file = os.path.join(log_dir,"best2.h5")
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir)
]
# opt = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(optimizer='adam', loss='mean_squared_error',metrics=['mae','mse'])
# model.compile(optimizer='rmsprop', loss='mae',metrics=['mae','mse'])
print(model.summary())
start = time.time()
# history = model.fit(trainX, trainY, batch_size=256, epochs=200, verbose=2, validation_split=0.1)
history = model.fit(trainX, trainY, batch_size=256, epochs=200, verbose=2, validation_split=0.1,callbacks=callbacks)
elapsed = (time.time() - start)
print(elapsed)
#%% md
模型的评估
#%%
# Plot training & validation loss values绘制训练和验证集的损失函数
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Test'], loc='upper left')
plt.show()
#%% md
训练样本的评估
#%%
trainPredict2 = model.predict(trainX) #查看⚠️训练结果
# trainPredict2_2 = scaler.inverse_transform(trainPredict2)
# trainY2=scaler.inverse_transform(trainY)
trainPredict2_2 = trainPredict2 / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
trainY2 = trainY / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
#下面画出收盘价走势
plt.subplots(figsize=(16, 6))
# ax.plot(trainY2,color='blue')
# ax.plot(trainPredict2_2,color='orange')
plt.plot(trainY2, color='blue', label='REAL Stock Price')
plt.plot(trainPredict2_2, color='orange', label='PREDICT Stock Price')
plt.title(' TRAIN CLOSE PRICE')
plt.xlabel('Time')
plt.ylabel('Stock Price')
plt.legend()
plt.show()
#%%
plt.figure(figsize=(8,8)) #画布大小
plt.xlim((5,30)) #x坐标范围
plt.ylim((5,30)) #y坐标范围
plt.scatter(trainY2, trainPredict2_2) #理想情况下散点应该分布在斜率为1的直线周围
plt.ylabel('prediction')
plt.xlabel('label')
print('训练平均误差:',np.mean((trainPredict2_2 - trainY2) / trainY2 * 100)) #平均误差(%)
print('训练最大误差:',np.max((trainPredict2_2 - trainY2) / trainY2 * 100)) #最大误差(%)
print('训练最小误差:',np.min((trainPredict2_2 - trainY2) / trainY2 * 100)) #最小误差(%)
#计算误差小于5%的比例
count = 0
for i in range(len(trainY2)):
if abs(trainPredict2_2[i] - trainY2[i]) / trainY2[i] * 100 <= 5:
count += 1
count = count / len(trainY2) * 100
print('训练误差小于5%的比例:',count)
#%% md
测试样本的评估
#%%
testPredict2 = model.predict(testX) #查看测试结果
testPredict2_2 = testPredict2 / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
testY2 = testY / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
# ⚠️⚠️⚠️因为本身是模型预测趋势,而对于转折点对预测还是不高,所以在验证集上表现不是特别好⚠️蓝线和黄线是独立对没有联系对,也就是说蓝线是我们上帝视角提前获取对未来数据,而黄线是模型自己拟合出来对数据
print(model.evaluate(testX, testPredict2_2))
#下面画出收盘价走势
fig, ax = plt.subplots(figsize=(16, 6))
# ax.plot(testY2,color='blue')
# ax.plot(testPredict2_2,color='orange')
# Visualising the results 使用Matplotlib将预测股价和实际股价的结果可视化。
plt.plot(testY2, color='blue', label='REAL Stock Price')
plt.plot(testPredict2_2, color='orange', label='PREDICT Stock Price')
plt.title('TEST CLOSE PRICE')
plt.xlabel('Time')
plt.ylabel('Stock Price')
plt.legend()
plt.show()
#%%
plt.figure(figsize=(8,8)) #画布大小
plt.xlim((5,20)) #x坐标范围
plt.ylim((5,20)) #y坐标范围
plt.scatter(testY2, testPredict2_2) #理想情况下散点应该分布在斜率为1的直线周围
plt.ylabel('prediction')
plt.xlabel('label')
print('测试平均误差:',np.mean((testPredict2_2 - testY2) / testY2 * 100)) #平均误差(%)
print('测试最大误差:',np.max((testPredict2_2 - testY2) / testY2 * 100)) #最大误差(%)
print('测试最小误差:',np.min((testPredict2_2 - testY2) / testY2 * 100)) #最小误差(%)
#计算误差小于5%的比例
count = 0
for i in range(len(testY2)):
if abs(testPredict2_2[i] - testY2[i]) / testY2[i] * 100 <= 5:
count += 1
count = count / len(testY2) * 100
print('测试误差小于5%的比例:',count)
#%% md
对模型整体训练对评估
#%%
# 模型的评估loss的值越低说明模型拟合的效果越好,LSTM模型主要采用RMSE作为评价标准
from sklearn.metrics import mean_squared_error,mean_absolute_error,r2_score
from math import sqrt
#回归评价指标
# calculate MSE 均方误差
mse=mean_squared_error(testY2,testPredict2_2)
# calculate RMSE 均方根误差
rmse = sqrt(mean_squared_error(testY2, testPredict2_2))
#calculate MAE 平均绝对误差
mae=mean_absolute_error(testY2,testPredict2_2)
#calculate R square
r_square=r2_score(testY2,testPredict2_2)
print('均方误差mse: %.6f' % mse)
print('均方根误差rmse: %.6f' % rmse)
print('平均绝对误差mae: %.6f' % mae)
print('R_square: %.6f' % r_square)
#%% md
股票评价
#%%
#计算对转折点的预测正确率
correct = np.zeros(len(testPredict2_2))
for i in range(1, len(testPredict2_2)):
if np.sign(testPredict2_2[i] - testPredict2_2[i-1]) == np.sign(testY2[i] - testY2[i-1]): #如果对涨或跌的判断准确,这里用正负符号判断###########################################################
correct[i] = 1 #就加1
accuracy = np.sum(correct) / len(correct) * 100
print('对转折点的预测正确率:',accuracy)
#如果对明天的预测价格高于今天的收盘价,就买进并持有一天,计算能挣多少钱
count = 0
for i in range(1, len(testPredict2_2)):
if testPredict2_2[i] >= testY2[i-1]:
count = count + (testY2[i] - testY2[i-1])
print('
如果对明天的预测价格高于今天的收盘价,就买进并持有一天,能挣{}元'.format(count))
#如果对明天的预测价格高于今天的收盘价,就买进并持有十一天,计算能挣多少钱
count = 0
for i in range(1, len(testPredict2_2)):
if testPredict2_2[i] >= testY2[i-1]:
if i+10 <len(testPredict2_2):
count = count + (testY2[i+10] - testY2[i-1])
print('如果对明天的预测价格高于今天的收盘价,就买进并持有十一天,能挣{}元'.format(count))
#最理想的状况下,能挣多少钱
count = 0
for i in range(1, len(testPredict2_2)):
if testY2[i] >= testY2[i-1]:
count = count + (testY2[i] - testY2[i-1])
print('最理想的状况下,能挣{}元'.format(count))
#%%
#如果对明天的预测价格高于今天的收盘价,就买进并持有n天,计算能挣多少钱
print(len(testPredict2_2))
maxmoneyday = 0
maxmoney = 0
for n in range(30):
count = 0
for i in range(1, len(testPredict2_2)):
# for n in range(len(testPredict2_2)-i):
# print(n)
if testPredict2_2[i] >= testY2[i-1]:
if i+n <len(testPredict2_2):
count = count + (testY2[i+n] - testY2[i-1])
print('在所有的这一段时间中,如果明天的预测价格高于今天的收盘价,就买进一股并持有{}天后卖出,总共能挣{}元'.format(n+1,count))
if maxmoney <count:
maxmoney = count
maxmoneyday = n+1
print('最佳的固定持有相同天数的方式投资赚了{},持有{}天'.format(maxmoney,maxmoneyday))
#%%
# 这里是导出可视化对界面
%reload_ext tensorboard
%tensorboard --logdir './drive/My Drive/callsbacks_b1_chun'
#%%
# 保存模型
# tf.saved_model.save(model, "Bilstm七输入0408.h5")
#%%
# 载入模型
# model = tf.saved_model.load('Bilstm七输入0402.h5')
#%% md
日线行情输出参数
名称 类型 描述
ts_code str 股票代码
trade_date str 交易日期
open float 开盘价
high float 最高价
low float 最低价
close float 收盘价
pre_close float 昨收价
change float 涨跌额
pct_chg float 涨跌幅 (未复权,如果是复权请用 通用行情接口 )
vol float 成交量 (手)
amount float 成交额 (千元)
---
每日指标输出参数
名称 类型 描述
ts_code str TS股票代码
trade_date str 交易日期
close float 当日收盘价
turnover_rate float 换手率(%)
turnover_rate_f float 换手率(自由流通股)
volume_ratio float 量比
pe float 市盈率(总市值/净利润, 亏损的PE为空)
pe_ttm float 市盈率(TTM,亏损的PE为空)
pb float 市净率(总市值/净资产)
ps float 市销率
ps_ttm float 市销率(TTM)
dv_ratio float 股息率 (%)
dv_ttm float 股息率(TTM)(%)
total_share float 总股本 (万股)
float_share float 流通股本 (万股)
free_share float 自由流通股本 (万)
total_mv float 总市值 (万元)
circ_mv float 流通市值(万元)