使用 LSTM 网络进行股票预测(PyTorch)
- generate_dataset.py
import tushare as ts
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
# Windowing / batching / split hyper-parameters used further down this module.
DAYS_FOR_TRAIN = 10
BATCH_SIZE = 12
SPLIT_PRECENT = 0.7

# Data acquisition and preprocessing.
# Closing prices of the Shanghai Composite Index from 2018-01-01 onwards,
# taken from the tushare frame as a float32 np.ndarray.
data_close = (
    ts.get_k_data('000001', start='2018-01-01', index=True)['close']
    .values
    .astype('float32')
)

# Min-max scale the prices into the 0~1 range; the extrema are kept as
# module-level names alongside the scaled series.
min_value = data_close.min()
max_value = data_close.max()
data_close = (data_close - min_value) / (max_value - min_value)
def create_dataset(data, days_for_train=5) -> "tuple[np.ndarray, np.ndarray]":
    """Slice a series into sliding windows and their next-step targets.

    Sample ``i`` uses ``data[i : i + days_for_train]`` as input and
    ``data[i + days_for_train]`` as the value to predict.

    Args:
        data: Sequence or array indexed along its first axis by time step.
        days_for_train: Number of consecutive steps in each input window.

    Returns:
        ``(dataset_x, dataset_y)`` where ``dataset_x`` has shape
        ``(n, days_for_train, ...)`` and ``dataset_y`` has shape ``(n, ...)``
        with ``n = max(len(data) - days_for_train, 0)``.

    Raises:
        ValueError: If ``days_for_train`` is smaller than 1.
    """
    if days_for_train < 1:
        raise ValueError("days_for_train must be a positive integer")

    series = np.asarray(data)
    n_samples = len(series) - days_for_train

    if n_samples <= 0:
        # Too little data for even one window: return correctly shaped empty
        # arrays (same dtype as the input) instead of shapeless float64 ones.
        trailing = series.shape[1:]
        return (
            np.empty((0, days_for_train) + trailing, dtype=series.dtype),
            np.empty((0,) + trailing, dtype=series.dtype),
        )

    dataset_x = np.stack(
        [series[start:start + days_for_train] for start in range(n_samples)]
    )
    # Copy so the targets never alias the caller's array.
    dataset_y = series[days_for_train:].copy()
    return dataset_x, dataset_y
# Build the sliding-window samples: each row of dataset_x holds DAYS_FOR_TRAIN
# consecutive prices and dataset_y holds the price that follows that window.
dataset_x, dataset_y = create_dataset(data_close, DAYS_FOR_TRAIN)
class Mydataset(Dataset):
    """Map-style dataset over pre-built (window, target) sample arrays.

    Args:
        data: Two-item sequence ``(x, y)``; ``x[i]`` is the i-th input
            window and ``y[i]`` is the value to predict for it.
        transform: Optional callable applied to each reshaped window before
            it is returned.
    """

    def __init__(self, data, transform=None):
        self.data_x = data[0]
        self.data_y = data[1]
        self.transform = transform

    @property
    def tranform(self):
        """Alias kept for code that still uses the old misspelled name."""
        return self.transform

    @tranform.setter
    def tranform(self, value):
        self.transform = value

    def __getitem__(self, index):
        """Return ``(x, y)`` for sample ``index``, with ``x`` as a column of shape (steps, 1)."""
        # -1 infers the window length from the sample itself, so the dataset
        # works for any window size instead of being tied to a module constant.
        x = self.data_x[index].reshape(-1, 1)
        y = self.data_y[index]
        if self.transform is not None:
            x = self.transform(x)
        return x, y

    def __len__(self):
        """Return the number of samples."""
        return len(self.data_x)
def split_data(dataset, split_precent=0.8):
    """Cut a ``(x, y)`` pair into a leading train part and a trailing test part.

    Args:
        dataset: Two-item sequence ``(x, y)`` of equally indexed, sliceable
            containers.
        split_precent: Fraction of the samples (counted from the start) that
            goes into the training part.

    Returns:
        ``([train_x, train_y], [test_x, test_y])``.
    """
    features, targets = dataset
    cut = int(len(features) * split_precent)
    train_part = [features[:cut], targets[:cut]]
    test_part = [features[cut:], targets[cut:]]
    return train_part, test_part
# Chronological split: the leading SPLIT_PRECENT of the windows is used for
# training, the remainder is held out for testing.
train_data, test_data = split_data([dataset_x, dataset_y], split_precent=SPLIT_PRECENT)

# Training batches are drawn in random order.
train_loader = DataLoader(
    dataset=Mydataset(train_data),
    batch_size=BATCH_SIZE,
    shuffle=True,
)

# The held-out samples are served one at a time and in their original order,
# so collecting predictions sample by sample yields a curve that can be
# plotted against the real series as a time series.
test_loader = DataLoader(
    dataset=Mydataset(test_data),
    batch_size=1,
    shuffle=False,
)
- neural_network_modeling.py
import torch
import torch.nn as nn
class lstm(nn.Module):
    """Two-layer LSTM regressor that predicts from the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden units per LSTM layer and direction.
        output_size: Number of values predicted per sequence.
        seq_len: Nominal sequence length; only stored on the instance, the
            forward pass does not read it.
        is_bidir: Build a bidirectional LSTM (doubles the feature width fed
            to the linear head).
        dropout_p: Dropout probability, used between the stacked LSTM layers
            and on the features fed to the linear head.
    """

    def __init__(self, input_size=1, hidden_size=32, output_size=1,
                 seq_len=10, is_bidir=False, dropout_p=0):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.seq_len = seq_len
        self.is_bidir = is_bidir
        self.dropout_p = dropout_p

        # batch_first=True: inputs are [batch_size, seq_len, input_size].
        self.rnn = nn.LSTM(
            self.input_size,
            self.hidden_size,
            num_layers=2,
            batch_first=True,
            dropout=self.dropout_p,
            bidirectional=self.is_bidir,
        )
        # A bidirectional LSTM concatenates both directions on the last axis.
        self.fc_input_size = 2 * self.hidden_size if self.is_bidir else self.hidden_size
        self.dropout = nn.Dropout(p=self.dropout_p)
        self.linear = nn.Linear(self.fc_input_size, self.output_size)

    def forward(self, x):
        """Predict ``output_size`` values for each input sequence.

        Args:
            x: Tensor of shape ``[batch_size, seq_len, input_size]``.

        Returns:
            Tensor of shape ``[batch_size, output_size]``.
        """
        output, _ = self.rnn(x)
        # Only the final time step feeds the head, so slice before the linear
        # layer instead of projecting every step and discarding the rest.
        last_step = output[:, -1, :]
        # Regularize the features, not the regression output itself:
        # dropout after the head would randomly zero the predictions.
        last_step = self.dropout(last_step)
        return self.linear(last_step)
- train.py
import numpy as np
import tushare as ts
import torch
import torch.nn as nn
from generate_dataset import train_loader
import neural_network_modeling as mnn
import os
# Use the GPU whenever CUDA is available.
use_gpu = torch.cuda.is_available()

# Training and model hyper-parameters.
EPOCH = 200
INPUT_SIZE = 1
HIDDEN_SIEZ = 8
OUTPUT_SIZE = 1
SEQ_LEN = 10

# Run identifier that embeds the hyper-parameters; it names the checkpoint file.
METHOD = "lstm3_epoch%d_input%d_hidden%d_output%d_seqLen%d" % (
    EPOCH, INPUT_SIZE, HIDDEN_SIEZ, OUTPUT_SIZE, SEQ_LEN,
)

# Model and optimizer live at module level so other scripts can import them.
model = mnn.lstm(
    input_size=INPUT_SIZE,
    hidden_size=HIDDEN_SIEZ,
    output_size=OUTPUT_SIZE,
    seq_len=SEQ_LEN,
)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
if __name__ == "__main__":
    # Create the checkpoint directory up front: a missing ./models folder is
    # then handled before training starts rather than making torch.save fail
    # after the final epoch.
    os.makedirs('./models', exist_ok=True)

    loss_function = nn.MSELoss()
    if use_gpu:
        model = model.cuda()
        loss_function = loss_function.cuda()

    for epoch in range(EPOCH):
        # Sum of the per-batch losses over this epoch.
        total_loss = 0.0
        for data, label in train_loader:
            if use_gpu:
                data, label = data.cuda(), label.cuda()
            pred_y = model(data)
            # Add a trailing axis to the labels so they line up with the
            # model output's [batch, 1] layout.
            loss = loss_function(pred_y, label.unsqueeze(1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        # Report progress every 20 epochs.
        if (epoch + 1) % 20 == 0:
            print('Epoch: {}, Loss:{:.5f}'.format(epoch + 1, total_loss))

    torch.save(model.state_dict(), './models/%s.pth' % METHOD)
    print("save model successfully!")
- predict.py
import os
import torch
import numpy as np
import matplotlib.pyplot as plt
from train import model, METHOD
from generate_dataset import test_loader
use_gpu = torch.cuda.is_available()

# Evaluating without trained weights would only plot the output of a randomly
# initialized network, so a missing or incompatible checkpoint is a hard error.
checkpoint_path = './models/%s.pth' % METHOD
if not os.path.exists(checkpoint_path):
    raise FileNotFoundError(
        "checkpoint not found: %s (run train.py first)" % checkpoint_path
    )
try:
    # map_location='cpu' lets a checkpoint saved from a CUDA run load on a
    # CPU-only machine; the model is moved to the GPU afterwards if available.
    model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
except RuntimeError as e:
    raise RuntimeError(
        "Parameters Error: %s does not match the model" % checkpoint_path
    ) from e

if use_gpu:
    model = model.cuda()
model.eval()  # inference mode: disables dropout

preds = []
labels = []
# No gradients are needed for evaluation; skip building the autograd graph.
with torch.no_grad():
    for data, label in test_loader:
        if use_gpu:
            data, label = data.cuda(), label.cuda()
        pred_y = model(data)
        preds.extend(pred_y.squeeze(1).tolist())
        labels.extend(label.tolist())

print("preds:", preds[:5])
print("labels:", labels[:5])
mean_error = np.mean(np.abs(np.array(preds) - np.array(labels)))
print("mean_error: ", mean_error)

# NOTE: both curves follow the order in which test_loader yields samples;
# they only read as a time series if that loader does not shuffle.
plt.plot(preds, 'r', label='prediction')
plt.plot(labels, 'b', label='real')
plt.legend(loc='best')
plt.savefig('%s_result.png' % METHOD, format='png', dpi=200)
plt.close()
参考文献
[1] pytorch上手模板
[2] pytorch中LSTM的细节分析理解
[3] (PyTorch)使用 LSTM 预测时间序列(股票)