pytorch實現LSTM對股票的預測
參考:https://blog.csdn.net/mary19831/article/details/129570030
輸入:前十天的收盤價 x: [batch_size, seq_len, input_size] = [bs, 10, 1]
輸出:下一天的收盤價 y: [batch_size, 1]
原博主給的代碼輸入特征是10維的,也就是認為 seq_len = 1, input_size = 10。
我這里做了修改,以便輸入多維的特征(比如前10天的 收盤價,成交量... )
結果
左邊是所有數據的預測結果(中間有條綠色的分割線 左邊是訓練數據 右邊是測試數據的輸出),右圖是僅在測試集上預測的結果,可以看出對于測試集的預測,該模型是滯后的。

代碼
import matplotlib.pyplot as plt
import numpy as np
import tushare as ts
import pandas as pd
import torch
from torch import nn
import datetime
from torch.utils.data import DataLoader, Dataset
import time
# Select the compute device: prefer CUDA when a GPU is available, else CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
class LSTM_Regression(nn.Module):
    """LSTM-based regressor.

    Maps a batch of sequences shaped (batch, seq_len, input_size) to a
    prediction shaped (batch, output_size), using only the hidden state of
    the final time step.
    """

    def __init__(self, input_size, hidden_size, output_size=1, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, _x):
        # LSTM output: (batch, seq_len, input_size) -> (batch, seq_len, hidden_size)
        out, _ = self.lstm(_x)
        # Keep only the last time step, then project to the output size.
        last_step = out[:, -1, :]
        return self.fc(last_step)
def create_dataset(data, days_for_train=5):
    """Build a supervised sliding-window dataset from a sequence.

    Each input sample is a window of ``days_for_train`` consecutive values,
    and its target is the single value immediately following the window
    (i.e. use ``days_for_train`` days to predict the next day).

    For a sequence of length ``d`` this yields ``d - days_for_train``
    input/output pairs.  (The original docstring claimed
    ``d - days_for_train + 1``, which is off by one: the last window must
    still leave one element to serve as its target.)

    Args:
        data: Indexable sequence (list or ndarray) of observations.
        days_for_train: Window length used as model input.

    Returns:
        Tuple ``(x, y)`` of numpy arrays: ``x`` has one window per row,
        ``y`` holds the corresponding next-step targets.
    """
    dataset_x, dataset_y = [], []
    for i in range(len(data) - days_for_train):
        dataset_x.append(data[i:i + days_for_train])
        dataset_y.append(data[i + days_for_train])
    return np.array(dataset_x), np.array(dataset_y)
if __name__ == '__main__':
    days_for_train = 10
    batch_size = 1
    # Set to True to (re)train and save 'model_params.pkl'; False only
    # evaluates a previously saved checkpoint.  (The original kept the
    # training loop commented out; a flag makes the choice explicit.)
    do_train = False

    t0 = time.time()
    # Download the Shanghai Composite index and keep the closing prices.
    data_close = ts.get_k_data('000001', start='2019-01-01', index=True)['close']
    data_close.to_csv('000001.csv', index=False)  # cache the download as CSV
    data_close = pd.read_csv('000001.csv')        # re-read from the cache
    data_close = data_close.astype('float32').values  # -> float32 ndarray

    # Min-max normalize prices into [0, 1].
    max_value = np.max(data_close)
    min_value = np.min(data_close)
    data_close = (data_close - min_value) / (max_value - min_value)

    # Build (window, next-day) pairs and split 70% / 30% into train / test.
    dataset_x, dataset_y = create_dataset(data_close, days_for_train)
    train_size = int(len(dataset_x) * 0.7)
    train_x = dataset_x[:train_size]
    train_y = dataset_y[:train_size]
    test_x = dataset_x[train_size:]
    test_y = dataset_y[train_size:]

    # Convert to torch tensors on the selected device.
    train_x = torch.from_numpy(train_x).to(device)
    train_y = torch.from_numpy(train_y).to(device)
    test_x = torch.from_numpy(test_x).to(device)
    test_y = torch.from_numpy(test_y).to(device)
    print("train_x.shape: ", train_x.shape)
    print("train_y.shape: ", train_y.shape)
    print("test_x.shape: ", test_x.shape)
    print("test_y.shape: ", test_y.shape)

    # Model, loss, and optimizer.
    model = LSTM_Regression(input_size=1, hidden_size=8, output_size=1, num_layers=2)
    model = model.to(device)
    train_loss = []
    loss_function = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-5,
                                 betas=(0.9, 0.999), eps=1e-08, weight_decay=0)

    if do_train:
        for i in range(60):
            for b in range(0, len(train_x), batch_size):
                batch_x = train_x[b:b + batch_size]
                batch_y = train_y[b:b + batch_size]
                outputs = model(batch_x)  # forward pass
                loss = loss_function(outputs, batch_y)
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()
                train_loss.append(loss.item())
            # Log the last batch loss of this epoch to file and stdout.
            with open('log.txt', 'a+') as f:
                f.write('{} - {}\n'.format(i + 1, loss.item()))
            if (i + 1) % 1 == 0:
                print('Epoch: {}, Loss:{:.5f}'.format(i + 1, loss.item()))

        # Plot the training-loss curve.
        plt.figure()
        plt.plot(train_loss, 'b', label='loss')
        plt.title("Train_Loss_Curve")
        plt.ylabel('train_loss')
        plt.xlabel('epoch_num')
        plt.savefig('loss.png', format='png', dpi=200)
        plt.close()

        torch.save(model.state_dict(), 'model_params.pkl')  # save weights for reuse
        t1 = time.time()
        T = t1 - t0
        print('The training time took %.2f' % (T / 60) + ' mins.')

    # ---- Evaluation on the test split ----
    model = model.eval()
    # map_location ensures a checkpoint saved on GPU also loads on CPU-only
    # hosts (torch.load would otherwise fail trying to restore CUDA tensors).
    model.load_state_dict(torch.load('model_params.pkl', map_location=device))

    pred_test = model(test_x)
    pred_test = pred_test.view(-1).data.cpu().numpy()
    test_y = test_y.data.cpu().numpy()
    assert len(pred_test) == len(test_y)
    plt.plot(pred_test, 'r', label='prediction')
    plt.plot(test_y, 'b', label='real')
    plt.legend(loc='best')
    plt.savefig('test.png', format='png', dpi=200)
    plt.close()

    # ---- Prediction over the full series (train + test) ----
    dataset_x = dataset_x.reshape(-1, days_for_train, 1)
    dataset_x = torch.from_numpy(dataset_x).to(device)
    pred_all = model(dataset_x)
    pred_all = pred_all.view(-1).data.cpu().numpy()
    # Left-pad with zeros so the prediction curve aligns with the raw series.
    pred_all = np.concatenate((np.zeros(days_for_train), pred_all))
    assert len(pred_all) == len(data_close)
    plt.plot(pred_all, 'r', label='prediction')
    plt.plot(data_close, 'b', label='real')
    # Green dashed split line: training data on the left, test on the right.
    plt.plot((train_size, train_size), (0, 1), 'g--')
    plt.legend(loc='best')
    plt.savefig('result.png', format='png', dpi=200)
    plt.close()
浙公網安備 33010602011771號