from keras import backend as K
from keras.layers import Layer
"""
压缩函数,使用0.5替代hinton论文中的1,如果是1,所有的向量的范数都将被缩小。
如果是0.5,小于0.5的范数将缩小,大于0.5的将被放大
"""
def squash(x, axis=-1):
s_quared_norm = K.sum(K.square(x), axis, keepdims=True) + K.epsilon() #||x||^2
scale = K.sqrt(s_quared_norm) / (0.5 + s_quared_norm) #||x||/(0.5+||x||^2)
result = scale * x
return result
# 定义我们自己的softmax函数,而不是K.softmax.因为K.softmax不能指定轴
def softmax(x, axis=-1):
ex = K.exp(x - K.max(x, axis=axis, keepdims=True))
result = ex / K.sum(ex, axis=axis, keepdims=True)
return result
# 定义边缘损失,输入y_true, p_pred,返回分数,传入fit即可
def margin_loss(y_true, y_pred):
lamb, margin = 0.5, 0.1
result = K.sum(y_true * K.square(K.relu(1 - margin -y_pred))
+ lamb * (1-y_true) * K.square(K.relu(y_pred - margin)), axis=-1)
return result
class Capsule(Layer):
def __init__(self,
num_capsule,
dim_capsule,
routings=3,
share_weights=True,
activation='squash',
**kwargs):
super(Capsule, self).__init__(**kwargs) # Capsule继承**kwargs参数
self.num_capsule = num_capsule
self.dim_capsule = dim_capsule
self.routings = routings
self.share_weights = share_weights
if activation == 'squash':
self.activation = squash
else:
self.activation = activation.get(activation) # 得到激活函数
# 定义权重
def build(self, input_shape):
input_dim_capsule = input_shape[-1]
if self.share_weights:
# 自定义权重
self.kernel = self.add_weight( #[row,col,channel]->[1,input_dim_capsule,num_capsule*dim_capsule]
name='capsule_kernel',
shape=(1, input_dim_capsule,
self.num_capsule * self.dim_capsule),
initializer='glorot_uniform',
trainable=True)
else:
input_num_capsule = input_shape[-2]
self.kernel = self.add_weight(
name='capsule_kernel',
shape=(input_num_capsule, input_dim_capsule,
self.num_capsule * self.dim_capsule),
initializer='glorot_uniform',
trainable=True)
super(Capsule, self).build(input_shape) # 必须继承Layer的build方法
# 层的功能逻辑(核心)
def call(self, inputs):
if self.share_weights:
#inputs: [batch, input_num_capsule, input_dim_capsule]
#kernel: [1, input_dim_capsule, num_capsule*dim_capsule]
#hat_inputs: [batch, input_num_capsule, num_capsule*dim_capsule]
hat_inputs = K.conv1d(inputs, self.kernel)
else:
hat_inputs = K.local_conv1d(inputs, self.kernel, [1], [1])
batch_size = K.shape(inputs)[0]
input_num_capsule = K.shape(inputs)[1]
hat_inputs = K.reshape(hat_inputs,
(batch_size, input_num_capsule,
self.num_capsule, self.dim_capsule))
#hat_inputs: [batch, input_num_capsule, num_capsule, dim_capsule]
hat_inputs = K.permute_dimensions(hat_inputs, (0, 2, 1, 3))
#hat_inputs: [batch, num_capsule, input_num_capsule, dim_capsule]
b = K.zeros_like(hat_inputs[:, :, :, 0])
#b: [batch, num_capsule, input_num_capsule]
for i in range(self.routings):
c = softmax(b, 1)
o = self.activation(K.batch_dot(c, hat_inputs, [2, 2]))
if K.backend() == 'theano':
o = K.sum(o, axis=1)
if i < self.routings-1:
b += K.batch_dot(o, hat_inputs, [2, 3])
if K.backend() == 'theano':
o = K.sum(o, axis=1)
return o
def compute_output_shape(self, input_shape): # 自动推断shape
return (None, self.num_capsule, self.dim_capsule)