import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.svm import SVC,SVR
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
from data_process import date_pro
from sklearn.model_selection import train_test_split
# Reproducibility: seed every random number generator the script relies on so
# that repeated runs give the same results.
import random
from torch.backends import cudnn

SEED = 1234
for _seed_fn in (
    random.seed,                 # Python's built-in generator
    np.random.seed,              # NumPy's global generator
    torch.manual_seed,           # PyTorch default generator
    torch.cuda.manual_seed,      # PyTorch generator of the current GPU
    torch.cuda.manual_seed_all,  # PyTorch generators of all GPUs
):
    _seed_fn(SEED)

# Turn off cuDNN benchmarking and request deterministic cuDNN kernels.
cudnn.benchmark = False
cudnn.deterministic = True
# Data preparation: load the samples and the 'score' wear labels of the data
# sets c1, c4 and c6 through the project helper `date_pro`, stack them, flatten
# every sample into one feature vector and build the train / test loaders.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
c1_train_x, c1_target = date_pro("c1/c1/", 'c1_wear_label.csv', 'score', end_id=200)
c4_train_x, c4_target = date_pro("c4/c4/", 'c4_wear_label.csv', 'score', end_id=200)
c6_train_x, c6_target = date_pro("c6/c6/", 'c6_wear_label.csv', 'score', end_id=200)

# Stack the three data sets along the sample axis, then collapse all remaining
# axes so that each row is a single flat feature vector.
train_x = np.concatenate((c1_train_x, c4_train_x, c6_train_x), axis=0)
train_x = train_x.reshape(train_x.shape[0], -1)
target = [*c1_target, *c4_target, *c6_target]
print(train_x.shape)
print(len(target))
num_features = train_x.shape[1]

# Ordered hold-out split: with shuffle=False the leading rows form the training
# set and the trailing ~20% the test set (random_state has no effect without
# shuffling). Each returned array is converted to a float32 tensor.
X_train, X_test, y_train, y_test = (
    torch.tensor(part, dtype=torch.float32)
    for part in train_test_split(
        np.array(train_x),
        np.array(target),
        test_size=0.2,
        random_state=42,
        shuffle=False,
    )
)

# Data loaders: only the training loader shuffles its batches.
batch_size = 8
train_dataset, test_dataset = TensorDataset(X_train, y_train), TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# 2. Model definitions
# CNN feature extractor
class SimpleCNN(nn.Module):
    """Single-layer 1-D CNN that maps a flat feature vector to ``output_dim`` values.

    The input vector is treated as a one-channel sequence: a length-preserving
    convolution (kernel 3, padding 1) is followed by ReLU, a max-pool that
    halves the length (rounding down) and one fully connected layer.

    Args:
        input_dim: Length of every input vector; must be at least 2 so that
            the pooling layer has something to reduce.
        output_dim: Number of values produced per sample.
        num_filters: Number of convolution output channels (default 16).

    Raises:
        ValueError: If ``input_dim`` is smaller than 2.
    """

    def __init__(self, input_dim, output_dim, num_filters=16):
        super().__init__()
        if input_dim < 2:
            raise ValueError(f"input_dim must be >= 2, got {input_dim}")
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=num_filters, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2, padding=0)
        # Pooling halves the sequence length (floor division for odd lengths).
        self.fc1 = nn.Linear(num_filters * (input_dim // 2), output_dim)

    def forward(self, x):
        """Return a ``(batch, output_dim)`` tensor for a ``(batch, input_dim)`` input."""
        x = x.unsqueeze(1)  # add the channel axis expected by Conv1d
        x = self.pool(F.relu(self.conv1(x)))
        # Flatten channels and positions per sample; keeping the batch axis
        # explicit avoids hard-coding the channel count here.
        x = x.view(x.size(0), -1)
        return self.fc1(x)
# "RIME" feature-expansion layer
class RIME(nn.Module):
    """Fully connected layer followed by ReLU and dropout.

    NOTE(review): despite its name, this class contains no optimisation or
    search procedure; it only defines a dense projection from ``input_dim``
    to ``embedding_dim`` features.

    Args:
        input_dim: Number of input features per sample.
        embedding_dim: Number of output features per sample.
        dropout: Dropout probability applied in training mode (default 0.5).
    """

    def __init__(self, input_dim, embedding_dim, dropout=0.5):
        super().__init__()
        self.fc = nn.Linear(input_dim, embedding_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        """Return ``dropout(relu(fc(x)))``; dropout is a no-op in eval mode."""
        return self.dropout(F.relu(self.fc(x)))
# SVM stage
class SVMClassifier:
    """Standardise the features and fit a support vector regressor on them.

    NOTE: the class keeps its historical name, but the wrapped estimator is
    ``SVR`` (regression), so ``predict`` returns continuous values rather
    than class labels.
    """

    def __init__(self):
        self.scaler = StandardScaler()
        self.svm = SVR()

    def fit(self, X, y):
        """Fit the scaler on ``X`` and then the SVR on the scaled ``X`` and ``y``."""
        X = self.scaler.fit_transform(X)
        self.svm.fit(X, y)

    def predict(self, X):
        """Scale ``X`` with the already fitted scaler and return the SVR predictions."""
        X = self.scaler.transform(X)
        return self.svm.predict(X)
# 3. Model training
# Train the CNN
def train_cnn(cnn_model, train_loader, num_epochs=50):
    """Train ``cnn_model`` in place with Adam (lr=0.001) and an MSE loss.

    After every epoch the sample-weighted mean loss is printed.

    Args:
        cnn_model: Module to optimise; it is switched to training mode.
        train_loader: Iterable of ``(inputs, labels)`` batches exposing a
            ``dataset`` attribute whose length is the number of samples.
        num_epochs: Number of passes over ``train_loader``.

    Raises:
        RuntimeError: If a label batch cannot be reshaped to the shape of the
            model output (different number of elements).
    """
    criterion = nn.MSELoss()
    optimizer = optim.Adam(cnn_model.parameters(), lr=0.001)
    cnn_model.train()
    num_samples = len(train_loader.dataset)
    for epoch in range(num_epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = cnn_model(inputs)
            # A (batch, 1) output against (batch,) labels would be broadcast by
            # MSELoss to a (batch, batch) error matrix and yield a wrong loss;
            # bring the labels to the output shape first.
            if labels.shape != outputs.shape:
                labels = labels.reshape(outputs.shape)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # loss is a batch mean, so weight it by the batch size.
            running_loss += loss.item() * inputs.size(0)
        epoch_loss = running_loss / num_samples
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss}')
# Build both networks, then train the CNN. The constructors draw their initial
# weights from the torch generator seeded at the top of the script, so the
# order of these statements influences the resulting weights; keep it as is.
cnn_model = SimpleCNN(input_dim=num_features, output_dim=1)
# The RIME layer receives the CNN's single output value per sample and expands
# it to 300 features. NOTE(review): nothing in the visible script trains this
# layer; it is only applied with its initial weights.
rime_model = RIME(input_dim=1, embedding_dim=300)
train_cnn(cnn_model, train_loader)
# Extract CNN features
def extract_features(cnn_model, data_loader):
    """Run ``cnn_model`` over every batch of ``data_loader`` and stack the outputs.

    The model is switched to eval mode (and left there) and gradients are
    disabled. Rows of the result follow the iteration order of
    ``data_loader``; pass a non-shuffling loader when the rows must stay
    aligned with an externally stored label array.

    Args:
        cnn_model: Module applied to the inputs of each batch.
        data_loader: Iterable of ``(inputs, labels)`` batches; labels are ignored.

    Returns:
        The per-batch outputs concatenated along dimension 0.
    """
    cnn_model.eval()
    with torch.no_grad():
        all_features = [cnn_model(inputs) for inputs, _ in data_loader]
    return torch.cat(all_features, dim=0)
# Extract the CNN features of both splits. The training features are read
# through a non-shuffling loader: `train_loader` reshuffles on every pass, so
# using it here would return the rows in a random order that no longer matches
# `y_train` when the SVM is fitted further down.
ordered_train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
X_train_features = extract_features(cnn_model, ordered_train_loader)
X_test_features = extract_features(cnn_model, test_loader)
# Pass features through the RIME layer
def optimize_features(rime_model, features):
    """Apply ``rime_model`` to ``features`` in inference mode.

    The model is switched to eval mode (and left there), which disables any
    dropout it contains, and the forward pass runs without gradient tracking.

    Args:
        rime_model: Module to apply.
        features: Input tensor accepted by ``rime_model``.

    Returns:
        The output of ``rime_model(features)``, detached from autograd.
    """
    rime_model.eval()
    with torch.no_grad():
        return rime_model(features)
# Pass the CNN features of both splits through the RIME layer.
X_train_optimized, X_test_optimized = (
    optimize_features(rime_model, cnn_features)
    for cnn_features in (X_train_features, X_test_features)
)

# Fit the SVM stage on the projected training features and their targets.
svm_classifier = SVMClassifier()
svm_classifier.fit(X_train_optimized.numpy(), y_train.numpy())

# 4. Prediction and evaluation
y_pred = svm_classifier.predict(X_test_optimized.numpy())

# The error metrics come from the project helper `metra.metric`; besides the
# five scores it returns two more values that are plotted below
# (presumably the ground-truth and predicted series - confirm in `metra`).
from metra import metric
mae, mse, rmse, mape, mspe, true, pred = metric(np.array(y_test), np.array(y_pred))
print('mae, mse, rmse, mape, mspe')
print(mae, mse, rmse, mape, mspe)
y_test_np, y_test_pred_np = true, pred
# Absolute error between the two series returned by the metric helper.
errors = np.abs(y_test_np - y_test_pred_np)

# Plot the true values, the predictions and the absolute error on one figure.
plt.figure(figsize=(12, 8))
for series, label, color, linestyle, marker in (
    (y_test_np, '真实值', 'blue', '-', 'o'),        # expected output
    (y_test_pred_np, '预测值', 'red', '-', 'x'),    # predicted output
    (errors, '误差', 'green', '--', 's'),           # absolute error
):
    plt.plot(series, label=label, color=color, linestyle=linestyle, marker=marker, markersize=3)
plt.xlabel('样本')
plt.ylabel('函数输出')
plt.title('RIME-CNN-SVM 网络输出预测结果')
plt.legend()
plt.grid(True)
plt.show()
一枚爱吃大蒜的程序员
- 粉丝: 1w+
- 资源: 99
最新资源
- 数据来源于Kaggle,文件名为 cwurData.csv -预测大学得分
- 【java毕业设计】大学生户外运动管理系统源码(完整前后端+说明文档+LW).zip
- 编译原理-LR(1)语法分析器-C语言实验
- Web前端大作业 在线电影主题网站10页 HTML+CSS 带设计说明报告
- Web应用渗透测试信息收集器.zip
- 操作系统-模拟进程调度(时间片轮转调度算法,高优先级调度算法)C语言实现-实验报告
- C#ASP.NET教务系统框架源码 ASP.NET Extjs框架源码数据库 SQL2008源码类型 WebForm
- 操作系统-模拟进程的调度(时间片轮转算法,高优先级调度算法)C实现
- WEB 渗透测试.zip
- Web 渗透工具集.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈