# pip install openpyxl -i https://pypi.tuna.tsinghua.edu.cn/simple/
# pip install optuna -i https://pypi.tuna.tsinghua.edu.cn/simple/
import copy
import math
import random
import warnings
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from tqdm import tqdm
import torch
from torch import nn
import torch.nn.functional as F
import torch.utils.data as Data
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns

warnings.filterwarnings("ignore")
plt.rcParams['font.sans-serif'] = ['SimHei']  # render Chinese labels correctly
plt.rcParams['axes.unicode_minus'] = False    # render the minus sign correctly
from read_data import date_pro

# Fix the random seeds so that results are reproducible
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)      # for GPU training
torch.cuda.manual_seed_all(SEED)  # for multi-GPU training
from torch.backends import cudnn
cudnn.benchmark = False
cudnn.deterministic = True

# Use 30 days of data (all factors plus log_ret over those 30 days) to predict the next day's log_ret
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_x, target = date_pro()
print(train_x.shape)
print(target.shape)
# (118, 4, 7)
# (118,)
class DataSet(Data.Dataset):
    def __init__(self, data_inputs, data_targets):
        self.inputs = torch.FloatTensor(data_inputs)
        self.label = torch.FloatTensor(data_targets)

    def __getitem__(self, index):
        return self.inputs[index], self.label[index]

    def __len__(self):
        return len(self.inputs)
dataset = DataSet(train_x, target)
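# Quick sanity check (optional): given the shapes printed above, each item
# should pair a (4, 7) float input with a scalar float target.
sample_x, sample_y = dataset[0]
print(sample_x.shape, sample_y.shape)  # torch.Size([4, 7]) torch.Size([])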
# Split the data into training and testing sets and create data loaders.
# shuffle=False keeps the time series in chronological order; random_state is
# ignored when shuffle=False, so it is omitted.
train_data, test_data = train_test_split(dataset, test_size=0.2, shuffle=False)
batch_size = 32
TrainDataLoader = Data.DataLoader(train_data, batch_size=batch_size, shuffle=False)
TestDataLoader = Data.DataLoader(test_data, batch_size=batch_size, shuffle=False)
print("Number of batches in TestDataLoader:", len(TestDataLoader))
print("Number of batches in TrainDataLoader:", len(TrainDataLoader))
class ChannelAttention(nn.Module):
    """Channel attention: squeeze the spatial dims, then re-weight each channel."""
    def __init__(self, channels, reduction=1):
        super(ChannelAttention, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Conv2d(channels, channels // reduction, kernel_size=1, stride=1, padding=0)
        self.relu = nn.ReLU(inplace=True)
        self.fc2 = nn.Conv2d(channels // reduction, channels, kernel_size=1, stride=1, padding=0)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        out = self.avg_pool(x)   # (B, C, H, W) -> (B, C, 1, 1)
        out = self.fc1(out)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.sigmoid(out)  # per-channel weights in (0, 1)
        out = x * out            # broadcast multiply re-weights the channels
        return out
class SpatialAttention(nn.Module):
    """Spatial attention: pool across channels, then re-weight each position."""
    def __init__(self):
        super(SpatialAttention, self).__init__()
        self.conv = nn.Conv2d(2, 1, kernel_size=7, stride=1, padding=3)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        avg_out = torch.mean(x, dim=1, keepdim=True)    # (B, 1, H, W)
        max_out, _ = torch.max(x, dim=1, keepdim=True)  # (B, 1, H, W)
        out = torch.cat([avg_out, max_out], dim=1)      # (B, 2, H, W)
        out = self.conv(out)
        out = self.sigmoid(out)  # per-position weights in (0, 1)
        out = x * out
        return out
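# Both attention blocks only re-weight their input, so shapes are preserved.
# A minimal check (kept commented out so the extra module/tensor creation
# does not disturb the seeded RNG stream):
# _t = torch.randn(2, 1, 5, 8)
# assert SpatialAttention()(ChannelAttention(1)(_t)).shape == _t.shape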
class cnn_cbam_lstm(nn.Module):
    def __init__(self, channels=1, reduction=1):
        super(cnn_cbam_lstm, self).__init__()
        self.channel_att = ChannelAttention(channels, reduction)
        self.spatial_att = SpatialAttention()
        # With input (B, 1, 4, 7), kernel (2, 2) and padding 1 give (B, 1, 5, 8),
        # matching the LSTM input_size of 8 and the Linear(5, 1) head below
        self.conv = nn.Conv2d(1, 1, kernel_size=(2, 2), stride=1, padding=1)
        # Without batch_first=True the LSTM expects (seq_len, batch, input_size)
        self.lstm = nn.LSTM(8, 8, num_layers=1, bidirectional=False)
        self.liner1 = nn.Linear(8, 1)
        self.liner2 = nn.Linear(5, 1)
        self.relu = F.relu

    def forward(self, x):
        x = x.unsqueeze(1)           # (B, 4, 7)    -> (B, 1, 4, 7)
        out = self.conv(x)           # (B, 1, 4, 7) -> (B, 1, 5, 8)
        out = self.channel_att(out)  # CBAM: channel attention first,
        out = self.spatial_att(out)  # then spatial attention
        out = out.squeeze(1)         # (B, 5, 8)
        out = out.transpose(0, 1)    # (5, B, 8) for the LSTM
        out, _ = self.lstm(out)
        out = out.transpose(0, 1)    # back to (B, 5, 8)
        out = self.liner2(self.relu(self.liner1(out).squeeze(2)))  # (B, 1)
        return out
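# Shape trace for a dummy batch (a sketch, commented out to keep the seeded
# RNG stream untouched): (2, 4, 7) -> conv (2, 1, 5, 8) -> LSTM over 5 steps
# -> (2, 1).
# _m = cnn_cbam_lstm()
# print(_m(torch.randn(2, 4, 7)).shape)  # torch.Size([2, 1])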
from metra import metric

# Evaluate the model on the test set; returns the mean test loss plus metrics
def eval_test(model):
    test_epoch_loss = []
    pre_list = []
    test_list = []
    with torch.no_grad():
        for step, (test_x, test_y) in enumerate(TestDataLoader):
            test_x = test_x.to(device)
            test_y = test_y.to(device)
            y_pre = model(test_x)  # (B, 1)
            pre_list.extend(p.item() for p in y_pre.squeeze(1))
            test_list.extend(t.item() for t in test_y)
            # MSE on matching shapes; targets stay float (the original .long()
            # cast would truncate fractional log returns to zero)
            test_loss = loss_function(y_pre.squeeze(1), test_y)
            test_epoch_loss.append(test_loss.item())
    print(len(pre_list), len(test_list))
    # print(pre_list, test_list)  # uncomment to dump raw predictions/targets
    mae, mse, rmse, mape, mspe, R2 = metric(np.array(pre_list), np.array(test_list))
    return np.mean(test_epoch_loss), mae, mse, rmse, mape, mspe, R2
epochs = 100  # 20-50
model = cnn_cbam_lstm().to(device)
loss_function = torch.nn.MSELoss().to(device)  # mean squared error regression loss
optimizer = torch.optim.Adagrad(model.parameters(), lr=0.01)
print(model)
sum_train_epoch_loss = []  # training loss for each epoch
sum_test_epoch_loss = []   # test loss for each epoch
best_test_loss = float('inf')
r2_list = []
sum_mtrie = []
for epoch in range(epochs):
    epoch_loss = []
    for step, (train_x, train_y) in enumerate(TrainDataLoader):
        train_x = train_x.to(device)
        train_y = train_y.to(device)
        y_pred = model(train_x)  # (B, 1)
        single_loss = loss_function(y_pred.squeeze(1), train_y)
        optimizer.zero_grad()   # clear gradients left over from the last step
        single_loss.backward()  # backpropagate to compute gradients
        optimizer.step()        # apply the optimizer update to the weights
        epoch_loss.append(single_loss.item())
    train_epoch_loss = np.mean(epoch_loss)
    test_epoch_loss, mae, mse, rmse, mape, mspe, R2 = eval_test(model)  # mean test loss and metrics
    sum_mtrie.append([mae, mse, rmse, mape, mspe, R2, test_epoch_loss])
    r2_list.append(R2)
    print(test_epoch_loss, best_test_loss)
    if test_epoch_loss < best_test_loss:
        best_test_loss = test_epoch_loss
        print("best_test_loss", best_test_loss)
        # deepcopy, otherwise best_model would keep tracking the live (still-training) model
        best_model = copy.deepcopy(model)
    sum_train_epoch_loss.append(train_epoch_loss)
    sum_test_epoch_loss.append(test_epoch_loss)
    print(f"epoch: {epoch} train_epoch_loss: {train_epoch_loss} test_epoch_loss: {test_epoch_loss}")
mixed_df = pd.DataFrame(sum_mtrie, columns=['mae', 'mse', 'rmse', 'mape', 'mspe', 'R2', 'test_epoch_loss'])
mixed_df.to_excel(r'sum_mtrie.xlsx')
torch.save(best_model, 'best_model23.pth')
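# Reloading the saved model for later inference (a sketch; torch.save above
# pickles the whole module, and newer PyTorch versions may require
# weights_only=False in torch.load):
# loaded = torch.load('best_model23.pth', map_location=device)
# loaded.eval()
# with torch.no_grad():
#     batch_x, _ = next(iter(TestDataLoader))
#     preds = loaded(batch_x.to(device))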
# Plotting
x1 = [i for i in range(0, len(r2_list), 1)]  # epoch index for the x-axis
y1 = r2_list                                 # per-epoch R2 for the y-axis
colors1 = '#00CED4'  # point color
colors2 = '#DC143C'
area = np.pi * 4 ** 1  # point area
# Scatter plot
plt.scatter(x1, y1, s=area, c=colors1, alpha=0.4, label='r2')
plt.legend()
plt.show()