import pandas as pd
import matplotlib.pyplot as plt
from pandas import read_excel
import numpy as np
from pandas import DataFrame
from pandas import concat
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential
from keras.layers import LSTM,Dense
from numpy import concatenate
from sklearn.metrics import mean_squared_error
from math import sqrt
from matplotlib.font_manager import FontProperties  # font manager
import os
os.environ["PATH"] += os.pathsep + 'C:/Program Files (x86)/Graphviz2.38/bin/'
from keras.utils import plot_model
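# NOTE: the Graphviz PATH tweak above and plot_model are only needed for the
# model-visualization call that is commented out near the end of this script.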
# set up the Chinese font used for plot titles and axis labels
font = FontProperties(fname=r"c:\windows\fonts\simsun.ttc", size=15)
def str_to_float(s):
    # strip the trailing '%' sign, then convert the remainder to float
    s = s[:-1]
    s_float = float(s)
    return s_float
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    """
    Frame a time series as a supervised learning dataset.
    Arguments:
        data: Sequence of observations as a list or NumPy array.
        n_in: Number of lag observations as input (X).
        n_out: Number of observations as output (y).
        dropnan: Boolean whether or not to drop rows with NaN values.
    Returns:
        Pandas DataFrame of series framed for supervised learning.
    """
    n_vars = 1 if type(data) is list else data.shape[1]
    df = DataFrame(data)
    cols, names = list(), list()
    # input sequence (t-n, ... t-1)
    for i in range(n_in, 0, -1):
        cols.append(df.shift(i))
        names += [('var%d(t-%d)' % (j+1, i)) for j in range(n_vars)]
    # forecast sequence (t, t+1, ... t+n)
    for i in range(0, n_out):
        cols.append(df.shift(-i))
        if i == 0:
            names += [('var%d(t)' % (j+1)) for j in range(n_vars)]
        else:
            names += [('var%d(t+%d)' % (j+1, i)) for j in range(n_vars)]
    # put it all together
    agg = concat(cols, axis=1)
    agg.columns = names
    # drop rows with NaN values
    if dropnan:
        agg.dropna(inplace=True)
    return agg
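# Example: with n_in=1, n_out=1 and a 3-column array, series_to_supervised returns
# columns var1(t-1), var2(t-1), var3(t-1), var1(t), var2(t), var3(t); the first row
# (which contains NaN from the shift) is dropped when dropnan=True.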
dataset1=read_excel('stock_data/603993.SH.xlsx',index_col=0)
dataset1.index.name='日期'
dataset1.drop('交易日期', axis=1, inplace=True)
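# 换手率 (turnover rate) is assumed to be stored as a percentage string such as '1.23%',
# so str_to_float strips the trailing '%' and keeps the numeric part.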
dataset1['换手率']=dataset1['换手率'].apply(str_to_float)
dataset1['日收益率']=dataset1['收盘价'].pct_change()
#dataset1=dataset1['日收益率'].dropna()
#testdata1=dataset1.loc['2016-02-15':'2017-12-29']
#traindata1=pd.concat([dataset1.loc['2013-01-04':'2016-02-15'],dataset1.loc['2017-12-29':'2019-3-14']],axis=0)
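# pct_change leaves a NaN in the first row of 日收益率 (daily return), so back-fill it;
# missing pb (price-to-book) values are forward-filled from the previous trading day.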
dataset1['日收益率'].fillna(method='bfill',inplace=True)
dataset1['pb'].fillna(method='ffill',inplace=True)
values1=dataset1.values
values1=values1.astype('float32')
# normalize features
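# MinMaxScaler rescales every column independently to [0, 1]; the fitted scaler is kept
# so the predictions can later be mapped back to the original scale.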
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(values1)
# frame as supervised learning
reframed = series_to_supervised(scaled, 1, 1)
# drop columns we don't want to predict
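# Assuming the Excel file yields 10 feature columns, the reframed data has 20 columns:
# var1(t-1)..var10(t-1) followed by var1(t)..var10(t). Dropping indices 10-18 keeps only
# var10(t) (the daily return added last) as the prediction target in the final column.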
reframed.drop(reframed.columns[[10, 11, 12, 13, 14, 15,16,17,18]], axis=1, inplace=True)
# split into train and test sets
values1 = reframed.values
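# Hold out rows 774-1218 as the test window (roughly the commented-out
# 2016-02-15 to 2017-12-29 range above) and train on the rows before and after it.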
train = np.concatenate([values1[:774, :],values1[1219:,:]])
test = values1[774:1219, :]
# split into input and outputs
train_X, train_y = train[:, :-1], train[:, -1]
test_X, test_y = test[:, :-1], test[:, -1]
# reshape input to be 3D [samples, timesteps, features]
train_X = train_X.reshape((train_X.shape[0], 1, train_X.shape[1]))
test_X = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)
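# train_X and test_X are now 3-D arrays of shape (samples, 1, n_features);
# train_y and test_y are 1-D arrays of matching length.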
# design network
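# A single LSTM layer with 50 units reads one timestep of all features, followed by a
# Dense(1) regression output; mean absolute error loss with the Adam optimizer.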
model = Sequential()
model.add(LSTM(50, input_shape=(train_X.shape[1], train_X.shape[2])))
model.add(Dense(1))
model.compile(loss='mae', optimizer='adam')
# fit network
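# shuffle=False keeps the chronological order of the training batches, and the test set
# doubles as the validation set so that val_loss tracks out-of-sample error per epoch.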
history = model.fit(train_X, train_y, epochs=50, batch_size=72, validation_data=(test_X, test_y), verbose=2,shuffle=False)
# plot history
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='test')
plt.title('LSTM_603993.SH', fontsize='12', fontproperties=font)
plt.ylabel('模型损失', fontsize='10', fontproperties=font)      # y-axis label: "model loss"
plt.xlabel('模型迭代次数', fontsize='10', fontproperties=font)  # x-axis label: "training epochs"
plt.legend()
plt.savefig('figures/loss_20.png')
plt.show()
#plt.ylim(0,8)
#plt.xticks(np.arange(0,15,2))
#plt.legend(loc='best',fontsize='small')
#plt.savefig('figures/price1.png')  # save the figure
#plt.show()
#plot_model(model,to_file='figures/model.png',show_shapes=True,show_layer_names=False)
# make a prediction
yhat = model.predict(test_X)
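# Flatten test_X back to 2-D (samples, features) so it can be concatenated with the
# predictions before undoing the min-max scaling.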
test_X = test_X.reshape((test_X.shape[0], test_X.shape[2]))
# invert scaling for forecast
inv_yhat = concatenate((yhat, test_X[:, 1:]), axis=1)
inv_yhat = scaler.inverse_transform(inv_yhat)
inv_yhat = inv_yhat[:, 0]
# invert scaling for actual
test_y = test_y.reshape((len(test_y), 1))
inv_y = concatenate((test_y, test_X[:, 1:]), axis=1)
inv_y = scaler.inverse_transform(inv_y)
inv_y = inv_y[:, 0]
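# NOTE: placing yhat / test_y in column 0 means the inverse transform applies the first
# feature's min-max range to them. If the target is actually the last scaled column (the
# daily return, per the assumption above), the values should instead be placed in the last
# column before calling scaler.inverse_transform; both series are handled the same way here,
# so the RMSE below is at least internally consistent.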
# calculate RMSE
rmse = sqrt(mean_squared_error(inv_y, inv_yhat))
print('Test RMSE: %.3f' % rmse)