# -*- coding: utf-8 -*-
# @Time : 2020/12/3 13:39
# @Author :
import pandas as pd
import torch
import torch.nn as nn
from sklearn.metrics import mean_squared_error
from torch import optim
import torch.nn.functional as F
import time
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import random
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
# writer = SummaryWriter()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'device is {device}')
class EncoderRNN(nn.Module):
def __init__(self, input_size, hidden_size, output_dim, n_layers, drop_prob=0):
super(EncoderRNN, self).__init__()
self.hidden_dim = hidden_size
self.n_layers = n_layers
self.gru = nn.GRU(input_size, hidden_size, n_layers, batch_first=True, dropout=drop_prob)
self.fc = nn.Linear(hidden_size, output_dim)
self.relu = nn.ReLU()
def forward(self, input, hidden=None):
        if hidden is None:
output, hidden = self.gru(input)
else:
output, hidden = self.gru(input, hidden)
# output = self.fc(self.relu(output[:, -1]))
output = self.fc(output[:, -1])
return output, hidden
def init_hidden(self, batch_size):
# weight = next(self.parameters()).data
hidden = torch.zeros(self.n_layers, batch_size, self.hidden_dim, device=device)
return hidden
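# Illustrative shape check (an added sketch, not part of the original script): one
# timestep pushed through the encoder the way the training loop below does it.
# Batch size and feature count are arbitrary example values.
def _demo_encoder_shapes():
    enc = EncoderRNN(input_size=5, hidden_size=256, output_dim=1, n_layers=2).to(device)
    step = torch.randn(32, 1, 5, device=device)  # (batch, seq_len=1, features)
    out, hidden = enc(step)                      # hidden defaults to zeros on the first call
    print(out.shape, hidden.shape)               # torch.Size([32, 1]) torch.Size([2, 32, 256])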
class DecoderRNN(nn.Module):
def __init__(self, input_size, hidden_size, output_size, n_layers, drop_prob=0):
super(DecoderRNN, self).__init__()
self.hidden_size = hidden_size
self.n_layers = n_layers
self.gru = nn.GRU(input_size, hidden_size, n_layers, batch_first=True, dropout=drop_prob)
self.out = nn.Linear(hidden_size, output_size)
self.relu = nn.ReLU()
def forward(self, input, hidden):
output, hidden = self.gru(input, hidden)
# output = self.out(self.relu(output[:, -1]))
output = self.out(output[:, -1])
return output, hidden
def init_hidden(self, batch_size):
# weight = next(self.parameters()).data
# hidden = weight.new(self.n_layers, batch_size, self.hidden_size).zero().to(device)
hidden = torch.zeros(self.n_layers, batch_size, self.hidden_size, device=device)
return hidden
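# Hedged sketch of the encoder -> decoder handoff that RNNTrain performs below:
# the encoder's final hidden state seeds the decoder, which is then stepped one
# timestep at a time. All sizes here are arbitrary example values.
def _demo_seq2seq_handoff():
    enc = EncoderRNN(5, 256, 1, n_layers=2).to(device)
    dec = DecoderRNN(5, 256, 1, n_layers=2).to(device)
    src = torch.randn(32, 10, 5, device=device)  # (batch, encoder steps, features)
    hidden = enc.init_hidden(batch_size=32)
    for ei in range(src.size(1)):                # encode one step at a time
        _, hidden = enc(src[:, ei, :].unsqueeze(1), hidden)
    step = torch.randn(32, 1, 5, device=device)  # one decoder input step
    pred, hidden = dec(step, hidden)             # pred: (32, 1)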
class AttnDecoderRNN(nn.Module):
def __init__(self, input_size, hidden_size, output_size, input_length, dropout_p=0.1):
super(AttnDecoderRNN, self).__init__()
self.hidden_size = hidden_size
self.output_size = output_size
self.dropout_p = dropout_p
self.attn = nn.Linear(self.hidden_size + input_size, input_length)
self.attn_combine = nn.Linear(self.hidden_size + input_size, self.hidden_size)
self.dropout = nn.Dropout(self.dropout_p)
self.gru = nn.GRU(self.hidden_size, self.hidden_size, batch_first=True)
self.out = nn.Linear(self.hidden_size, self.output_size)
def forward(self, input, hidden, encoder_outputs):
hidden = hidden.view(-1, 1, self.hidden_size)
attn_weights = F.softmax(
self.attn(torch.cat((input[:, 0], hidden[:, 0]), 1)), dim=1)
attn_applied = torch.bmm(attn_weights.unsqueeze(1),
encoder_outputs)
output = torch.cat((input[:, 0], attn_applied[:, 0]), 1)
# print(output.size())
output = self.attn_combine(output).unsqueeze(1)
# print(output.size())
output = F.relu(output)
hidden = hidden.view(1, -1, self.hidden_size)
output, hidden = self.gru(output, hidden)
output = self.out(output[:, -1])
return output, hidden, attn_weights
def init_hidden(self, batch_size):
        # this decoder's GRU is single-layer (the class defines no self.n_layers),
        # so a single layer of zeros is the correct initial hidden state
        hidden = torch.zeros(1, batch_size, self.hidden_size, device=device)
return hidden
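# Usage sketch for the attention decoder (added for illustration). It assumes
# encoder_outputs holds the encoder's per-step hidden states with shape
# (batch, input_length, hidden_size); random tensors stand in for them here.
def _demo_attn_decoder():
    input_length = 10
    attn_dec = AttnDecoderRNN(input_size=5, hidden_size=256, output_size=1,
                              input_length=input_length).to(device)
    step = torch.randn(32, 1, 5, device=device)           # one decoder input step
    hidden = torch.zeros(1, 32, 256, device=device)       # single-layer GRU hidden
    enc_outs = torch.randn(32, input_length, 256, device=device)
    pred, hidden, attn_w = attn_dec(step, hidden, enc_outs)
    print(pred.shape, attn_w.shape)                       # (32, 1), (32, 10)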
def scale(X: np.ndarray, y: np.ndarray):
x_sc = MinMaxScaler()
x_train_scaled = x_sc.fit_transform(X)
y_sc = MinMaxScaler()
y_train_scaled = y_sc.fit_transform(y.reshape(-1, 1))
return x_train_scaled, y_train_scaled, x_sc, y_sc
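# Usage sketch for scale(): keep the fitted scalers so predictions can be mapped
# back to the original units with y_sc.inverse_transform. The random arrays are
# placeholders for the real feature matrix and target.
def _demo_scale_roundtrip():
    X_raw = np.random.rand(100, 5)
    y_raw = np.random.rand(100)
    X_s, y_s, x_sc, y_sc = scale(X_raw, y_raw)
    y_back = y_sc.inverse_transform(y_s)         # undo the target scaling
    print(np.allclose(y_back.ravel(), y_raw))    # True up to float precision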
def lookback(x, y, days, period=24):
# define lookback period
inputs = np.zeros((int(x.shape[0] / period) - days, period * (days - 1), x.shape[1]))
labels = np.zeros((int(x.shape[0] / period) - days, period * 2))
decoder_inputs = np.zeros((int(x.shape[0] / period) - days, period * 2, x.shape[1]))
# print(inputs.shape, labels.shape)
    # step one period at a time; ending at x.shape[0] - period + 1 lets the final
    # window be generated instead of leaving its preallocated row all-zero
    for i in range(period * days, x.shape[0] - period + 1, period):
        """Shift the window forward one step per sample:
        sample 0 -> 0:90;
        sample 1 -> 1:91; ...
        """
# print(i, period, (i - period * 2) / 96, y[i:i + period].shape)
inputs[int((i - period * days) / period)] = x[i - period * days:i - period]
labels[int((i - period * days) / period)] = y[i - period:i + period].reshape(-1, )
decoder_inputs[int((i - period * days) / period)] = x[i - period:i + period]
# print(i)
inputs = inputs.reshape(-1, period * (days - 1), x.shape[1])
labels = labels.reshape(-1, period * 2)
decoder_inputs = decoder_inputs.reshape(-1, period * 2, x.shape[1])
# print(x.shape, y.shape)
# print(inputs.shape, labels.shape)
return inputs, decoder_inputs, labels
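# Shape sketch for lookback() (illustrative values): with days=4 and period=24
# each sample gets period*(days-1)=72 encoder steps, plus a 48-step (two-period)
# decoder horizon for both the covariates and the labels.
def _demo_lookback_shapes():
    x = np.random.rand(24 * 10, 5)               # ten "days" of hourly features
    y = np.random.rand(24 * 10)
    inputs, dec_inputs, labels = lookback(x, y, days=4, period=24)
    print(inputs.shape, dec_inputs.shape, labels.shape)
    # (6, 72, 5) (6, 48, 5) (6, 48)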
def RNNTrain(train_loader, learning_rate=0.001, hidden_dim=256, EPOCHS=10,
             n_layers=2, teacher_forcing_ratio=0.5):
    # n_layers and teacher_forcing_ratio were read from undefined globals in the
    # source; they are exposed here as parameters with assumed defaults
input_dim = next(iter(train_loader))[0].shape[2]
output_dim = 1
encoder = EncoderRNN(input_dim, hidden_dim, output_dim, n_layers).to(device)
decoder = DecoderRNN(input_dim, hidden_dim, output_dim, n_layers, drop_prob=0).to(device)
print('starting training of {} model'.format('RNN'))
encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=learning_rate)
criterion = nn.MSELoss()
encoder.train()
decoder.train()
epoch_times = []
for epoch in range(1, EPOCHS + 1):
start_time = time.process_time()
avg_loss = 0
counter = 0
# encoder_hidden = encoder.init_hidden(batch_size)
for input_tensor, decoder_inputs, target_tensor in train_loader:
input_tensor = input_tensor.to(device).float()
decoder_inputs = decoder_inputs.to(device).float()
target_tensor = target_tensor.to(device).float()
# encoder_hidden = encoder_hidden.data
encoder.zero_grad()
decoder.zero_grad()
loss = 0
counter += 1
input_length = input_tensor.size(1)
target_length = target_tensor.size(1)
for ei in range(input_length):
if ei == 0:
encoder_output, encoder_hidden = encoder(input_tensor[:, ei, :].unsqueeze(1))
else:
encoder_output, encoder_hidden = encoder(input_tensor[:, ei, :].unsqueeze(1), encoder_hidden)
# encoder_output, encoder_hidden = encoder(input_tensor)
# print(f'input_tensor,{input_tensor},\n encoder_hidden,{encoder_hidden}')
decoder_hidden = encoder_hidden
            use_teacher_forcing = random.random() < teacher_forcing_ratio
# with torch.autograd.set_detect_anomaly(True):
            if use_teacher_forcing:
                # teacher forcing: feed the known future covariates step by step
                for di in range(target_length):
                    decoder_input = decoder_inputs[:, di, :].unsqueeze(1)
                    decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
                    loss += criterion(decoder_output, target_tensor[:, di].unsqueeze(1))
            else:
                # without teacher forcing: feed the previous prediction back in
                # (this path assumes input_size == 1; with multivariate inputs the
                # known covariates above are the only well-shaped decoder input)
                decoder_input = decoder_inputs[:, 0, :].unsqueeze(1)
                for di in range(target_length):
                    decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
                    loss += criterion(decoder_output, target_tensor[:, di].unsqueeze(1))
                    decoder_input = decoder_output.detach().unsqueeze(1)
            # the source file is cut off here; standard backprop and per-epoch
            # bookkeeping are reconstructed below
            loss.backward()
            encoder_optimizer.step()
            decoder_optimizer.step()
            avg_loss += loss.item() / target_length
            if counter % 100 == 0:
                print(f'Epoch {epoch}... Step: {counter}/{len(train_loader)}... '
                      f'Avg Loss for Epoch: {avg_loss / counter:.6f}')
        current_time = time.process_time()
        print(f'Epoch {epoch}/{EPOCHS} Done, Total Loss: {avg_loss / len(train_loader):.6f}')
        epoch_times.append(current_time - start_time)
    print(f'Total Training Time: {sum(epoch_times)} seconds')
    return encoder, decoder
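# End-to-end wiring sketch (assumed usage, since the script is truncated above;
# the file name, column split, and batch size are all placeholders): scale the
# raw data, window it, wrap it in the imported TensorDataset/DataLoader, train.
def _demo_pipeline():
    df = pd.read_csv('data.csv')                 # placeholder file name
    X, y = df.iloc[:, :-1].values, df.iloc[:, -1].values
    X_s, y_s, x_sc, y_sc = scale(X, y)
    inputs, dec_inputs, labels = lookback(X_s, y_s, days=4, period=24)
    train_data = TensorDataset(torch.from_numpy(inputs),
                               torch.from_numpy(dec_inputs),
                               torch.from_numpy(labels))
    train_loader = DataLoader(train_data, batch_size=32, shuffle=True, drop_last=True)
    # keep teacher forcing on: the inputs here are multivariate, so feeding
    # predictions back (the non-teacher-forced path) would not match input_size
    encoder, decoder = RNNTrain(train_loader, EPOCHS=10, teacher_forcing_ratio=1.0)
    return encoder, decoder, y_sc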