import tensorflow as tf
import numpy as np
from tqdm import tqdm
import re
# Width of the raw feature vector fed to the network
# (25088 = 512 * 7 * 7 — presumably a flattened VGG-style conv feature map; TODO confirm).
inputshape = 25088


class SOM(object):
    """Self-Organizing Map with a trainable dense projection head in front.

    __init__ builds a TensorFlow 1.x graph that projects an `inputshape`-dim
    input down to `dim` features and maps it onto an m x n grid of neurons.
    """

    # Flipped to True by train() once all iterations have completed.
    _trained = False
    def __init__(self, m, n,dim, n_iterations=100, alpha=None, sigma=None):
        """Build the full TF1 training graph and an initialized session.

        Args:
            m, n: grid height and width (m * n neurons in total).
            dim: dimensionality of the projected feature space (output of the
                first dense layer, and of each neuron's codebook vector).
            n_iterations: number of training epochs over the data.
            alpha: initial SOM learning rate (defaults to 0.3).
            sigma: initial neighbourhood radius (defaults to max(m, n) / 2).
        """
        self._m = m
        self._n = n
        self.dim = dim
        if alpha is None:
            alpha = 0.3
        else:
            alpha = float(alpha)
        if sigma is None:
            sigma = max(m, n) / 2.0
        else:
            sigma = float(sigma)
        self._n_iterations = abs(int(n_iterations))
        self._centroid_grid = np.zeros(shape=(m,n))
        self.loaded_weights = None
        ##INITIALIZE GRAPH
        self._graph = tf.Graph()
        ##POPULATE GRAPH WITH NECESSARY COMPONENTS
        with self._graph.as_default():
            # SOM codebook: one dim-dimensional weight vector per neuron.
            self._weightage_vects = tf.Variable(tf.random_normal(
                [m * n, dim],mean=0.5))
            # (row, col) grid coordinate of every neuron, row-major order.
            self._location_vects = tf.constant(np.array(
                list(self._neuron_locations(m, n))))
            self._vect_input = tf.placeholder("float", [inputshape])  # input image feature vector
            self._dense_input,self._output,self._dense_weights = self.dense(self._vect_input)
            self._iter_input = tf.placeholder("float")  # current epoch number (drives decay)
            # Best-matching unit: index of the codebook vector closest (L2)
            # to the projected input.
            winner_index = tf.argmin(tf.sqrt(tf.reduce_sum(
                tf.square(tf.subtract(self._weightage_vects, tf.stack(
                    [self._dense_input[0] for i in range(m * n)]))), 1)),
                0)
            # Pad [winner_index] to [winner_index, 0] so it can serve as the
            # `begin` argument of tf.slice below.
            slice_input = tf.pad(tf.reshape(winner_index, [1]),
                np.array([[0, 1]]))
            self.winner_loc = tf.reshape(tf.slice(self._location_vects, slice_input,
                tf.constant(np.array([1, 2]),dtype=tf.int64)),
                [2])  # equivalent to self._location_vects[winner_index]
            # Debug: prints the symbolic tensor at graph-build time, not a value.
            print(self.winner_loc)
            # Linear decay factor 1 - epoch / n_iterations, applied to both
            # the learning rate and the neighbourhood radius.
            learning_rate_op = tf.subtract(1.0, tf.divide(self._iter_input,
                self._n_iterations))
            _alpha_op = learning_rate_op*alpha
            _sigma_op = sigma*learning_rate_op
            # Squared grid distance from every neuron to the winner.
            bmu_distance_squares = tf.reduce_sum(tf.square(tf.subtract(
                self._location_vects, tf.stack(
                    [self.winner_loc for i in range(m * n)]))), 1)  # vector of squared neighbourhood distances
            # Gaussian neighbourhood kernel centred on the BMU.
            neighbourhood_func = tf.exp(tf.negative(tf.divide(tf.cast(
                bmu_distance_squares, "float32"), tf.square(_sigma_op))))
            learning_rate_op = tf.multiply(_alpha_op, neighbourhood_func)  # per-neuron learning rates
            # Broadcast each neuron's scalar rate across all dim components.
            learning_rate_multiplier = tf.stack([tf.tile(tf.slice(
                learning_rate_op, np.array([i]), np.array([1])), [dim])
                for i in range(m * n)])
            weightage_delta = tf.multiply(
                learning_rate_multiplier,
                tf.subtract(tf.stack([self._dense_input[0] for i in range(m * n)]),
                    self._weightage_vects))  # delta matrix for the codebook update
            new_weightages_op = tf.add(self._weightage_vects,
                weightage_delta)
            # Train the dense head to predict a label derived from the winner's
            # grid coordinates.
            # NOTE(review): winner_loc[0] + winner_loc[1] maps distinct grid
            # cells (e.g. (0,1) and (1,0)) onto the same label — confirm intended.
            cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self._output,labels=[self.winner_loc[0]+self.winner_loc[1]])
            train_step = tf.train.ProximalGradientDescentOptimizer(learning_rate=_alpha_op).minimize(cross_entropy)
            # One combined op per sample: SOM codebook assign + gradient step.
            self._training_op = tf.group(tf.assign(self._weightage_vects,
                new_weightages_op),train_step)
            ##INITIALIZE SESSION
            # NOTE(review): the source's indentation was lost; the session must
            # be created while self._graph is the default graph for the run
            # calls in train() to work, so it is kept inside the `with` block.
            self._sess = tf.Session()
            ##INITIALIZE VARIABLES
            init_op = tf.global_variables_initializer()
            self._sess.run(init_op)
def _neuron_locations(self, m, n):
for i in range(m):
for j in range(n):
yield np.array([i, j])
def train(self, input_vects):
for iter_no in tqdm(range(self._n_iterations)):
# Train with each vector one by one
for input_vect in input_vects:
# print(dense_vect.shape)
self._sess.run(self._training_op,
feed_dict={self._vect_input: input_vect,
self._iter_input: iter_no})
# Store a centroid grid for easy retrieval later on
self._locations = self._sess.run(self._location_vects)
self._centroid_grid = self._sess.run(self._weightage_vects)
self._dense_vects = self._sess.run(self._dense_weights)
self._trained = True
def save_weights(self,filename):
need_save = [self._dense_vects,self._centroid_grid]
if filename.endswith('npy'):
np.save(filename,need_save)
elif filename.endswith('txt'):
with open(filename,'w') as f:
f.write(str(need_save))
elif filename.endswith('json'):
import json
with open(filename,'w') as f:
json.dump(str(need_save),f)
else:
raise TypeError('cannot identify this type file')
def load_weights(self,filename):
if filename.endswith('npy'):
weigths = np.load(filename)
elif filename.endswith('txt'):
with open(filename,'r') as f:
weigths = eval(f.read())
elif filename.endswith('json'):
import json
with open(filename,'r') as f:
weigths = json.load(f)
else:
raise TypeError('cannot identify this type file')
self.loaded_weights = weigths
def predict(self,input_vects):
if self.loaded_weights is None:
raise RuntimeError('the weights not loaded')
to_return = []
locations =list(self._neuron_locations(self._m,self._n))
dense_vects = np.dot(input_vects,self.loaded_weights[0])+0.1
for vect in dense_vects:
min_index = min([i for i in range(len(self.loaded_weights[1]))],
key=lambda x: np.linalg.norm(vect - self.loaded_weights[1][x]))
to_return.append(locations[min_index])
return to_return
def evaluate(self,name_data,predict_data,classes):
analysis = {}
for i in range(self._m * self._n):
each_class = {}
for clas in set(classes):
each_class[clas] = 0
analysis[i] = each_class
for name, result in zip(name_data, predict_data):
idx = int(result[0]*self._n+result[1])
analysis[idx][re.search("|".join(classes), name).group()] += 1
score = {}
for key, value in analysis.items():
max_item = max(value.items(), key=lambda x: x[1])
score[key] = {max_item[0]: max_item[1] / sum(value.values())}
return analysis,score
    def dense(self,inputs_vect):
        """Two-layer dense head in front of the SOM (TF1 graph ops).

        Layer 1 projects the raw input down to self.dim units with ReLU;
        layer 2 maps that to m*n logits consumed by the classification loss
        built in __init__.

        Args:
            inputs_vect: 1-D float tensor holding one raw feature vector.

        Returns:
            (a1, z2, w1): layer-1 activations (the SOM's input space),
            layer-2 logits, and the layer-1 weight matrix (persisted by
            save_weights and replayed by predict).
        """
        # Promote to a batch of one: shape (1, input_dim).
        inputs_vect = tf.expand_dims(inputs_vect,axis=0)
        # NOTE(review): tf.shape(...) is a *dynamic* tensor; tf.Variable
        # normally requires a statically-known initial-value shape, so this
        # line may raise in TF1 — inputs_vect.get_shape()[1] would be the
        # static alternative. Confirm against the TF version in use.
        w1 = tf.Variable(tf.random_normal([tf.shape(inputs_vect)[1], self.dim]))
        # Biases are fixed constants (0.1), not trained variables.
        b1 = tf.constant(0.1,shape=[self.dim])
        z1 = tf.matmul(inputs_vect,w1)+b1
        a1 = tf.nn.relu(z1)
        w2 = tf.Variable(tf.random_normal([self.dim, self._m*self._n]))
        b2 = tf.constant(0.1,shape=[self._m*self._n])
        z2 = tf.matmul(a1,w2)+b2
        return a1,z2,w1
# NOTE: removed scraped web-page footer text ("评论0" / "最新资源") that was
# not part of the module and broke the file's syntax.