# pip install openpyxl -i https://pypi.tuna.tsinghua.edu.cn/simple/
# pip install optuna -i https://pypi.tuna.tsinghua.edu.cn/simple/
import numpy as np
import pandas as pd
from tqdm import tqdm
import torch
from torch import nn
import torch.nn.functional as F
from torch import tensor
import torch.utils.data as Data
import math
from matplotlib import pyplot
from datetime import datetime, timedelta
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
import math
import warnings
import random

from torch.backends import cudnn

# Silence all warnings for the rest of the run.
warnings.filterwarnings("ignore")

# Matplotlib: use the SimHei font so Chinese labels render, and make the
# minus sign display correctly.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})

# Fix every random seed so experiment results can be reproduced.
SEED = 1234
for _seed_fn in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed,      # single-GPU training
    torch.cuda.manual_seed_all,  # multi-GPU training
):
    _seed_fn(SEED)
del _seed_fn

# Turn off the cuDNN benchmark mode and request deterministic kernels.
cudnn.deterministic = True
cudnn.benchmark = False

# Task: predict the next value of the series from a fixed-length window of
# the values that precede it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Load the spreadsheet and keep only the 'value' column as a 1-D array.
data = pd.read_excel('energy_statistics_hour_sum.xlsx')
print(data.head(10))
data = data['value'].values

# Number of consecutive past samples used to predict the next one.
# NOTE: the LSTM defined in this file hard-codes its input size to 10, so
# changing this value requires changing the network as well.
WINDOW_SIZE = 10

# Fail early with a clear message instead of an IndexError at the reshape
# below when the series cannot produce a single (window, target) pair.
if len(data) <= WINDOW_SIZE:
    raise ValueError(
        f"need more than {WINDOW_SIZE} samples to build training windows, "
        f"got {len(data)}"
    )

# Sliding window: sample i is data[i : i + WINDOW_SIZE], its target is the
# value that immediately follows the window.
sample_count = len(data) - WINDOW_SIZE
data_x = [data[i:i + WINDOW_SIZE] for i in range(sample_count)]
data_y = [data[i + WINDOW_SIZE] for i in range(sample_count)]
print(len(data_x), len(data_y))

data_x = np.array(data_x)
data_y = np.array(data_y)
# Insert a singleton middle axis: (samples, WINDOW_SIZE) -> (samples, 1, WINDOW_SIZE).
data_x = data_x.reshape(data_x.shape[0], 1, data_x.shape[1])
print(data_x.shape)
print(data_y.shape)
class DataSet(Data.Dataset):
    """Map-style dataset pairing each input sample with its target.

    Both arguments are converted to ``torch.FloatTensor`` once, at
    construction time; indexing returns ``(inputs[index], label[index])``.

    Args:
        data_inputs: Array-like of input samples; the first axis indexes samples.
        data_targets: Array-like of targets; the first axis indexes samples.

    Raises:
        ValueError: If inputs and targets do not contain the same number of
            samples.
    """

    def __init__(self, data_inputs, data_targets):
        self.inputs = torch.FloatTensor(data_inputs)
        self.label = torch.FloatTensor(data_targets)
        # __len__ is driven by the inputs only, so a shorter label tensor
        # would otherwise surface as an IndexError in the middle of an epoch.
        if len(self.inputs) != len(self.label):
            raise ValueError(
                "inputs and targets must have the same number of samples, "
                f"got {len(self.inputs)} and {len(self.label)}"
            )

    def __getitem__(self, index):
        """Return the ``(input, target)`` pair stored at ``index``."""
        return self.inputs[index], self.label[index]

    def __len__(self):
        """Return the number of samples."""
        return len(self.inputs)
# Wrap the windowed arrays so they can be indexed sample by sample.
dataset = DataSet(data_x, data_y)

from sklearn.model_selection import train_test_split

# Shuffled 80/20 split with a fixed random_state.
train_data, test_data = train_test_split(
    dataset,
    test_size=0.2,
    shuffle=True,
    random_state=42,
)

Batch_Size = 128
# Both loaders iterate their subset in a fixed order (shuffle=False).
TrainDataLoader, TestDataLoader = (
    Data.DataLoader(subset, batch_size=Batch_Size, shuffle=False)
    for subset in (train_data, test_data)
)
print("TestDataLoader 的batch个数", len(TestDataLoader))
print("TrainDataLoader 的batch个数", len(TrainDataLoader))
class lstm(nn.Module):
    """Single-layer unidirectional LSTM followed by a two-layer linear head.

    The LSTM is sequence-first (``batch_first`` is left at its default), with
    input size 10 and hidden size 10. The head maps 10 -> 512 -> 1 with a ReLU
    in between.
    """

    def __init__(self,):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=10,
            hidden_size=10,
            num_layers=1,
            bidirectional=False,
        )
        self.linear_1 = nn.Linear(in_features=10, out_features=512)
        self.linear_2 = nn.Linear(in_features=512, out_features=1)
        self.relu = F.relu

    def forward(self, src):
        """Run the network on ``src``.

        For the 3-D input used in this script, axes 0 and 1 are swapped
        before the LSTM and swapped back after it, so the result keeps the
        leading two axes of ``src`` and has a trailing axis of size 1.
        """
        # The LSTM expects the sequence axis first.
        sequence_first = src.transpose(0, 1)
        lstm_out, _ = self.lstm(sequence_first)
        # Restore the caller's axis order.
        restored = lstm_out.transpose(0, 1)
        hidden = self.relu(self.linear_1(restored))
        # NOTE(review): for 3-D input axis 2 has size 512 here, so this
        # squeeze leaves the tensor unchanged; kept to preserve behaviour.
        hidden = hidden.squeeze(2)
        return self.linear_2(hidden)
# Build the network, then move its parameters to the selected device.
model = lstm()
model = model.to(device)
def test_main(model, loader=None, loss_fn=None):
    """Return the mean per-batch loss of ``model`` over an evaluation loader.

    Args:
        model: Network to evaluate. It is switched to eval mode for the
            duration of the call and its previous train/eval mode is
            restored before returning.
        loader: Iterable of ``(inputs, targets)`` tensor batches. Defaults to
            the module-level ``TestDataLoader``.
        loss_fn: Callable ``(prediction, target) -> scalar tensor``. Defaults
            to the module-level ``criterion``.

    Returns:
        The mean of the per-batch loss values (``nan`` if the loader yields
        no batches).
    """
    if loader is None:
        loader = TestDataLoader
    if loss_fn is None:
        loss_fn = criterion

    # Run on whatever device the model lives on; fall back to the
    # module-level device for a parameter-free model.
    first_param = next(model.parameters(), None)
    target_device = device if first_param is None else first_param.device

    was_training = model.training
    model.eval()
    val_epoch_loss = []
    try:
        with torch.no_grad():
            for inputs, targets in loader:
                inputs = inputs.to(target_device).float()
                targets = targets.to(target_device).float()
                outputs = model(inputs)
                # Flatten both sides so the loss compares prediction i with
                # target i. Without this, a (B, 1, 1) prediction and a (B,)
                # target are broadcast against each other and every
                # prediction is compared with every target.
                loss = loss_fn(outputs.reshape(-1), targets.reshape(-1))
                val_epoch_loss.append(loss.item())
    finally:
        # Restore the caller's mode so an enclosing training loop keeps
        # running in training mode.
        model.train(was_training)
    return np.mean(val_epoch_loss)
# ---- Training ----
epochs = 100
optimizer = torch.optim.Adamax(model.parameters(), lr=0.01)
criterion = torch.nn.MSELoss().to(device)
val_loss = []    # validation loss, one entry per epoch
train_loss = []  # mean training loss, one entry per epoch
# Start from +inf so the first finite validation loss always counts as an
# improvement, whatever the scale of the data.
best_test_loss = float("inf")
# NOTE: `best_model` is only an alias of `model`, not a snapshot; the best
# weights are the ones written to disk below at the moment of improvement.
best_model = model
for epoch in tqdm(range(epochs)):
    model.train()
    train_epoch_loss = []
    for inputs, targets in TrainDataLoader:
        inputs = inputs.to(device).float()
        targets = targets.to(device).float()
        # Clear gradients left over from the previous batch; otherwise
        # backward() keeps accumulating into them.
        optimizer.zero_grad()
        outputs = model(inputs)
        # Flatten both sides so prediction i is compared with target i
        # instead of being broadcast against every target in the batch.
        loss = criterion(outputs.reshape(-1), targets.reshape(-1))
        print("loss:", loss)
        loss.backward()
        optimizer.step()
        train_epoch_loss.append(loss.item())
    train_loss.append(np.mean(train_epoch_loss))
    val_epoch_loss = test_main(model)
    val_loss.append(val_epoch_loss)
    print("epoch:", epoch, "train_epoch_loss:", np.mean(train_epoch_loss), "val_epoch_loss:", val_epoch_loss)
    # Keep the checkpoint with the lowest validation loss seen so far.
    if val_epoch_loss < best_test_loss:
        best_test_loss = val_epoch_loss
        best_model = model
        print("best_test_loss -------------------------------------------------", best_test_loss)
        torch.save(best_model.state_dict(), 'best_Transformer_trainModel.pth')
# Plot the per-epoch training and validation loss as a scatter chart.
fig = plt.figure(facecolor='white', figsize=(10, 7))
plt.xlabel('X')
plt.ylabel('Y')

# Axis ranges: x spans the recorded epochs, y runs from 0 to the largest loss.
plt.xlim(0, len(val_loss))
plt.ylim(0, max(map(max, (train_loss, val_loss))))

# Epoch indices on the x axis, loss values on the y axis.
x1 = list(range(len(train_loss)))
y1 = val_loss
x2 = list(range(len(train_loss)))
y2 = train_loss

colors1 = '#00CED4'  # marker colour for the validation loss
colors2 = '#DC143C'  # marker colour for the training loss
area = 4 * np.pi     # marker area

plt.scatter(x1, y1, label='val_loss', c=colors1, s=area, alpha=0.4)
plt.scatter(x2, y2, label='train_loss', c=colors2, s=area, alpha=0.4)
plt.legend()
plt.savefig('lstm_loss图.png')
plt.show()
# ---- Prediction on the held-out split ----
# NOTE(review): this evaluates `model` as it currently is in memory; no
# checkpoint is loaded here. Load the saved state dict first if the best
# checkpoint is what should be evaluated.
model.to(device)
model.eval()
# Evaluation uses model.eval() together with torch.no_grad().
y_pred = []
y_true = []
with torch.no_grad():
    for inputs, targets in TestDataLoader:
        inputs = inputs.to(device).float()
        targets = targets.to(device).float()
        # Flatten to one scalar per sample so predictions and targets line up.
        outputs = model(inputs).reshape(-1).cpu().tolist()
        targets = targets.reshape(-1).cpu().tolist()
        print("outputs", outputs)
        print("targets", targets)
        y_pred.extend(outputs)
        y_true.extend(targets)

# y_pred holds the model's own outputs. The previous code replaced them with
# the true values plus random noise, which made the plot and any metric
# computed from y_pred meaningless.
print("y_pred", y_pred)
print("y_true", y_true)

# Line chart of true values against predictions, on its own figure so it
# does not draw onto the loss chart when show() does not block.
len_ = list(range(len(y_true)))
plt.figure()
plt.xlabel('标签', fontsize=8)
plt.ylabel('值', fontsize=8)
plt.plot(len_, y_true, color="blue", label='y_true')
plt.plot(len_, y_pred, color="yellow", label='y_pred')
plt.title("真实值预测值画图")
plt.legend()
plt.savefig('lstm_真实值预测值画图.png')
plt.show()
from metra import metric
mae, mse, rmse, mape, mspe = metric(np.array(y_true), np.array(y_pred))
print('mae, mse, rm