# -*- coding: utf-8 -*-
"""
Created on Tue Oct 22 00:54:18 2019
@author: 潮洛蒙
"""
from scipy.io import loadmat
import keras
from keras.layers import Dense, Conv1D, BatchNormalization, MaxPooling1D, Activation, Flatten, Dropout, LSTM
from keras.optimizers import Adam,SGD
from keras.models import Sequential
from keras.utils import plot_model
from keras.regularizers import l2
from keras.callbacks import EarlyStopping
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve,auc
from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import random
import numpy as np
import time
# Training hyper-parameters.
batch_size = 128
epochs = 500
num_classes = 10
proportion = 0.10  # fraction of the (shuffled) training set actually used
BatchNorm = True   # whether to use batch normalization
normal = True      # whether to standardize
# Load one .mat file per fault class (0-9); each holds the raw signals.
C0, C1, C2, C3, C4, C5, C6, C7, C8, C9 = (
    loadmat('dataset/%d.mat' % k) for k in range(10)
)
# Collect each class's drive-end (DE), fan-end (FE) and base (BA)
# accelerometer signals into per-class lists indexed 0-9.
# FIX: the original built variable names with eval('C%s' % i) — fragile and
# an eval anti-pattern; an explicit list gives identical results safely.
_mats = [C0, C1, C2, C3, C4, C5, C6, C7, C8, C9]
train_DE = [m['train_DE_time'] for m in _mats]
test_DE = [m['test_DE_time'] for m in _mats]
train_FE = [m['train_FE_time'] for m in _mats]
test_FE = [m['test_FE_time'] for m in _mats]
# Class 0 is padded with zeros for the BA channel (0.mat apparently has no
# BA recording — TODO confirm against the dataset).
train_BA = [np.zeros(shape=(100000, 1))] + [m['train_BA_time'] for m in _mats[1:]]
test_BA = [np.zeros(shape=(20000, 1))] + [m['test_BA_time'] for m in _mats[1:]]
seqlength = 500  # samples per sliding window
interval = 50    # stride between consecutive windows
# Number of windows that fit in each 100k-sample train / 20k-sample test record.
train_size = int((100000 - seqlength) / interval + 1)
test_size = int((20000 - seqlength) / interval) + 1
# Buffers shaped (window, channel, time, 1); channels are DE/FE/BA.
x_train = np.zeros(shape=[train_size * 10, 3, seqlength, 1])
x_test = np.zeros(shape=[test_size * 10, 3, seqlength, 1])
y_train = np.zeros(train_size * 10)
y_test = np.zeros(test_size * 10)
# Cut every class's three signals into overlapping windows.
for i in range(10):
    for j in range(train_size):
        x_train[i * train_size + j][0] = train_DE[i][interval * j:interval * j + seqlength]
        x_train[i * train_size + j][1] = train_FE[i][interval * j:interval * j + seqlength]
        x_train[i * train_size + j][2] = train_BA[i][interval * j:interval * j + seqlength]
        y_train[i * train_size + j] = i
    for j in range(test_size):
        x_test[i * test_size + j][0] = test_DE[i][interval * j:interval * j + seqlength]
        x_test[i * test_size + j][1] = test_FE[i][interval * j:interval * j + seqlength]
        x_test[i * test_size + j][2] = test_BA[i][interval * j:interval * j + seqlength]
        y_test[i * test_size + j] = i
# One-hot encode labels.
y_train = keras.utils.to_categorical(y_train, 10)
y_test = keras.utils.to_categorical(y_test, 10)
# Shuffle the training set so the head-slice below is a random subsample
# (also lets repeated runs serve as crude cross-validation).
index = list(range(len(x_train)))
random.shuffle(index)
x_train = x_train[index]
y_train = y_train[index]
x_train = x_train[0:int(train_size * 10 * proportion), ]
y_train = y_train[0:int(train_size * 10 * proportion), ]
# BUG FIX: the original reshape(N, seqlength, 3) flattened each (3, seqlength)
# sample row-major and refolded it, interleaving DE/FE/BA values across the
# time axis. A transpose keeps each timestep's three channels aligned,
# producing a true (time, channel) layout for the Conv1D input.
x_train = x_train[:, :, :, 0].transpose(0, 2, 1)
x_test = x_test[:, :, :, 0].transpose(0, 2, 1)
input_shape = (seqlength, 3)
# 1-D CNN: two conv blocks (conv -> BN -> ReLU -> max-pool) followed by a
# small dense classifier head.
model = Sequential()
# First conv block: a wide kernel with large stride shortens the 500-sample
# sequence quickly while learning low-level filters.
model.add(Conv1D(filters=16, kernel_size=64, strides=16, padding='same',
                 kernel_regularizer=l2(1e-4), input_shape=input_shape))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(MaxPooling1D(pool_size=2))
# Second conv block. (Dropped the redundant input_shape argument the
# original passed here — Keras only honors it on the first layer.)
model.add(Conv1D(32, kernel_size=3, strides=1, padding='same',
                 kernel_regularizer=l2(1e-4)))
model.add(BatchNormalization())
model.add(Activation('relu'))
model.add(MaxPooling1D(pool_size=2, strides=2, padding='valid'))
# Classifier head.
model.add(Flatten())
model.add(Dropout(0.2))
model.add(Dense(32))
model.add(Activation("relu"))
model.add(Dense(units=10, activation='softmax', kernel_regularizer=l2(1e-4)))
# Stop once validation loss has not improved for 40 epochs.
early_stopping = EarlyStopping(monitor='val_loss', patience=40, verbose=2)
# (Removed the unused SGD optimizer instance — compile() always used Adam.)
model.compile(optimizer='Adam', loss='categorical_crossentropy',
              metrics=['accuracy'])
model.summary()
History = model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs,
                    verbose=1, validation_split=0.2, shuffle=True,
                    callbacks=[early_stopping])
# Test-set accuracy.
scores = model.evaluate(x_test, y_test)
print()
print('accuracy=', scores[1])
# Class predictions for precision/recall/F-score reporting.
train_pre = model.predict_classes(x_train)
test_pre = model.predict_classes(x_test)
print('训练集分类报告\n', classification_report(np.argmax(y_train, 1), train_pre))
print('测试集分类报告\n', classification_report(np.argmax(y_test, 1), test_pre))
# FIX: reuse the test-set predictions computed above instead of re-running
# an identical full forward pass (the original called predict_classes again).
predictions = test_pre
def my_plot_confusion_matrix(cm, labels_name, title):
    """Draw a row-normalized confusion matrix as a labeled heatmap.

    cm is an integer confusion matrix; labels_name supplies the tick
    labels for both axes; title is the figure title.
    """
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    normalized = cm.astype('float') / row_totals  # per-true-class rates
    plt.imshow(normalized, interpolation='nearest')
    plt.title(title)
    plt.colorbar()
    tick_positions = np.array(range(len(labels_name)))
    plt.xticks(tick_positions, labels_name, rotation=90)
    plt.yticks(tick_positions, labels_name)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
# Confusion matrix on the test set.
cm = confusion_matrix(np.argmax(y_test, 1), predictions)
print(cm)
labels = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
my_plot_confusion_matrix(cm, labels, "HAR Confusion Matrix")
# BUG FIX: the original saved to '/HAR_cm.png' (filesystem root), which
# raises PermissionError on most systems; write to the working directory.
plt.savefig('HAR_cm.png', format='png')
plt.show()
# ROC plotting: per-class probability scores for every test window.
y_score=model.predict_proba(x_test)
def paintRoc(y_true, y_score, i):
    """Plot the ROC curve (with AUC) for class i against the chance diagonal.

    y_true holds binary ground-truth labels for one class, y_score the
    model's probability scores; i is used only in the plot title.
    """
    fpr, tpr, _ = roc_curve(y_true.ravel(), y_score.ravel())
    area = auc(fpr, tpr)
    plt.plot(fpr, tpr, lw=5, alpha=0.8, color='r',
             label='ROC_auc(AUC=%0.2f)' % (area))
    # Chance-level reference line.
    plt.plot([0, 1], [0, 1], linestyle='--', lw=2, color='r',
             label='Luck', alpha=.8)
    plt.xlabel('FPR')
    plt.ylabel('TPR')
    plt.title('Roc(AUC=%0.2f of state %d)' % ((area), i))
    plt.legend(loc="lower right")
    plt.show()
# One-vs-rest ROC curve for each of the ten classes.
for state in range(10):
    paintRoc(y_test[:, state], y_score[:, state], state)