import csv
import math
import os
import random
import time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from keras.layers.core import Dense, Dropout
from keras.layers.recurrent import LSTM
from keras.losses import mean_squared_error
from keras.models import Sequential
from sklearn.metrics import explained_variance_score
from sklearn.metrics import mean_absolute_error # MAE
from sklearn.metrics import mean_squared_error
from sklearn.metrics import r2_score # R2
from sklearn.preprocessing import MinMaxScaler
from tensorflow.python.keras.layers import Dense
from tensorflow.python.keras.layers import Dropout
from tensorflow.python.keras.layers import GRU
from tensorflow.python.keras.layers import LSTM
from tensorflow.python.keras.models import Sequential
from scipy.stats import norm
# batch_size = 128
# epochs = 400
# Working directory containing the data file.
# NOTE(review): hard-coded absolute Windows path — the script only runs
# on a machine with this exact layout; consider making it configurable.
os.chdir(r'D:\项目\PSO_GRU_LSTM')
filename = 'fuzai.csv'  # data file name; NOTE(review): apparently unused — process_data is called with file1 below
steps = 12  # number of lag observations fed to the network per sample
def process_data(train, steps, split=2560):
    """Load the 'NH' series from a CSV, scale it to [0, 1], build sliding
    windows, and split them into train/test sets.

    Args:
        train: path to the CSV file (read with gbk encoding; NaNs -> 0).
        steps: number of lag observations per sample (window size).
        split: row index separating the training windows from the test
               windows. Defaults to 2560, preserving the original
               hard-coded behaviour.

    Returns:
        (X_train, y_train, X_test, y_test, scaler): the last column of
        each window is the target; ``scaler`` is the fitted MinMaxScaler,
        returned so predictions can later be inverse-transformed.
    """
    attr = 'NH'
    df1 = pd.read_csv(train, encoding='gbk').fillna(0)
    # Fit the scaler on the single target column, then flatten to 1-D.
    scaler = MinMaxScaler(feature_range=(0, 1)).fit(df1[attr].values.reshape(-1, 1))
    flow1 = scaler.transform(df1[attr].values.reshape(-1, 1)).reshape(1, -1)[0]
    # Sliding windows: each row holds `steps` inputs followed by 1 target.
    # (The original reused the `train` parameter name for this list.)
    windows = np.array([flow1[i - steps: i + 1] for i in range(steps, len(flow1))])
    X_train = windows[:split, :-1]
    y_train = windows[:split, -1]
    X_test = windows[split:, :-1]
    y_test = windows[split:, -1]
    return X_train, y_train, X_test, y_test, scaler
def model_test_score(model, X_test, y_test):
    """Evaluate *model* on the test set and print/return RMSE and MSE.

    Args:
        model: fitted model exposing ``predict``.
        X_test: model inputs, passed through to ``model.predict``.
        y_test: ground-truth targets (any shape; flattened internally).

    Returns:
        (rmse, mse) as floats.
    """
    y_hat = np.asarray(model.predict(X_test)).ravel()
    y_t = np.asarray(y_test).ravel()
    # Compute MSE once and derive RMSE from it (RMSE == sqrt(MSE));
    # the intermediate DataFrame of the original added nothing.
    temp_mse = float(np.mean((y_t - y_hat) ** 2))
    temp_rmse = math.sqrt(temp_mse)
    print('test RMSE: %.3f' % temp_rmse)
    print('test MSE: %.3f' % temp_mse)
    return temp_rmse, temp_mse
def writeOneCsv(relate_record, src):
    """Append one row to the CSV file at *src* (created if missing).

    Args:
        relate_record: iterable of values written as a single CSV row.
        src: path of the CSV file to append to.
    """
    # newline='' is the form the csv module documentation requires: it
    # disables newline translation so the writer's own \r\n terminator
    # is not mangled on any platform.
    with open(src, 'a', newline='') as csvFile:
        writer = csv.writer(csvFile)
        writer.writerow(relate_record)
file1 = 'fuzai.csv'
# Build scaled train/test windows from the series file.
X_train, y_train, X_test, y_test, scaler = process_data(file1, steps)
# Add the trailing feature axis expected by Keras recurrent layers,
# giving inputs of shape (samples, timesteps, 1) and targets (samples, 1).
X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1))
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1))
y_train = np.reshape(y_train, (y_train.shape[0], 1))
y_test = np.reshape(y_test, (y_test.shape[0], 1))
def build_model(neurons, dropout):
    """Build and compile the stacked GRU/LSTM regression network.

    Architecture: GRU -> Dropout -> LSTM -> Dropout -> LSTM -> Dropout
    -> Dense(1), all with ReLU activations; MSE loss, Adam optimizer.

    Args:
        neurons: number of units in each recurrent layer.
        dropout: dropout rate applied after every recurrent layer.

    Returns:
        A compiled Keras ``Sequential`` model expecting input of shape
        (steps, 1).
    """
    model = Sequential()
    # Only the first layer needs input_shape; Keras infers the shapes of
    # the following layers (the original repeated it redundantly).
    model.add(GRU(units=neurons, activation='relu', return_sequences=True,
                  input_shape=(steps, 1)))
    model.add(Dropout(dropout))
    model.add(LSTM(units=neurons, activation='relu', return_sequences=True))
    model.add(Dropout(dropout))
    model.add(LSTM(units=neurons, activation='relu'))
    model.add(Dropout(dropout))
    model.add(Dense(units=1, activation='relu'))
    # Loss is mean squared error, optimized with Adam.
    model.compile(loss='mean_squared_error', optimizer='adam')
    return model
result = 0  # module-level test-MSE; the PSO loop rebinds it when unpacking training()'s result


def training(X):
    """Train one candidate network described by particle position *X*.

    Args:
        X: sequence of 4 hyper-parameters:
           [neurons, dropout, batch_size, epochs].

    Returns:
        (le, pred, y_t, result):
            le: number of test-set predictions,
            pred: raw test-set predictions from the trained model,
            y_t: ground truth reshaped to (n, 1),
            result: test-set MSE.
        The order matches both call sites, which unpack
        ``le, pred, y_t, result = training(X[i])``.
    """
    neurons = int(X[0])
    dropout = round(X[1], 6)
    batch_size = int(X[2])
    epochs = int(X[3])
    model = build_model(neurons, dropout)
    model.fit(
        X_train,
        y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(X_test, y_test),
        verbose=1)
    _, result = model_test_score(model, X_test, y_test)
    # Persist each candidate so the best one can be reloaded later, e.g.:
    # model_lstm = load_model('neurons..._dropout..._batch_size...epochs....h5')
    model.save(
        'neurons' + str(int(X[0])) + '_dropout' + str(dropout) + '_batch_size' + str(batch_size) + 'epochs' + str(
            epochs) + '.h5')
    pred = model.predict(X_test)
    le = len(pred)
    y_t = y_test.reshape(-1, 1)
    # BUG FIX: the original returned (pred, le, y_t, result) while both
    # call sites unpack (le, pred, y_t, result), swapping the prediction
    # array with its length; return in the order the callers expect.
    return le, pred, y_t, result
def function(ps, test, le):
    """Fitness of a particle: mean absolute percentage error of the
    predictions *ps* against the ground truth *test*, averaged over
    *le* samples."""
    relative_errors = abs(test - ps) / test
    # Dividing the sum by le is identical to summing the per-sample
    # terms already divided by le.
    return sum(relative_errors) / le
# (1) PSO Parameters
MAX_EPISODES = 20  # maximum number of outer iterations
MAX_EP_STEPS = 20  # search steps per episode
Wmax = 0.9  # maximum inertia weight
Wmin = 0.4  # minimum inertia weight
c1 = 2  # cognitive (personal-best) learning factor
c1start = 2.5
c1end = 0.5
c2 = 2  # social (global-best) learning factor
c2start = 0.5
c2end = 2.5
# a=0.5  # std-dev of the normally distributed random perturbation term
# w =Wmax-(Wmax-Wmin)(n/n1)
pN = 9
# number of particles
# (2) LSTM Parameters
dim = 4  # search dimensionality: [neurons, dropout, batch_size, epochs]
X = np.zeros((pN, dim))  # particle positions
V = np.zeros((pN, dim))  # particle velocities
pbest = np.zeros((pN, dim))  # best position seen by each particle
gbest = np.zeros(dim)  # best position seen by the whole swarm
p_fit = np.zeros(pN)  # best historical fitness value of each particle
# print(p_fit.shape)
# print(p_fit.shape)
t1 = time.time()
'''
神经网络第一层神经元个数: 24-32
dropout比率: 0.03-0.19
batch_size: 64-128
'''
# Per-dimension search bounds: [neurons, dropout, batch_size, epochs].
# NOTE(review): these bounds (12-64 neurons, 32-128 batch) do not match
# the ranges quoted in the string above — confirm which set is intended.
UP = [64, 0.19, 128, 600]
DOWN = [12, 0.03, 32, 100]
# (4) 开始搜索
# (4) Run the PSO search over the hyper-parameter space.
for i_episode in range(MAX_EPISODES):
    """初始化s"""
    # NOTE(review): the seed is re-fixed at the start of every episode,
    # so each episode re-draws the same initial swarm — confirm intended.
    random.seed(8)
    fit = -1e5  # global best fitness value
    # Fitness of the initial particles.
    print("计算初始全局最优")
    for i in range(pN):
        for j in range(dim):
            V[i][j] = random.uniform(0, 1)
            if j == 1:
                # dimension 1 (dropout) is continuous; the others are integers
                X[i][j] = random.uniform(DOWN[j], UP[j])
            else:
                X[i][j] = round(random.randint(DOWN[j], UP[j]), 0, )
                a = X[i][j]
        pbest[i] = X[i]
        # NOTE(review): this unpacking expects training() to return
        # (le, pred, y_t, result) in that order — verify against training().
        le, pred, y_t, result = training(X[i])
        NN = 1
        tmp = function(pred, y_t, le)
        p_fit[i] = tmp
        # NOTE(review): '>' keeps the LARGER error as the best fitness;
        # for an error metric a minimising '<' would be expected — confirm.
        if tmp > fit:
            fit = tmp
            gbest = X[i]
    print("初始全局最优参数:{:}".format(gbest))
    fitness = []  # fitness value recorded after each search step
    for j in range(MAX_EP_STEPS):
        fit2 = []
        plt.title("第{}次迭代".format(i_episode))
        for i in range(pN):
            le, pred, y_t, result = training(X[i])
            temp = function(pred, y_t, le)
            fit2.append(temp / 1000)
            if temp > p_fit[i]:  # update personal best (see NOTE above on '>')
                p_fit[i] = temp
                pbest[i] = X[i]
                if p_fit[i] > fit:  # update global best
                    gbest = X[i]
                    fit = p_fit[i]
        print("搜索步数:{:}".format(j))
        print("个体最优参数:{:}".format(pbest))
        print("全局最优参数:{:}".format(gbest))
        # Velocity/position update for every particle.
        for i in range(pN):
            '''if f<=favg:
            w = int(Wmin + (Wmax - Wmin) * (f-fmin)/(favg-fmin))
            else:
            w=Wmax#动态权重'''
            # a=np.random.normal(0.0,1.0,None)
            a = 0.8
            b = np.random.randn(1) * 0.1  # small random perturbation added to the inertia weight
            print(b)
            # Linearly increasing inertia weight plus random perturbation.
            # NOTE(review): weight usually DECREASES from Wmax to Wmin over
            # the run; this increases from Wmin — confirm intended direction.
            w = Wmin + (Wmax - Wmin) * (i_episode / MAX_EPISODES) + b
            # w =Wmin+(Wmax-Wmin)*np.random.rand(1)+a*np.random.randn(1)  # random inertia weight
            print(str(w) + "result:" + str(result))
            # Time-varying (asynchronous) learning factors, clamped to the
            # midpoint once they leave their start/end interval.
            if c1end < c1 < c1start:
                c1 = c1start + (c1end - c1start) * (i_episode / MAX_EPISODES)
            else:
                c1 = (c1end + c1start) / 2
            if c2start < c2 < c2end:
                c2 = c2start + (c2end - c2start) * (i_episode / MAX_EPISODES)
            else:
                c2 = (c2start + c2end) / 2
            # c1 = c1start + (c1end - c1start) *(i_episode / MAX_EPISODES)  # asynchronous learning factor
            # c2= c2end + (c2end - c2start) * (i_episode / MAX_EPISODES)  # asynchronous learning factor
            # c1=c1*(1-i_episode/MAX_EPISODES)  # adaptive learning-factor adjustment
            # c2=c2*(i_episode/MAX_EPISODES)  # adaptive learning-factor adjustment
            print(c1, c2)
            # Standard PSO velocity update: inertia + cognitive + social terms.
            V[i] = w * V[i] + c1 * random.uniform(0, 1) * (pbest[i] - X[i]) + c2 * random.uniform(0, 1) * (gbest - X[i])
            # Freeze the particle (ww=0) if ANY dimension would leave its bounds.
            ww = 1
            for k in range(dim):
                if DOWN[k] < X[i][k] + V[i][k] < UP[k]:
                    continue
                else:
                    ww = 0
            X[i] = X[i] + V[i] * ww
        fitness.append(fit)
# Plot the fitness curve collected over the search steps.
# NOTE(review): the original source was truncated here ("plt.pl" followed
# by web pagination residue); reconstructed as a plot of `fitness`.
plt.plot(fitness)
plt.xlabel('step')
plt.ylabel('fitness')
plt.show()