import numpy as np
import torch # 用torch.save来存model
class myHMM:
def __init__(self):
self.A = {} # 状态转移概率矩阵
self.B = {} # 发射概率矩阵
self.PI = {} # 初始状态矩阵
self.stateSet = [] #状态矩阵
self.state_num = {} # ‘B,I,E,S’每个状态在训练集中出现的次数
self.Sentence_Num = 0 # 训练集中语句数量
def initArray(self):
'''
初始化所有矩阵
:return:
'''
#初始化状态矩阵
self.stateSet = ['B', 'I', 'E', 'S']
# 初始化状态转移矩阵
for state0 in self.stateSet:
self.A[state0] = {}
for state1 in self.stateSet:
self.A[state0][state1] = 0.0
# 初始化状态矩阵,初始化发射矩阵,初始化状态计数
for state in self.stateSet:
self.PI[state] = 0.0 # PI['B'] = 0.0等
self.B[state] = {}
self.state_num[state] = 0
def getTrainSet(self):
    '''
    Load the training corpus.

    Reads the two training files (one "<char> <tag>" pair per line,
    blank lines separating sentences) and returns a dict mapping each
    sentence string to its state (tag) string.

    :return: dict, sentence -> state sequence
    '''
    trainSet = {}
    # Both corpora share the same line format, so one reader handles both
    # (the original duplicated this loop verbatim for each file).
    self._read_corpus('../dataset/dataset2/train.utf8', trainSet)
    self._read_corpus('../dataset/dataset1/train.utf8', trainSet)
    # print(len(trainSet))
    # print(trainSet)
    return trainSet

def _read_corpus(self, path, trainSet):
    '''
    Parse one training file into trainSet (sentence -> states).

    Each non-blank line contributes one character and its BIES tag;
    blank lines terminate the current sentence.
    '''
    sentence = ''
    states = ''
    with open(path, encoding='utf-8') as trainFile:
        for line in trainFile:
            line = line.strip()  # drop leading/trailing whitespace
            if line == '':
                # Blank line: close the current sentence, if any.
                # (Checking `sentence` also skips consecutive blank lines
                # and avoids storing a garbage '' -> '' entry.)
                if sentence:
                    trainSet[sentence] = states
                sentence = ''
                states = ''
                continue
            line = line.replace(' ', '')
            if len(line) < 2:
                # Malformed line (no tag after the character): skip it
                # instead of raising IndexError.
                continue
            sentence += line[0]
            states += line[1]
    # Fix: the original never flushed the pending sentence at EOF, so a
    # file without a trailing blank line lost its last sentence (and
    # could merge it into the next file's first sentence).
    if sentence:
        trainSet[sentence] = states
def Data_processing(self):
'''
对数据进行预处理
将‘0’变为极小值-3.14e+100
将统计结果取对数
:return:
'''
for key in self.PI:
# 如果该项为0,则手动赋予一个极小值
if self.PI[key] == 0.0:
self.PI[key] = -3.14e+100
# 如果不为0,则计算概率,再对概率求log
else:
self.PI[key] = np.log(self.PI[key] / self.Sentence_Num)
# 状态转移概率,与上方PI思路一样,求得A的概率对数
for key0 in self.A:
for key1 in self.A[key0]:
if self.A[key0][key1] == 0.0:
self.A[key0][key1] = -3.14e+100
else:
self.A[key0][key1] = np.log(
self.A[key0][key1] / self.state_num[key0])
# 发射概率,与上方PI思路一样,求得B的概率对数
for key in self.B:
for word in self.B[key]:
if self.B[key][word] == 0.0:
self.B[key][word] = -3.14e+100
else:
self.B[key][word] = np.log(self.B[key][word] / self.state_num[key])
def train(self):
    '''
    HMM "learning" problem: estimate model parameters from the
    training set.

    Counts initial states (PI), state-to-state transitions (A), and
    per-character emissions (B) over every training sentence, converts
    the counts to log-probabilities via Data_processing, and persists
    the model with torch.save.
    '''
    self.initArray()
    trainSet = self.getTrainSet()  # dict: sentence -> state string
    for sent, states in trainSet.items():
        if not sent:
            # Defensive guard: an empty entry would crash on states[0].
            continue
        self.Sentence_Num += 1
        # One character per entry (the original built these with a
        # char-by-char extend loop; list() is equivalent and idiomatic).
        wordList = list(sent)
        lineStateList = list(states)
        # The sentence's first state feeds the initial distribution.
        self.PI[lineStateList[0]] += 1
        # A needs consecutive pairs, so count every (state_t, state_t+1)
        # combination over the full state chain.
        for j in range(len(lineStateList) - 1):
            self.A[lineStateList[j]][lineStateList[j + 1]] += 1
        for p, state in enumerate(lineStateList):
            self.state_num[state] += 1  # occurrences of each state
            word = wordList[p]
            # Ensure the character has an emission entry under every
            # state, so unseen (state, char) pairs later become the
            # log(0) sentinel in Data_processing.
            for s in self.stateSet:
                if word not in self.B[s]:
                    self.B[s][word] = 0.0
            # Count the actual (state, char) emission.
            self.B[state][word] += 1
    self.Data_processing()
    torch.save(self, '../savemodel/hmm.model')
# Viterbi算法求测试集最优状态序列
def Viterbi(self, sentence, PI, A, B):
# 初始化分词后的文章列表
retsentence = []
delta = [{}] # 动态规划表
path = {} # 存路径
# 首先对在训练集中没有出现过的句首字进行处理
# 若第一个字没有出现在发射矩阵的'B'状态列表上,则默认他为S,所以其他状态的概率都为负无穷大
if sentence[0] not in B['B']:
for state in self.stateSet:
if state == 'S':
# 取0说明在训练集中没有出现,概率为0,并于无穷小相区分
B[state][sentence[0]] = 0
else:
B[state][sentence[0]] = -3.14e+100
# 依据算法10.5 第一步:初始化
for state in self.stateSet:
# delta[t][state]表示时刻t到达state状态的所有路径中,概率最大路径的概率值
# 初始化δ状态链中第一个状态的四种状态概率,因取log,所以乘法变加法
delta[0][state] = PI[state] + B[state][sentence[0]]
path[state] = [state]
#算法10.5中的第二步:递推
#依次处理整条链
for i in range(1, len(sentence)):
delta.append({})
new_path = {}
# 开始计算
for state0 in self.stateSet:
# 初始化一个临时列表,用于存放四种概率
tmpbelta = []
for state1 in self. stateSet:
# 若这个字在当前状态的发射矩阵中不存在,则默认发射概率为0忽略该字
if sentence[i] not in B:
for state in self.stateSet:
B[state][sentence[0]] = 0
prob = delta[i - 1][state1] \
+ A[state1][state0] \
+ B[st
# NOTE(review): "评论0" ("0 comments") was a web-scrape artifact left at the
# end of the file; the Viterbi method above is truncated mid-expression and
# needs to be restored from the original source.