1.数据
数据部分需要两个文件,一个是字符转化成索引的字典char2id.json,一个是用来训练的语料data.json。
char2id.json字典格式如下
{"UNK": 0, "淖": 20, "箩": 21, "雨": 22, "漳": 23,...}
data.json语料格式如下
[{"text": ["迈", "向", "充", "满", "希", "望", "的", "新", "世", "纪", "—", "—", "一", "九", "九", "八", "年", "新", "年", "讲", "话", "(", "附", "图", "片", "1", "张", ")"], "label": ["B", "E", "B", "E", "B", "E", "S", "S", "B", "E", "B", "E", "B", "M", "M", "M", "E", "B", "E", "B", "E", "S", "S", "B", "E", "S", "S", "S"]},...]
2.变量声明
如果函数没有特别声明,既是HMM类的实例方法。
# 普通方法
def load(load_path):
with open(load_path, 'r', encoding='utf-8') as f:
return json.load(f)
# HMM类的构造方法
def __init__(self):
self.char2id = load('char2id.json')
self.tag2id = {'B': 0, 'M': 1, 'E': 2, 'S': 3}
self.id2tag = {v: k for k, v in self.tag2id.items()}
self.tag_size = len(self.tag2id)
self.vocab_size = max(self.char2id.values()) + 1
self.Pi = np.zeros(self.tag_size)
self.A = np.zeros([self.tag_size, self.tag_size])
self.B = np.zeros([self.tag_size, self.vocab_size])
self.epsilon = 1e-8
-
self.char2id:字典-索引
-
self.tag2id:词性标签-索引
-
self.id2tag:索引-词性标签
-
self.tag_size:标签种类数
-
self.vocab_size:词典大小
- self.Pi、self.A、self.B:HMM模型矩阵,Pi表示初始隐状态概率矩阵,A表示由t-1时刻隐状态转移到t时刻隐状态的概率矩阵,B表示当前隐状态下转移到观测矩阵的概率矩阵
- epsilon:一个非0的极小值,防止计算概率值时结果下溢的情况
3.构建HMM矩阵
def get_Pi_A_B(self, data):
for dic in tqdm(data):
if dic['label'] == [] or dic['text'] == []:
continue
self.Pi[self.tag2id[dic['label'][0]]] += 1
for index, tag in enumerate(dic['label'][1:]):
this_tag = self.tag2id[tag]
before_tag = self.tag2id[dic['label'][index]]
self.A[before_tag][this_tag] += 1
self.A[self.A == 0] = self.epsilon
self.A /= np.sum(self.A, axis=0, keepdims=True)
self.Pi[self.Pi == 0] = self.epsilon
self.Pi /= np.sum(self.Pi)
for dic in tqdm(data):
for char, tag in zip(dic['text'], dic['label']):
self.B[self.tag2id[tag]][self.char2id[char]] += 1
self.B[self.B == 0] = self.epsilon
self.B /= np.sum(self.B, axis=1, keepdims=True)
def fit(self, data_path):
train_data = load(data_path)
self.get_Pi_A_B(train_data)
self.Pi = np.log(self.Pi)
self.A = np.log(self.A)
self.B = np.log(self.B)
print("DONE!")
Pi、A、B矩阵都是通过统计语料,使用频率代替概率。fit函数将矩阵log以后原本的乘法就会变成简单的矩阵加法。
4.维特比解码
def viterbi_decode(self, text):
seq_len = len(text)
T1_table = np.zeros([self.tag_size, seq_len])
T2_table = np.zeros([self.tag_size, seq_len])
start_p_obs_state = self.get_p_obs(text[0])
T1_table[:, 0] = self.Pi + start_p_obs_state
T2_table[:, 0] = np.nan
for i in range(1, seq_len):
p_obs_state = self.get_p_obs(text[i])
p_obs_state = np.expand_dims(p_obs_state, axis=0)
prev_score = np.expand_dims(T1_table[:, i - 1], axis=-1)
score = prev_score + self.A + p_obs_state
T1_table[:, i] = np.max(score, axis=0)
T2_table[:, i] = np.argmax(score, axis=0)
best_tag_id = int(np.argmax(T1_table[:, -1]))
best_tags = [best_tag_id, ]
for i in range(seq_len-1, 0, -1):
best_tag_id = int(T2_table[best_tag_id, i])
best_tags.append(best_tag_id)
return list(reversed(best_tags))
def get_p_obs(self, char):
char_token = self.char2id.get(char, 0)
if char_token == 0:
return np.log(np.ones(self.tag_size)/self.tag_size)
return np.ravel(self.B[:, char_token])
- T1_table、T2_table:T1_table记录t-1时刻的隐状态转移到t时刻隐状态的最大值。T2_table记录最大值的索引,通过回溯得到最佳的隐状态序列,即是预测的词性。
这里运用了numpy中的矩阵广播特性一次性计算在当前观测状态下的所有隐状态概率情况,再经过max或是argmax算出最大值填入T1和T2矩阵。
5.其他
def predict(self, text):
if len(text) == 0:
raise NotImplementedError("输入文本为空!")
best_tag_id = self.viterbi_decode(text)
self.print_func(text, best_tag_id)
def print_func(self, text, best_tags_id):
for char, tag_id in zip(text, best_tags_id):
print(char+"_"+self.id2tag[tag_id]+"|", end="")
def load_weights(self, Pi_path, A_path, B_path):
self.Pi = np.load(Pi_path)
self.A = np.load(A_path)
self.B = np.load(B_path)
def save_weights(self, Pi_path, A_path, B_path):
for data, path in zip([self.Pi, self.A, self.B], [Pi_path, A_path, B_path]):
np.save(path, data)
函数作用依次是:预测、输出、加载权重和保存权重。
参考: