"""
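Bag-of-visual-words image retrieval: SIFT descriptors are quantised with
k-means, weighted by TF-IDF, and searched with a KD-tree.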
"""
from sklearn.cluster import MiniBatchKMeans,KMeans
from sklearn.neighbors import KDTree,DistanceMetric
import pickle
import os
import numpy as np
import cv2
import time
import matplotlib.pyplot as plt
class Imgsearch():
def __init__(self):
    """Set the dataset/artifact locations and run the whole pipeline.

    The k-means vocabulary is unpickled from ``self.dir_path`` when the
    model file exists; otherwise it is trained first. Afterwards the
    TF-IDF data and the KD-tree are built and the evaluation query runs.
    """
    # NOTE(review): the indentation of this file was lost; it is assumed
    # that only ``train()`` belongs to the ``else`` branch and that the
    # three calls after it always run -- confirm against the original.

    # Root folder of the dataset; category folder names are joined to it.
    # Raw string: the previous literal relied on the invalid escapes "\c"
    # and "\i", which recent Python versions warn about. Same value.
    self.path = r'D:\code\img_search\256_ObjectCategories'
    # Number of k-means clusters, i.e. the length of every word histogram.
    self.clusters = 200
    # Folder that holds every saved artifact (model, index, tree, ...).
    self.dir_path = 'data_200'
    # Category sub-folders of ``self.path`` that are indexed and queried.
    self.file_list = ['002.american-flag', '005.baseball-glove', '009.bear', '013.birdbath',
                      '025.cactus', '028.camel', '029.cannon', '035.cereal-box',
                      '037.chess-board', '043.coin', '044.comet', '045.computer-keyboard',
                      '053.desk-globe', '082.galaxy', '092.grapes', '103.hibiscus',
                      '104.homer-simpson']
    model_file = '{}/img_search_0521.pkl'.format(self.dir_path)
    if os.path.exists(model_file):
        # NOTE: unpickling executes arbitrary code; load trusted files only.
        with open(model_file, 'rb') as f:
            self.kmean = pickle.load(f)
    else:
        print('模型未训练')
        self.train()
    self.query_data()
    self.train_kdtree()
    self.query()
def train(self):
    """Fit the k-means visual vocabulary on SIFT descriptors and pickle it.

    Up to the first 80 entries returned by ``os.listdir`` for every folder
    in ``self.file_list`` are used. The fitted ``MiniBatchKMeans`` is kept
    in ``self.kmean`` and written to ``<dir_path>/img_search_0521.pkl``;
    the shuffled list of training paths is kept in ``self.train_imgs``.

    Raises:
        ValueError: if no descriptors were obtained from any image.
    """
    print('模型训练中......')
    # Build the list of training image paths.
    train_lists = []
    for fp in self.file_list:
        p = os.path.join(self.path, fp)
        train_lists.extend(os.path.join(p, x) for x in os.listdir(p)[:80])
    np.random.shuffle(train_lists)
    self.train_imgs = train_lists
    # Collect the descriptor matrix of every image. ``sift_detect`` returns
    # whatever ``detectAndCompute`` gives as descriptors, which is None when
    # OpenCV finds no keypoints; such images are left out instead of
    # crashing the whole run.
    descriptor_sets = []
    for filename in self.train_imgs:
        des = self.sift_detect(filename)
        if des is not None and len(des) > 0:
            descriptor_sets.append(des)
    if not descriptor_sets:
        raise ValueError('no SIFT descriptors found in the training images')
    # One stack of whole matrices instead of a Python list of single rows.
    tensors = np.vstack(descriptor_sets)
    print(tensors.shape)
    self.kmean = MiniBatchKMeans(n_clusters=self.clusters, max_iter=100000000,
                                 batch_size=5000, max_no_improvement=500, tol=1e-7)
    self.kmean.fit(tensors)
    # The artifact folder may not exist yet on a first run.
    os.makedirs(self.dir_path, exist_ok=True)
    with open('{}/img_search_0521.pkl'.format(self.dir_path), 'wb') as f:
        pickle.dump(self.kmean, f)
    print('模型训结束')
def query_data(self):
    """Build and save the term-frequency matrix, the IDF vector and the index.

    Up to the first 80 entries returned by ``os.listdir`` for every folder
    in ``self.file_list`` are processed: the SIFT descriptors of an image
    are assigned to k-means clusters and counted into a histogram that is
    divided by the number of descriptors.

    Files written to ``self.dir_path``:
        index_words.npy: dict mapping the row number (as ``str``) to the
            image path (stored pickled inside the ``.npy`` file).
        words.npy: float32 matrix with one term-frequency row per image.
        idf.npy: float32 vector ``log(N / (df + 1))`` where ``N`` is the
            number of indexed images and ``df`` the number of images in
            which the cluster occurs.
    """
    print("数据生成中......")
    n_clusters = self.clusters
    # Paths of the images to index.
    image_paths = []
    for fp in self.file_list:
        p = os.path.join(self.path, fp)
        image_paths.extend(os.path.join(p, x) for x in os.listdir(p)[:80])
    # 1. row number -> file path
    index_words = {}
    # 2. per cluster: number of images that contain it (document frequency)
    doc_freq = np.zeros(shape=(n_clusters), dtype=np.int32)
    # 3. one term-frequency vector per indexed image
    words = []
    for file in image_paths:
        result = self.sift_detect(file)
        if result is None or len(result) == 0:
            # No descriptors means no histogram (and a division by zero);
            # skipping here keeps ``words`` rows and ``index_words`` aligned.
            print('skipped (no SIFT descriptors):', file)
            continue
        pre_result = self.kmean.predict(result)
        counts = np.bincount(pre_result, minlength=n_clusters)
        index_words[str(len(words))] = file
        doc_freq[counts > 0] += 1
        words.append((counts / len(pre_result)).astype(np.float32))
    n_docs = len(words)
    words = np.array(words, dtype=np.float32)
    idf = np.log(n_docs / (doc_freq + 1)).astype(np.float32)
    # The artifact folder may not exist when this is called on its own.
    os.makedirs(self.dir_path, exist_ok=True)
    np.save('{}/index_words.npy'.format(self.dir_path), index_words)
    np.save('{}/words'.format(self.dir_path), words)
    np.save('{}/idf'.format(self.dir_path), idf)
    print("数据生成结束")
def train_kdtree(self):
    """Build a KD-tree over the min-max scaled TF-IDF vectors and pickle it.

    Reads ``words.npy`` and ``idf.npy`` from ``self.dir_path`` and writes
    ``max.npy``, ``min.npy`` (the per-dimension scaling bounds) and
    ``tree.pkl`` to the same folder.
    """
    print("kd-tree 生成中......")
    dist = DistanceMetric.get_metric('euclidean')
    words = np.load('{}/words.npy'.format(self.dir_path))
    idf = np.load('{}/idf.npy'.format(self.dir_path))
    my_words = words * idf
    # Per-dimension min-max normalisation.
    min_n = np.min(my_words, axis=0)
    max_n = np.max(my_words, axis=0)
    # A dimension that is constant over all images has max == min, and the
    # scaling below would compute 0/0 = NaN. The stored maximum of such a
    # dimension is widened by one, so it scales to 0 here and any reader
    # that computes (x - min) / (max - min) from the saved files never
    # divides by zero either.
    max_n = np.where(max_n == min_n, min_n + 1, max_n)
    np.save('{}/max.npy'.format(self.dir_path), max_n)
    np.save('{}/min.npy'.format(self.dir_path), min_n)
    my_words = (my_words - min_n) / (max_n - min_n)
    tree = KDTree(my_words, leaf_size=800, metric=dist)
    with open('{}/tree.pkl'.format(self.dir_path), 'wb') as f:
        pickle.dump(tree, f)
    print("kd-tree 生成结束")
def query(self):
    """Evaluate the index and print per-category AP, the mAP and the runtime.

    For every folder in ``self.file_list`` the last 5 entries returned by
    ``os.listdir`` are used as queries. Each query is encoded like the
    indexed images (cluster histogram, IDF weighting, min-max scaling with
    the saved bounds) and its 10 nearest neighbours are fetched from the
    KD-tree. A neighbour is a hit when it lies in the same folder as the
    query; at every hit the precision so far is recorded, and a category's
    AP is the mean of the values recorded for all of its queries.
    """
    print("查询开始")
    start = time.perf_counter()
    # index_words.npy holds a pickled dict, so allow_pickle=True is needed
    # (NumPy refuses to load it otherwise).
    # NOTE: unpickling executes arbitrary code; load trusted files only.
    files_index = np.load('{}/index_words.npy'.format(self.dir_path),
                          allow_pickle=True).item()
    with open('{}/tree.pkl'.format(self.dir_path), 'rb') as f:
        kdtree = pickle.load(f)
    n_clusters = self.clusters
    idf = np.load('{}/idf.npy'.format(self.dir_path))
    # Bounds of the per-dimension min-max normalisation.
    max_n = np.load('{}/max.npy'.format(self.dir_path))
    min_n = np.load('{}/min.npy'.format(self.dir_path))
    AP = {}
    for fp in self.file_list:
        p = os.path.join(self.path, fp)
        cur_f = [os.path.join(p, x) for x in os.listdir(p)[-5:]]
        score_list = []
        for file in cur_f:
            result = self.sift_detect(file)
            if result is None or len(result) == 0:
                # Nothing to encode; the query contributes no hits.
                continue
            pre_result = self.kmean.predict(result)
            counts = np.bincount(pre_result, minlength=n_clusters)
            cur_tf_idf = (counts / len(pre_result)).astype(np.float32)
            cur_words = cur_tf_idf * idf
            cur_words = (cur_words - min_n) / (max_n - min_n)
            _, ind = kdtree.query(np.reshape(cur_words, (1, -1)), k=10)
            query_dir = os.path.dirname(file)
            score = 0
            # NOTE(review): the indentation of this file was lost; the
            # precision is assumed to be recorded only at hits -- confirm.
            for rank, row in enumerate(ind[0], start=1):
                if query_dir == os.path.dirname(files_index[str(row)]):
                    score += 1
                    score_list.append(score / rank)
        # Without any hit the mean of an empty list would be NaN and would
        # poison the mAP; such a category scores 0 instead.
        if score_list:
            AP[fp] = np.mean(score_list, dtype=np.float32)
        else:
            AP[fp] = np.float32(0.0)
    mAP = sum(AP.values())
    print('AP', AP)
    print('mAP', mAP / len(self.file_list))
    print("查询结束", time.perf_counter() - start)
def sift_detect(self, path):
    """Return the SIFT descriptors of the image stored at *path*.

    Args:
        path: file path handed to ``cv2.imread``.

    Returns:
        The descriptor array produced by ``detectAndCompute``; it can be
        None when OpenCV finds no keypoints in the image.

    Raises:
        ValueError: if the file cannot be read as an image.
    """
    img = cv2.imread(path)
    if img is None:
        # imread signals failure by returning None; fail with the path here
        # instead of an opaque assertion inside cvtColor.
        raise ValueError('cannot read image: {}'.format(path))
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Newer OpenCV builds provide SIFT in the main module; older contrib
    # builds only have it under xfeatures2d.
    create_sift = getattr(cv2, 'SIFT_create', None)
    if create_sift is None:
        create_sift = cv2.xfeatures2d.SIFT_create
    sift = create_sift()
    kp, des = sift.detectAndCompute(gray, None)
    return des
def img_show(self,img_path):
files_index = np.load('{}/index_words.npy'.format(self.dir_path)).item()
with open('{}/tree.pkl'.format(self.dir_path), 'rb') as f:
kdtree = pickle.load(f)
n_clusters = self.clusters
# words=np.load('data/words.npy')
idf = np.load('{}/idf.npy'.format(self.dir_path))
# 向量维度方向归一化
max_n = np.load('{}/max.npy'.format(self.dir_path))
min_n = np.load('{}/min.npy'.format(s