from keras.layers import Dense,Flatten,Input,Activation,ZeroPadding2D,AveragePooling2D,BatchNormalization,Conv2D,Add,MaxPooling2D
from keras.models import Model
import matplotlib.pyplot as plt
from keras.preprocessing import image
from keras.applications.imagenet_utils import preprocess_input
import resnets_utils
import keras.backend as K
import numpy as np
from keras.initializers import glorot_uniform
import tensorflow as tf
import time
"""
Load the dataset and convert the labels to one-hot encoding.
"""
def convert_data():
    """Load the dataset, rescale the images and one-hot encode the labels.

    The raw arrays come from ``resnets_utils.load_dataset()``. Image arrays
    are divided by 255 and label arrays are passed through
    ``resnets_utils.convert_to_one_hot`` with 6 classes, then transposed.
    The class list returned by the loader is not used.

    Returns:
        A tuple ``(train_x, train_y, test_x, test_y)``.
    """
    raw_train_x, raw_train_y, raw_test_x, raw_test_y, _ = resnets_utils.load_dataset()

    def encode(labels):
        # Transposed so the result is laid out the way the training code
        # expects (presumably one row per example -- confirm against
        # resnets_utils.convert_to_one_hot).
        return resnets_utils.convert_to_one_hot(labels, 6).T

    scaled_train = raw_train_x / 255
    scaled_test = raw_test_x / 255
    return scaled_train, encode(raw_train_y), scaled_test, encode(raw_test_y)
"""
Residual unit with three convolutions; the output size and channel count are unchanged.
"""
def identity_block(X, f, filters, stage, block):
    """Residual unit whose shortcut is the unmodified input.

    The main path is three Conv2D + BatchNormalization pairs (1x1, f x f,
    1x1), all with stride 1; ReLU follows the first two pairs. The input is
    then added to the main path and a final ReLU is applied.

    Args:
        X: Input tensor. BatchNormalization is applied on axis 3, so the
            layout is presumably channels-last.
        f: Kernel size of the middle convolution.
        filters: Three filter counts, one per convolution. Because the
            shortcut is the raw input, the third count has to match the
            input's channel count for the addition to be valid.
        stage: Stage identifier, used only to build layer names.
        block: Block label string, used only to build layer names.

    Returns:
        The output tensor of the block.
    """
    suffix = str(stage) + block + '_branch'
    conv_prefix = 'res' + suffix
    bn_prefix = 'bn' + suffix
    width_a, width_b, width_c = filters
    shortcut = X

    # (name tag, filters, kernel size, padding, apply ReLU afterwards)
    main_path = (
        ('2a', width_a, (1, 1), 'valid', True),
        ('2b', width_b, (f, f), 'same', True),
        ('2c', width_c, (1, 1), 'valid', False),
    )
    out = X
    for tag, width, kernel, pad, activate in main_path:
        out = Conv2D(
            filters=width,
            kernel_size=kernel,
            strides=(1, 1),
            padding=pad,
            name=conv_prefix + tag,
            kernel_initializer=glorot_uniform(seed=0),
        )(out)
        out = BatchNormalization(axis=3, name=bn_prefix + tag)(out)
        if activate:
            out = Activation('relu')(out)

    # Residual connection: sum with the untouched input, then ReLU.
    out = Add()([out, shortcut])
    return Activation('relu')(out)
"""
Residual unit with three convolutions; the output size and channel count change.
"""
def convolutional_block(X, f, filters, stage, block, s=2):
    """Residual unit whose shortcut is a strided 1x1 convolution.

    The main path is three Conv2D + BatchNormalization pairs (1x1 with
    stride ``s``, f x f with 'same' padding, 1x1). The shortcut path sends
    the input through its own 1x1 convolution with stride ``s`` and F3
    filters, followed by BatchNormalization. The two paths are summed and a
    ReLU is applied to the sum.

    Args:
        X: Input tensor. BatchNormalization is applied on axis 3, so the
            layout is presumably channels-last.
        f: Kernel size of the middle convolution.
        filters: Three filter counts ``(F1, F2, F3)``, one per main-path
            convolution; F3 is also used by the shortcut convolution.
        stage: Stage identifier, used only to build layer names.
        block: Block label string, used only to build layer names.
        s: Stride of the first main-path convolution and of the shortcut
            convolution.

    Returns:
        The output tensor of the block.
    """
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'
    F1, F2, F3 = filters
    X_shortcut = X

    # Main path, first component: 1x1 convolution carrying the stride.
    X = Conv2D(filters=F1, kernel_size=(1, 1), strides=(s, s), padding='valid',
               name=conv_name_base + '2a',
               kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2a')(X)
    X = Activation('relu')(X)

    # Main path, second component: f x f convolution, spatial size preserved.
    X = Conv2D(filters=F2, kernel_size=(f, f), strides=(1, 1), padding='same',
               name=conv_name_base + '2b',
               kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2b')(X)
    X = Activation('relu')(X)

    # Main path, third component: 1x1 convolution. No ReLU here: the
    # non-linearity is applied once, after the shortcut has been added,
    # exactly as identity_block does. An activation at this point would
    # clip the residual before the sum.
    X = Conv2D(filters=F3, kernel_size=(1, 1), strides=(1, 1), padding='valid',
               name=conv_name_base + '2c',
               kernel_initializer=glorot_uniform(seed=0))(X)
    X = BatchNormalization(axis=3, name=bn_name_base + '2c')(X)

    # Shortcut path: same stride and F3 filters so it can be added to the
    # main path.
    X_shortcut = Conv2D(filters=F3, kernel_size=(1, 1), strides=(s, s), padding='valid',
                        name=conv_name_base + '1',
                        kernel_initializer=glorot_uniform(seed=0))(X_shortcut)
    X_shortcut = BatchNormalization(axis=3, name=bn_name_base + '1')(X_shortcut)

    # Merge both paths, then apply the single final activation.
    X = Add()([X, X_shortcut])
    X = Activation('relu')(X)
    return X
"""
50-layer residual network (ResNet-50).
"""
def ResNet50(input_shape=(64, 64, 3), classes=6):
    """Assemble the ResNet-50 Keras model.

    Layout: zero padding -> 7x7 conv / BN / ReLU / max-pool (stage 1) ->
    four residual stages, each one ``convolutional_block`` followed by
    several ``identity_block`` calls -> average pooling -> flatten ->
    softmax dense layer. The tensor shape is printed after each major step.

    Args:
        input_shape: Shape passed to the Keras ``Input`` layer.
        classes: Number of units in the final softmax layer; also used in
            that layer's name.

    Returns:
        A Keras ``Model`` named 'ResNet50'. It is not compiled.
    """
    inputs = Input(input_shape)
    print('输入尺寸={}'.format(inputs.shape))
    net = ZeroPadding2D((3, 3))(inputs)
    print('补完零尺寸={}'.format(net.shape))

    # Stage 1: stem convolution and max pooling.
    net = Conv2D(
        filters=64,
        kernel_size=(7, 7),
        strides=(2, 2),
        name='conv1',
        kernel_initializer=glorot_uniform(seed=0),
    )(net)
    print('第一次卷积尺寸={}'.format(net.shape))
    net = BatchNormalization(axis=3, name='bn_conv1')(net)
    net = Activation('relu')(net)
    net = MaxPooling2D(pool_size=(3, 3), strides=(2, 2))(net)
    print('第一次池化尺寸={}'.format(net.shape))

    # Stages 2-5. Each entry is:
    # (stage, filters, stride of the convolutional block,
    #  labels of the identity blocks that follow,
    #  message printed after the convolutional block,
    #  message printed after the identity blocks)
    stage_specs = (
        (2, [64, 64, 256], 1, 'bc',
         '第一次convolutional_block尺寸={}', '两次identity_block尺寸={}'),
        (3, [128, 128, 512], 2, 'bcd',
         '第二次convolutional_block尺寸={}', '三次identity_block尺寸={}'),
        (4, [256, 256, 1024], 2, 'bcdef',
         '第三次convolutional_block尺寸={}', '五次identity_block尺寸={}'),
        (5, [512, 512, 2048], 2, 'bc',
         '第四次convolutional_block尺寸={}', '两次identity_block尺寸={}'),
    )
    for stage, widths, stride, id_labels, conv_msg, id_msg in stage_specs:
        net = convolutional_block(net, f=3, filters=widths, stage=stage, block='a', s=stride)
        print(conv_msg.format(net.shape))
        for label in id_labels:
            net = identity_block(net, f=3, filters=widths, stage=stage, block=label)
        print(id_msg.format(net.shape))

    # Pooling head.
    net = AveragePooling2D(pool_size=(2, 2))(net)
    print('最后一次平均池化尺寸={}'.format(net.shape))

    # Classifier: flatten followed by a fully connected softmax layer.
    net = Flatten()(net)
    net = Dense(
        units=classes,
        activation='softmax',
        name='fc' + str(classes),
        kernel_initializer=glorot_uniform(seed=0),
    )(net)

    return Model(inputs=inputs, outputs=net, name='ResNet50')
def test_identity_block():
    """Smoke-run ``identity_block`` on random input in a TensorFlow session.

    Builds the block on a (3, 4, 4, 6) float placeholder, initialises the
    variables and evaluates the block once with the Keras learning phase
    fed as 0. The evaluated result is not inspected.
    """
    with tf.Session() as session:
        np.random.seed(1)
        inputs = tf.placeholder('float', [3, 4, 4, 6])
        sample = np.random.randn(3, 4, 4, 6)
        block_out = identity_block(inputs, f=2, filters=[2, 4, 6], stage=1, block='a')
        session.run(tf.global_variables_initializer())
        # Learning phase 0 is fed explicitly for the BatchNormalization layers.
        feeds = {inputs: sample, K.learning_phase(): 0}
        session.run([block_out], feed_dict=feeds)
def test_convolutional_block():
    """Smoke-run ``convolutional_block`` on random input and print a slice.

    Builds the block on a (3, 4, 4, 6) float placeholder with stride 2,
    initialises the variables, evaluates the block once and prints
    ``out[0][0][0]`` of the result.
    """
    with tf.Session() as sess:
        np.random.seed(1)
        A_prev = tf.placeholder('float', [3, 4, 4, 6])
        X = np.random.randn(3, 4, 4, 6)
        A = convolutional_block(A_prev, f=2, filters=[2, 4, 6], stage=1, block='a', s=2)
        init = tf.global_variables_initializer()
        sess.run(init)
        # Feed the Keras learning phase explicitly, as test_identity_block
        # does. The block contains BatchNormalization layers, and on Keras
        # versions where the learning phase is a plain placeholder the run
        # fails if it is not fed.
        out = sess.run(A, feed_dict={A_prev: X, K.learning_phase(): 0})
        print('out=', out[0][0][0])
def test_ResNet50():
#定义好模型结构
Resnet50_model=ResNet50(input_shape=(64,64,3),classes=6)
#选定训练参数
Resnet50_model.compile(optimizer='adam',loss='categorical_crossentropy',metrics=['accuracy'])
#获取训练集和测试集
train_x, train_y,