import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from tensorflow.keras.layers import (Input, Conv2D, MaxPooling2D, BatchNormalization,
                                     Flatten, Dense, GRU, Reshape, Add, Activation,
                                     Multiply)
from tensorflow.keras.models import Model
# Load the UNSW-NB15 training and testing sets
train_data = pd.read_csv(r"F:\毕业论文\毕业论文\数据集/UNSW_NB15_training-set.csv")
test_data = pd.read_csv(r"F:\毕业论文\毕业论文\数据集/UNSW_NB15_testing-set.csv")
data = pd.concat([train_data, test_data], ignore_index=True)
# Drop the row id and the binary label; attack_cat is the multi-class target
data = data.drop(columns=['id', 'label'])
# Label-encode the categorical features (proto, state, service).
# Note: get_dummies below accepts the raw strings directly, so this step
# only changes the dummy column names, not the resulting encoding.
protocol_encoder = LabelEncoder()
state_encoder = LabelEncoder()
service_encoder = LabelEncoder()
data['proto'] = protocol_encoder.fit_transform(data['proto'])
data['state'] = state_encoder.fit_transform(data['state'])
data['service'] = service_encoder.fit_transform(data['service'])
# One-hot encode the categorical features
data = pd.get_dummies(data, columns=['proto', 'state', 'service'])
# Min-max scale every feature column except the target (0/1 dummies are unaffected)
scaler = MinMaxScaler()
data[data.columns.difference(['attack_cat'])] = scaler.fit_transform(data[data.columns.difference(['attack_cat'])])
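# Caveat: the scaler above is fitted on the concatenated train+test frame,
# which leaks test-set statistics into preprocessing. A stricter variant
# (a sketch; `train_part`/`test_part` are hypothetical row slices of `data`
# matching the original CSV split) fits on the training rows only:
# feature_cols = data.columns.difference(['attack_cat'])
# scaler = MinMaxScaler().fit(train_part[feature_cols])
# train_part.loc[:, feature_cols] = scaler.transform(train_part[feature_cols])
# test_part.loc[:, feature_cols] = scaler.transform(test_part[feature_cols])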
# Encode the attack category as integers, then one-hot encode the target
attack_category_encoder = LabelEncoder()
data['attack_cat'] = attack_category_encoder.fit_transform(data['attack_cat'])
y = pd.get_dummies(data['attack_cat']).values.astype('float32')  # float one-hot for Keras
# Drop the target variable from input features
X = data.drop(columns=['attack_cat'])
# Split the data into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
# Reshape the flat feature vectors into 14x14 single-channel "images"
assert X_train.shape[1] == 14 * 14, f"expected 196 features, got {X_train.shape[1]}"
X_train_reshaped = X_train.values.reshape(X_train.shape[0], 14, 14, 1)
X_val_reshaped = X_val.values.reshape(X_val.shape[0], 14, 14, 1)
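# Note: the 14x14 reshape assumes exactly 196 columns survive preprocessing,
# which depends on how many distinct proto/state/service values get one-hot
# encoded. If the count differs, one workaround (a sketch; the padding scheme
# is an assumption, and input_shape below must use `side` instead of 14) is
# zero-padding up to the next perfect square:
# import numpy as np
# n_features = X_train.shape[1]
# side = int(np.ceil(np.sqrt(n_features)))
# pad = side * side - n_features
# X_train_reshaped = np.pad(X_train.values, ((0, 0), (0, pad))).reshape(-1, side, side, 1)
# X_val_reshaped = np.pad(X_val.values, ((0, 0), (0, pad))).reshape(-1, side, side, 1)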
def channel_attention(input_feature, ratio=8):
    """CBAM channel attention: squeeze spatial dims, then gate each channel."""
    channel_axis = 1 if tf.keras.backend.image_data_format() == "channels_first" else -1
    channel = input_feature.shape[channel_axis]
    # Shared MLP applied to both pooled descriptors
    shared_layer_one = Dense(channel // ratio, activation='relu', kernel_initializer='he_normal',
                             use_bias=True, bias_initializer='zeros')
    shared_layer_two = Dense(channel, kernel_initializer='he_normal', use_bias=True,
                             bias_initializer='zeros')
    # Global average pooling branch
    avg_pool = tf.keras.layers.GlobalAveragePooling2D()(input_feature)
    avg_pool = Reshape((1, 1, channel))(avg_pool)
    avg_pool = shared_layer_one(avg_pool)
    avg_pool = shared_layer_two(avg_pool)
    # Global max pooling branch
    max_pool = tf.keras.layers.GlobalMaxPooling2D()(input_feature)
    max_pool = Reshape((1, 1, channel))(max_pool)
    max_pool = shared_layer_one(max_pool)
    max_pool = shared_layer_two(max_pool)
    # Combine both branches and gate the input channels
    cbam_feature = Add()([avg_pool, max_pool])
    cbam_feature = Activation('sigmoid')(cbam_feature)
    return Multiply()([input_feature, cbam_feature])
def spatial_attention(input_feature):
    """CBAM spatial attention: pool across channels, then gate each location."""
    kernel_size = 7
    # Channel-wise average and max descriptors, kept as single-channel maps
    avg_pool = tf.keras.backend.mean(input_feature, axis=3, keepdims=True)
    max_pool = tf.keras.backend.max(input_feature, axis=3, keepdims=True)
    concat = tf.keras.layers.Concatenate(axis=3)([avg_pool, max_pool])
    # 7x7 convolution produces a sigmoid attention map over spatial positions
    cbam_feature = Conv2D(filters=1, kernel_size=kernel_size, strides=1, padding='same',
                          activation='sigmoid', kernel_initializer='he_normal',
                          use_bias=False)(concat)
    return Multiply()([input_feature, cbam_feature])
def cbam_block(cbam_feature, ratio=8):
    """Full CBAM: channel attention followed by spatial attention."""
    cbam_feature = channel_attention(cbam_feature, ratio)
    cbam_feature = spatial_attention(cbam_feature)
    return cbam_feature
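# Quick sanity check of the CBAM block (illustrative only; the (14, 14, 32)
# shape is an arbitrary example, not taken from the model below):
# _probe = Input(shape=(14, 14, 32))
# print(cbam_block(_probe).shape)  # attention gates preserve the input shape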
def build_parallel_CBAMCNN_GRU(input_shape, num_classes=10):
    inputs = Input(shape=input_shape)
    # CBAM-CNN branch
    x = Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2))(x)
    x = BatchNormalization()(x)
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2))(x)
    x = BatchNormalization()(x)
    x = cbam_block(x)  # CBAM attention over the convolutional features
    x = Flatten()(x)   # flatten to rank 2 so the branch matches the GRU output
    # GRU branch: read the 14x14 grid as a sequence of 196 single-feature steps
    gru_x_shape = inputs.shape
    gru_x = Reshape((gru_x_shape[1] * gru_x_shape[2], gru_x_shape[3]))(inputs)
    gru_x = GRU(50)(gru_x)
    # Concatenate the features from both branches
    combined = tf.keras.layers.Concatenate()([x, gru_x])
    # Fully connected head
    x = Dense(45, activation='relu')(combined)
    outputs = Dense(num_classes, activation='softmax')(x)
    model = Model(inputs=inputs, outputs=outputs)
    return model
input_shape = (14, 14, 1)
num_classes = 10
# Create the parallel model
parallel_model = build_parallel_CBAMCNN_GRU(input_shape, num_classes)
# Compile the model
parallel_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
# Model summary
parallel_model.summary()
# Train, keeping the History object returned by fit for the plots below
history = parallel_model.fit(X_train_reshaped, y_train, epochs=100, batch_size=64,
                             validation_data=(X_val_reshaped, y_val))
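# Optional: early stopping to curb overfitting over the 100 epochs (a sketch,
# not part of the original run):
# from tensorflow.keras.callbacks import EarlyStopping
# early_stop = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
# history = parallel_model.fit(X_train_reshaped, y_train, epochs=100, batch_size=64,
#                              validation_data=(X_val_reshaped, y_val),
#                              callbacks=[early_stop])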
# Plot the training curves
import matplotlib.pyplot as plt
# Loss values collected during training
train_loss = history.history['loss']
val_loss = history.history['val_loss']
# Plot the training and validation loss curves
plt.plot(train_loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
# Axis labels and title
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
# Legend
plt.legend()
# Save the figure to a file
plt.savefig('loss_plot.png')
# Display the figure
plt.show()
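# The accuracy curves can be plotted the same way (a sketch mirroring the loss
# plot above; the 'accuracy' keys exist because the model was compiled with
# metrics=['accuracy']):
# plt.plot(history.history['accuracy'], label='Training Accuracy')
# plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
# plt.xlabel('Epochs')
# plt.ylabel('Accuracy')
# plt.legend()
# plt.show()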
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# Predict on the validation set
y_pred = parallel_model.predict(X_val_reshaped)
# Convert one-hot predictions and targets back to class labels
y_pred_labels = y_pred.argmax(axis=1)
y_val_labels = y_val.argmax(axis=1)
# Accuracy
accuracy = accuracy_score(y_val_labels, y_pred_labels)
# Macro-averaged precision
precision = precision_score(y_val_labels, y_pred_labels, average='macro')
# Macro-averaged recall
recall = recall_score(y_val_labels, y_pred_labels, average='macro')
# Macro-averaged F1 score
f1 = f1_score(y_val_labels, y_pred_labels, average='macro')
# Print the metrics
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)