zoukankan      html  css  js  c++  java
  • 自然语言处理之HMM模型分词

    汉语中句子以字为单位的,但语义理解仍是以词为单位,所以也就存在中文分词问题。主要的技术可以分为:规则分词、统计分词以及混合分词(规则+统计)。

    基于规则的分词是一种机械分词,主要依赖于维护词典,在切分时将与剧中的字符串与词典中的词进行匹配。主要包括正向最大匹配法、逆向最大匹配法以及双向最大匹配法。

    统计分词主要思想是将每个词视作由字组成,如果相连的字在不同文本中出现次数越多,就越可能是一个词。(隐马尔可夫【HMM】、条件随机场【CRF】等)

    l  长度为m的字符串确定其概率分布:

    P(x1,x2……,xm)=P(x1)P(x1|x1,x2)……P(xm|x1,x2,……,xm-1)

    l  用n-gram模型将上式进行简化,认为其概率仅与其前n-1个词相关:

    P~=P(xi|xi-(n-1),xi-(n-2)……,xi-1)=count(xi-(n-1),xi-(n-2)……,xi-1,xi)/count(xi-(n-1),xi-(n-2)……,xi-1)

    隐马尔可夫模型HMM将分词作为字在字串中的序列标注任务来实现。每个字在构造一个特定词语时都占据一个确定的构词结构:B(词首)、M(词中)、E(词尾)和S(单独成词)。

    l  抽象表示:

    W表示输入的句子,长度n,O表示输出标签,则理想是maxP(o1,o2…|w1,w2……)

    贝叶斯公式P(o|w)=P(w|o)P(o)/P(w) 因为P(w)是给定输出为常数,因此要最大化P(w|o)P(o)

    P(w|o)= P(w1|o1) P(w2|o3) ……P(w3|o3) 作马尔可夫假设每个输出仅与上一个输出有关

    P(o)=P(o1)P(o2|o1)……P(on|o(n-1))

    再通过Veterbi算法求最优路径(如果最终的最优路径经过某个oi那么从初始结点到oi-1必然也是最优路径)。

    代码实现如下(训练语料库没有给出):

      1 class HMM(object):
      2     def __init__(self):
      3         import os
      4 
      5         # 主要是用于存取算法中间结果,不用每次都训练模型
      6         self.model_file = 'hmm_model.pkl'
      7 
      8         # 状态值集合
      9         self.state_list = ['B', 'M', 'E', 'S']
     10         # 参数加载,用于判断是否需要重新加载model_file
     11         self.load_para = False
     12 
     13     # 用于加载已计算的中间结果,当需要重新训练时,需初始化清空结果
     14     def try_load_model(self, trained):
     15         if trained:
     16             import pickle
     17             with open(self.model_file, 'rb') as f:
     18                 self.A_dic = pickle.load(f)
     19                 self.B_dic = pickle.load(f)
     20                 self.Pi_dic = pickle.load(f)
     21                 self.load_para = True
     22 
     23         else:
     24             # 状态转移概率(状态->状态的条件概率)
     25             self.A_dic = {}
     26             # 发射概率(状态->词语的条件概率)
     27             self.B_dic = {}
     28             # 状态的初始概率
     29             self.Pi_dic = {}
     30             self.load_para = False
     31 
     32     # 计算转移概率、发射概率以及初始概率
     33     def train(self, path):
     34 
     35         # 重置几个概率矩阵
     36         self.try_load_model(False)
     37 
     38         # 统计状态出现次数,求p(o)
     39         Count_dic = {}
     40 
     41         # 初始化参数
     42         def init_parameters():
     43             for state in self.state_list:
     44                 self.A_dic[state] = {s: 0.0 for s in self.state_list}
     45                 self.Pi_dic[state] = 0.0
     46                 self.B_dic[state] = {}
     47 
     48                 Count_dic[state] = 0
     49 
     50         def makeLabel(text):
     51             out_text = []
     52             if len(text) == 1:
     53                 out_text.append('S')
     54             else:
     55                 out_text += ['B'] + ['M'] * (len(text) - 2) + ['E']
     56 
     57             return out_text
     58 
     59         init_parameters()
     60         line_num = -1
     61         # 观察者集合,主要是字以及标点等
     62         words = set()
     63         with open(path, encoding='utf8') as f:
     64             for line in f:
     65                 line_num += 1
     66 
     67                 line = line.strip()
     68                 if not line:
     69                     continue
     70 
     71                 word_list = [i for i in line if i != ' ']
     72                 words |= set(word_list)  # 更新字的集合
     73 
     74                 linelist = line.split()
     75 
     76                 line_state = []
     77                 for w in linelist:
     78                     line_state.extend(makeLabel(w))
     79                 
     80                 assert len(word_list) == len(line_state)
     81 
     82                 for k, v in enumerate(line_state):
     83                     Count_dic[v] += 1
     84                     if k == 0:
     85                         self.Pi_dic[v] += 1  # 每个句子的第一个字的状态,用于计算初始状态概率
     86                     else:
     87                         self.A_dic[line_state[k - 1]][v] += 1  # 计算转移概率
     88                         self.B_dic[line_state[k]][word_list[k]] = 
     89                             self.B_dic[line_state[k]].get(word_list[k], 0) + 1.0  # 计算发射概率
     90         
     91         self.Pi_dic = {k: v * 1.0 / line_num for k, v in self.Pi_dic.items()}
     92         self.A_dic = {k: {k1: v1 / Count_dic[k] for k1, v1 in v.items()}
     93                       for k, v in self.A_dic.items()}
     94         #加1平滑
     95         self.B_dic = {k: {k1: (v1 + 1) / Count_dic[k] for k1, v1 in v.items()}
     96                       for k, v in self.B_dic.items()}
     97         #序列化
     98         import pickle
     99         with open(self.model_file, 'wb') as f:
    100             pickle.dump(self.A_dic, f)
    101             pickle.dump(self.B_dic, f)
    102             pickle.dump(self.Pi_dic, f)
    103 
    104         return self
    105 
    106     def viterbi(self, text, states, start_p, trans_p, emit_p):
    107         V = [{}]
    108         path = {}
    109         for y in states:
    110             V[0][y] = start_p[y] * emit_p[y].get(text[0], 0)
    111             path[y] = [y]
    112         for t in range(1, len(text)):
    113             V.append({})
    114             newpath = {}
    115             
    116             #检验训练的发射概率矩阵中是否有该字
    117             neverSeen = text[t] not in emit_p['S'].keys() and 
    118                 text[t] not in emit_p['M'].keys() and 
    119                 text[t] not in emit_p['E'].keys() and 
    120                 text[t] not in emit_p['B'].keys()
    121             for y in states:
    122                 emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0 #设置未知字单独成词
    123                 (prob, state) = max(
    124                     [(V[t - 1][y0] * trans_p[y0].get(y, 0) *
    125                       emitP, y0)
    126                      for y0 in states if V[t - 1][y0] > 0])
    127                 V[t][y] = prob
    128                 newpath[y] = path[state] + [y]
    129             path = newpath
    130             
    131         if emit_p['M'].get(text[-1], 0)> emit_p['S'].get(text[-1], 0):
    132             (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E','M')])
    133         else:
    134             (prob, state) = max([(V[len(text) - 1][y], y) for y in states])
    135         
    136         return (prob, path[state])
    137 
    138     def cut(self, text):
    139         import os
    140         if not self.load_para:
    141             self.try_load_model(os.path.exists(self.model_file))
    142         prob, pos_list = self.viterbi(text, self.state_list, self.Pi_dic, self.A_dic, self.B_dic)      
    143         begin, next = 0, 0    
    144         for i, char in enumerate(text):
    145             pos = pos_list[i]
    146             if pos == 'B':
    147                 begin = i
    148             elif pos == 'E':
    149                 yield text[begin: i+1]
    150                 next = i+1
    151             elif pos == 'S':
    152                 yield char
    153                 next = i+1
    154         if next < len(text):
    155             yield text[next:]
    156 #####测试
    157 hmm=HMM()
    158 hmm.train('cutwords\trainCorpus.txt_utf8.txt')
    159 
    160 text='这是一个很棒的方案!'
    161 res=hmm.cut(text)
    162 print(text)
    163 print(str(list(res)))
  • 相关阅读:
    mongodb 配置单实例与双实例
    redis 集群 搭建
    memcached 搭建
    公网yum 源地址
    jdk 安装
    activemq 搭建--集群
    zookeeper 安装
    activemq 安装-单点
    rabbitmq 集群
    python——网络编程
  • 原文地址:https://www.cnblogs.com/mokoaxx/p/12782803.html
Copyright © 2011-2022 走看看