zoukankan      html  css  js  c++  java
  • lstm torch

    使用 LSTM 网络进行股票预测(PyTorch 实现)

    1. generate_dataset.py
    import tushare as ts
    import numpy as np
    import torch
    import torch.nn as nn
    from torch.utils.data import Dataset, DataLoader
    # Data acquisition and preprocessing.
    # Closing prices of the Shanghai Composite Index from 2018-01-01 onward,
    # taken from the 'close' column as an np.ndarray and cast to float32.
    data_close = (
        ts.get_k_data('000001', start='2018-01-01', index=True)['close']
        .values
        .astype('float32')
    )

    # Min-max scale the prices into the range 0~1.
    min_value = data_close.min()
    max_value = data_close.max()
    data_close = (data_close - min_value) / (max_value - min_value)

    DAYS_FOR_TRAIN = 10   # length of each input window passed to create_dataset
    BATCH_SIZE = 12       # batch size of the training DataLoader
    SPLIT_PRECENT = 0.7   # leading fraction of the samples used for training
    
    def create_dataset(data, days_for_train=5) -> (np.array, np.array):
        """Turn a 1-D series into sliding-window samples.

        Sample ``i`` pairs the window ``data[i : i + days_for_train]`` with the
        value that immediately follows it, ``data[i + days_for_train]``.

        Args:
            data: Indexable, sliceable sequence of values.
            days_for_train: Number of consecutive values in each input window.

        Returns:
            A tuple ``(windows, targets)`` of NumPy arrays built from the
            collected windows and their following values.
        """
        sample_count = len(data) - days_for_train
        windows = [data[start:start + days_for_train] for start in range(sample_count)]
        targets = [data[start + days_for_train] for start in range(sample_count)]
        return (np.array(windows), np.array(targets))
    
    # Build the (window, next value) samples from the normalized closing prices.
    dataset_x, dataset_y = create_dataset(data_close, DAYS_FOR_TRAIN)
    
    class Mydataset(Dataset):
        """Map-style dataset over a pair of sample arrays.

        Args:
            data: Two-element sequence ``[x, y]``; ``x[i]`` is one input window
                and ``y[i]`` is the target that belongs to it.
            transform: Optional callable applied to each reshaped window before
                it is returned.
        """

        def __init__(self, data, transform=None):
            self.data_x = data[0]
            self.data_y = data[1]
            # The attribute keeps its original (misspelled) name so that any
            # existing code reading ``.tranform`` continues to work.
            self.tranform = transform

        def __getitem__(self, index):
            """Return ``(x, y)`` for sample ``index``; ``x`` has shape (window_length, 1)."""
            # -1 infers the window length from the sample itself, so the dataset
            # works for any window size rather than only a module-level constant.
            x = self.data_x[index].reshape(-1, 1)
            y = self.data_y[index]
            if self.tranform is not None:
                x = self.tranform(x)
            return x, y

        def __len__(self):
            """Return the number of samples."""
            return len(self.data_x)
    
    def split_data(dataset, split_precent=0.8):
        """Split paired samples into a leading and a trailing part.

        Args:
            dataset: Two-element sequence ``[x, y]`` of equally indexed samples.
            split_precent: Fraction of the samples (counted from the start) that
                goes into the first part; the product is truncated to an int.

        Returns:
            ``([train_x, train_y], [test_x, test_y])`` where the train part holds
            the first ``int(len(x) * split_precent)`` samples and the test part
            holds the rest.
        """
        features, targets = dataset
        cut = int(len(features) * split_precent)
        leading = [features[:cut], targets[:cut]]
        trailing = [features[cut:], targets[cut:]]
        return leading, trailing
    
    train_data, test_data = split_data([dataset_x, dataset_y], split_precent=SPLIT_PRECENT)

    # Training batches are drawn in a fresh random order every epoch.
    train_loader = DataLoader(dataset=Mydataset(train_data),
                              batch_size=BATCH_SIZE,
                              shuffle=True)
    # Evaluation samples are served one at a time in their original
    # (chronological) order, so predictions collected from this loader can be
    # plotted as a time series.
    test_loader = DataLoader(dataset=Mydataset(test_data),
                             batch_size=1,
                             shuffle=False)
    
    2. neural_network_modeling.py
    import torch
    import torch.nn as nn
    class lstm(nn.Module):
        """Two-layer LSTM with a linear head that predicts from the last time step.

        Args:
            input_size: Number of features per time step.
            hidden_size: Number of hidden units per LSTM direction.
            output_size: Size of the prediction vector for each sequence.
            seq_len: Stored on the instance; not used by ``forward``.
            is_bidir: Whether the LSTM is bidirectional (doubles the width of
                the features fed to the linear head).
            dropout_p: Dropout probability used between the LSTM layers and on
                the features fed to the linear head.
        """

        def __init__(self, input_size=1, hidden_size=32, output_size=1, seq_len=10, is_bidir=False, dropout_p=0):
            super().__init__()
            self.input_size = input_size
            self.hidden_size = hidden_size
            self.output_size = output_size
            self.seq_len = seq_len
            self.is_bidir = is_bidir
            self.dropout_p = dropout_p
            # batch_first=True: inputs are [batch_size, seq_len, input_size].
            self.rnn = nn.LSTM(self.input_size, self.hidden_size,
                               num_layers=2,
                               batch_first=True,
                               dropout=self.dropout_p,
                               bidirectional=self.is_bidir)
            self.fc_input_size = 2 * self.hidden_size if self.is_bidir else self.hidden_size
            self.dropout = nn.Dropout(p=self.dropout_p)
            self.linear = nn.Linear(self.fc_input_size, self.output_size)

        def forward(self, x):
            """Map ``x`` of shape [batch_size, seq_len, input_size] to [batch_size, output_size]."""
            output, _ = self.rnn(x)
            # Only the final time step feeds the head, so select it first
            # instead of projecting every step and discarding all but one.
            last_step = output[:, -1, :]
            # Dropout regularizes the features, not the prediction itself:
            # applied after the head it would randomly zero/scale the output.
            last_step = self.dropout(last_step)
            return self.linear(last_step)
    
    3. train.py
    import numpy as np
    import tushare as ts
    import torch
    import torch.nn as nn
    from generate_dataset import train_loader
    import neural_network_modeling as mnn
    import os
    # True when torch reports an available CUDA device.
    use_gpu = torch.cuda.is_available()
    
    # Training configuration.
    EPOCH = 200        # number of passes over train_loader
    INPUT_SIZE = 1     # passed to mnn.lstm as input_size
    HIDDEN_SIEZ = 8    # passed to mnn.lstm as hidden_size (name spelled as in the original)
    OUTPUT_SIZE = 1    # passed to mnn.lstm as output_size
    SEQ_LEN = 10       # passed to mnn.lstm as seq_len
    
    # Run identifier built from the configuration; used below as the stem of
    # the saved checkpoint file name.
    METHOD = "lstm3_epoch%d_input%d_hidden%d_output%d_seqLen%d" %(EPOCH, INPUT_SIZE, HIDDEN_SIEZ, OUTPUT_SIZE, SEQ_LEN)
    
    model = mnn.lstm(INPUT_SIZE, HIDDEN_SIEZ, OUTPUT_SIZE, SEQ_LEN)
    
    # Adam over all model parameters with learning rate 1e-3.
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    if __name__ == "__main__":
        # Create the checkpoint directory up front so the final torch.save
        # cannot fail on a missing folder after the whole training run.
        os.makedirs('./models', exist_ok=True)

        loss_function = nn.MSELoss()
        if use_gpu:
            model = model.cuda()
            loss_function = loss_function.cuda()

        for i in range(EPOCH):
            # Sum of the per-batch losses over this epoch.
            total_loss = 0.0
            for data, label in train_loader:
                if use_gpu:
                    data, label = data.cuda(), label.cuda()
                pred_y = model(data)
                # unsqueeze(1) gives the labels a trailing dimension before
                # they are compared with the predictions.
                loss = loss_function(pred_y, label.unsqueeze(1))
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            # Report progress every 20 epochs.
            if (i+1) % 20 == 0:
                print('Epoch: {}, Loss:{:.5f}'.format(i+1, total_loss))

        torch.save(model.state_dict(),'./models/%s.pth' % METHOD)
        print("save model successfully!")
    
    4. predict.py
    import os
    import torch
    import numpy as np 
    import matplotlib.pyplot as plt
    from train import model, METHOD
    from generate_dataset import test_loader
    
    use_gpu = torch.cuda.is_available()
    checkpoint_path = './models/%s.pth' % METHOD
    if os.path.exists(checkpoint_path):
        try:
            # Load onto the CPU first so a checkpoint saved from a GPU run can
            # still be read on a CPU-only machine; the model is moved to the
            # GPU afterwards when one is available.
            model.load_state_dict(torch.load(checkpoint_path, map_location='cpu'))
        except Exception as e:
            print(e)
            print("Parameters Error")
    else:
        # Without a checkpoint the model keeps its freshly initialized weights.
        print("Warning: checkpoint %s not found, evaluating an untrained model" % checkpoint_path)
    if use_gpu:
        model = model.cuda()
    model.eval()  # evaluation mode (disables dropout)
    preds = []
    labels = []
    # Gradients are not needed for inference.
    with torch.no_grad():
        for data, label in test_loader:
            if use_gpu:
                data, label = data.cuda(), label.cuda()
            pred_y = model(data)
            preds.extend(pred_y.squeeze(1).tolist())
            labels.extend(label.tolist())

    print("preds:", preds[:5])
    print("labels:", labels[:5])
    # Mean absolute error between predictions and labels.
    mean_error = np.mean(np.abs(np.array(preds) - np.array(labels)))
    print("mean_error: ", mean_error)
    plt.plot(preds, 'r', label='prediction')
    plt.plot(labels, 'b', label='real')
    # plt.plot((train_size, train_size), (0, 1), 'g--')  # divider: training data on the left, test output on the right
    plt.legend(loc='best')
    plt.savefig('%s_result.png' % METHOD, format='png', dpi=200)
    plt.close()
    

    参考文献
    [1] pytorch上手模板
    [2] pytorch中LSTM的细节分析理解
    [3] (PyTorch)使用 LSTM 预测时间序列(股票)

  • 相关阅读:
    element 步骤条steps 点击事件
    element-ui的rules中正则表达式
    从master分支创建自己的分支
    2.1 系统调用io实现原理
    2-3形参和实参
    2-2函数
    2-1.编译和链接
    linux高编信号-------setitimer()、getitimer()
    linux高编IO-------有限状态机编程原理(mycpy)
    linux高编线程-------线程同步-条件变量
  • 原文地址:https://www.cnblogs.com/lixyuan/p/12801286.html
Copyright © 2011-2022 走看看