import pandas as pd
import matplotlib.pyplot as plt
import torch.nn as nn
import torch
import time
import numpy as np
import random
# Load the 3-variable load dataset (the filename means "load - 3 variables").
data = pd.read_csv("负荷-3变量.csv")
# data.plot()
# plt.show()
# 3 input variables -> predict 3 variables: three linear output heads, one loss
# per variable, combined into a single training objective.
# 1. Build the dataset as [batch_size, sequence length, 3 features per step].
# Goal: forecast the next 48 hours.
data = data.iloc[:, [1, 2, 3]].values  # keep numeric columns only; presumably 456 x 3 — TODO confirm row count
p = 48  # number of future hours to predict
train_data = data[:-p]   # all but the last 48 hours
test_data = data[-p:]    # held-out final 48 hours
# Min-max scale with statistics computed on the TRAINING split only
# (prevents leakage of test-set information into the scaler).
min_data = np.min(train_data, axis=0)
max_data = np.max(train_data, axis=0)
train_data_scaler = (train_data - min_data) / (max_data - min_data)
def get_x_y(data, step=12):
    """Build sliding-window samples from a 2-D array.

    Each sample is a pair ``[x, y]`` where ``x`` is the window of
    ``step`` consecutive rows (shape ``step x 3``) and ``y`` is the
    single row that immediately follows it (shape ``1 x 3``).
    """
    return [
        [data[start:start + step, :], np.expand_dims(data[start + step, :], 0)]
        for start in range(len(data) - step)
    ]
def get_mini_batch(data, batch_size=16):
    """Yield full mini-batches from a list of ``[x, y]`` sample pairs.

    Each yield is ``(x, y)`` with ``x`` of shape
    ``(batch_size, step, 3)`` and ``y`` of shape ``(batch_size, 1, 3)``.
    A trailing *partial* batch is intentionally dropped, because the
    model's initial hidden state is sized for a fixed batch size.
    """
    # BUGFIX: the original used range(0, len(data) - batch_size, batch_size),
    # whose exclusive upper bound also discarded the final FULL batch
    # (e.g. 32 samples / batch 16 produced only one batch). The +1 keeps it.
    for i in range(0, len(data) - batch_size + 1, batch_size):
        samples = data[i:i + batch_size]
        x = np.asarray([sample[0] for sample in samples])
        y = np.asarray([sample[1] for sample in samples])
        yield x, y
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class LSTM(nn.Module):  # note: nn.Module (capital M) is required
    """Shared LSTM backbone with three linear heads, one per output variable."""

    def __init__(self, hidden_size, num_layers, output_size, batch_size):
        super().__init__()
        self.hidden_size = hidden_size    # hidden units per layer, e.g. 100
        self.num_layers = num_layers      # stacked layers, typically 2
        self.output_size = output_size    # time steps predicted per head
        self.num_directions = 1           # unidirectional LSTM
        self.input_size = 3               # three variables per time step
        self.batch_size = batch_size
        state_shape = (self.num_directions * self.num_layers,
                       self.batch_size, self.hidden_size)
        # NOTE(review): the initial (h0, c0) are fixed *random* tensors reused on
        # every forward pass; zeros would be more conventional — kept as-is so
        # behavior is unchanged.
        self.hidden_cell = (torch.randn(*state_shape).to(device),
                            torch.randn(*state_shape).to(device))
        self.lstm = nn.LSTM(self.input_size, self.hidden_size,
                            self.num_layers, batch_first=True).to(device)
        self.fc1 = nn.Linear(self.hidden_size, self.output_size).to(device)
        self.fc2 = nn.Linear(self.hidden_size, self.output_size).to(device)
        self.fc3 = nn.Linear(self.hidden_size, self.output_size).to(device)

    def forward(self, input):
        """Return predictions of shape (batch, output_size, 3): one slice per head."""
        features, _ = self.lstm(torch.FloatTensor(input).to(device), self.hidden_cell)
        # take each head's output at the last time step only
        last_steps = [head(features)[:, -1, :]
                      for head in (self.fc1, self.fc2, self.fc3)]
        return torch.stack(last_steps, dim=2)
if __name__ == '__main__':
    time_step = 12  # use the past 12 hours to predict the 13th hour
    train_x_y = get_x_y(train_data_scaler, step=time_step)
    random.shuffle(train_x_y)
    batch_size = 24
    hidden_size = 100
    num_layers = 2
    output_size = 1  # each head predicts a single time step
    model = LSTM(hidden_size, num_layers, output_size, batch_size).to(device)
    # FIX: `reduce=True, size_average=True` are long-deprecated legacy arguments;
    # reduction='mean' is their exact modern equivalent.
    loss_function = nn.MSELoss(reduction='mean').to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    print(model)
    epochs = 200
    loss_list = []
    for i in range(epochs):
        start = time.time()
        loss_all = 0
        num = 0
        for seq_batch, label_batch in get_mini_batch(train_x_y, batch_size):
            optimizer.zero_grad()
            y_pred = model(seq_batch)
            # average one MSE term per output variable
            loss = 0
            for j in range(3):
                loss += loss_function(y_pred[:, :, j],
                                      torch.FloatTensor(label_batch[:, :, j]).to(device))
            loss /= 3
            loss.backward()   # accumulate gradients
            optimizer.step()  # apply the parameter update
            loss_all += loss.item()
            num += 1
        # track the mean batch loss for this epoch
        loss_all /= num
        loss_list.append(loss_all)
        print(f'epoch:{i:3} loss:{loss_all:10.8f} time:{time.time() - start:6}')
    plt.plot(list(range(len(loss_list))), loss_list, '-')
    plt.show()
    model.eval()
    with torch.no_grad():
        # inference runs one sequence at a time: reset (h0, c0) for batch size 1
        model.hidden_cell = (torch.zeros(1 * num_layers, 1, hidden_size).to(device),
                             torch.zeros(1 * num_layers, 1, hidden_size).to(device))
        test_pred = np.zeros(shape=(p, 3))
        # recursive multi-step forecast: each scaled prediction is appended to the
        # history and fed back in as part of the next input window
        for i in range(len(test_data)):
            x = train_data_scaler[-time_step:, :]
            x1 = np.expand_dims(x, 0)
            test_y_pred_scalar = np.expand_dims(model(x1).cpu().squeeze().numpy(), 0)  # in [0, 1]
            train_data_scaler = np.append(train_data_scaler, test_y_pred_scalar, axis=0)
            y = test_y_pred_scalar * (max_data - min_data) + min_data  # undo min-max scaling
            test_pred[i, :] = y
    x_in = list(range(len(test_pred)))
    plt.plot(x_in, test_data[:, 0], 'ro-')
    plt.plot(x_in, test_data[:, 1], 'r*-')
    plt.plot(x_in, test_data[:, 2], 'r.-')
    plt.plot(x_in, test_pred[:, 0], 'bo-')
    plt.plot(x_in, test_pred[:, 1], 'b*-')
    plt.plot(x_in, test_pred[:, 2], 'b.-')
    plt.legend(["true1", "true2", "true3", "pred1", "pred2", "pred3"])
    plt.show()
# 评论0  (scraped page artifact "Comments: 0" — not code; commented out, as a bare name it raised NameError)