zoukankan      html  css  js  c++  java
  • 【词性标注】采用隐马尔可夫模型(使用了3-gram和Good-Turing平滑方法),准确率93%

    博客内容有空了再补充。先贴代码。

    数据地址:链接: https://pan.baidu.com/s/1-RbHi5xxBwJDG1gqAYUReQ 密码: rkup

    完整代码如下:

    import argparse
    import time
    
    # Command-line interface: two optional path flags, parsed once at import
    # time and exposed through the module-level ``args`` namespace.
    parser = argparse.ArgumentParser()
    for flag, description in (
        ('--train', 'input a training file'),
        ('--test', 'input a testing file'),
    ):
        parser.add_argument(flag, help=description)
    args = parser.parse_args()
    
    class POSTagging():
        """Trigram hidden-Markov-model part-of-speech tagger.

        Emission and transition distributions are estimated from raw counts,
        smoothed with ``good_turing``, and decoded with ``viterbi``.

        Corpus format (training and test files alike): one ``word/tag`` token
        per line; a line whose text before the first '/' is ``###`` closes the
        current sentence. The tag is whatever follows the LAST '/', so words
        that themselves contain a slash (``1/2/CD``) are read correctly.
        """

        # ====== input file handling ======
        def __init__(self, train_path, test_path):
            """Read both corpora into lists of sentences.

            Args:
                train_path: path of the UTF-8 training file.
                test_path: path of the UTF-8 test file.

            After construction ``train_sent_lst`` and ``test_sent_lst`` hold
            one list of raw ``word/tag`` strings per sentence. Two separator
            lines in a row (or a separator on the first line) yield an empty
            sentence, which is kept so the output mirrors the input layout.
            """
            self.train_sent_lst = self._read_sentences(train_path)
            self.test_sent_lst = self._read_sentences(test_path)
            self.tags_cnt, self.words_cnt = 0, 0
            self.tag2num, self.num2tag = {}, []
            self.word2num = {}

        @staticmethod
        def _read_sentences(path):
            """Split the file at ``###`` separator lines into sentences."""
            sentences, current = [], []
            with open(path, 'r', encoding='utf8') as f:
                for raw_line in f:
                    line = raw_line.strip()
                    if line.split('/')[0] == '###':
                        sentences.append(current)
                        current = []
                    elif line:
                        current.append(line)
            # A file that does not end with a separator still has a last sentence.
            if current:
                sentences.append(current)
            return sentences

        @staticmethod
        def _split_token(token):
            """Return ``(word, tag)`` for a ``word/tag`` string, splitting at the last '/'.

            Raises:
                ValueError: if the token contains no '/' at all.
            """
            word, sep, tag = token.rpartition('/')
            if not sep:
                raise ValueError("expected a 'word/tag' token, got %r" % (token,))
            return word, tag

        # ====== probability tables ======
        def train(self):
            """Estimate the smoothed emission and transition tables.

            Sets:
                all_words / all_tags: vocabulary (plus 'UNK') and tag set.
                word2num / tag2num / num2tag: index maps, assigned in sorted
                    order so numbering (and therefore tie-breaking in
                    ``viterbi``) is the same on every run.
                emission_prob[t][w]: smoothed weight of word index ``w`` under
                    tag index ``t``. Each row has ``words_cnt + 1`` slots; the
                    last slot belongs to no word and always has count 0.
                transition_prob[i][j][k]: smoothed weight of next tag ``k``
                    after tags ``i, j``. Index ``tags_cnt`` in position i or j
                    stands for the '*' start padding; index ``tags_cnt`` in
                    position k stands for 'STOP'. ``transition_prob[nt][nt]``
                    has no STOP slot, and ``transition_prob[i][nt]`` for a real
                    tag ``i`` stays ``None`` (that context cannot occur).

            Calling ``train`` again rebuilds everything from scratch.
            """
            emission_cnt = {}
            trigram_cnt = {}
            self.all_tags = set()
            self.all_words = {'UNK'}

            # Count (word, tag) emissions and tag trigrams over padded sentences.
            for train_sent in self.train_sent_lst:
                padded_tags = ['*', '*']
                for word_tag in train_sent:
                    wrd, tag = self._split_token(word_tag)
                    padded_tags.append(tag)
                    self.all_words.add(wrd)
                    self.all_tags.add(tag)
                    emission_cnt[(wrd, tag)] = emission_cnt.get((wrd, tag), 0) + 1
                padded_tags.append('STOP')
                for trigram in zip(padded_tags, padded_tags[1:], padded_tags[2:]):
                    trigram_cnt[trigram] = trigram_cnt.get(trigram, 0) + 1

            # Map words and tags to indices. Sets of strings iterate in a
            # hash-dependent order, so sort to keep the numbering reproducible.
            self.num2tag = sorted(self.all_tags)
            self.tag2num = {tag: idx for idx, tag in enumerate(self.num2tag)}
            self.word2num = {wrd: idx for idx, wrd in enumerate(sorted(self.all_words))}
            self.tags_cnt = len(self.num2tag)
            self.words_cnt = len(self.word2num)

            print(self.tags_cnt, ' ', self.words_cnt)

            nt = self.tags_cnt
            nw = self.words_cnt

            # Emission table: fill only the observed cells instead of probing
            # (and storing a zero for) every word/tag combination.
            emission_rows = [[0] * (nw + 1) for _ in range(nt)]
            for (wrd, tag), cnt in emission_cnt.items():
                emission_rows[self.tag2num[tag]][self.word2num[wrd]] = cnt
            self.emission_prob = [self.good_turing(row) for row in emission_rows]

            def next_tag_counts(u, v, with_stop=True):
                # Counts of every tag (and optionally STOP) following context (u, v).
                counts = [trigram_cnt.get((u, v, w), 0) for w in self.num2tag]
                if with_stop:
                    counts.append(trigram_cnt.get((u, v, 'STOP'), 0))
                return counts

            self.transition_prob = [[None] * (nt + 1) for _ in range(nt + 1)]
            # Contexts (u, v, w) and (u, v, STOP).
            for i, u in enumerate(self.num2tag):
                for j, v in enumerate(self.num2tag):
                    self.transition_prob[i][j] = self.good_turing(next_tag_counts(u, v))
            # Contexts (*, v, w) and (*, v, STOP).
            for j, v in enumerate(self.num2tag):
                self.transition_prob[nt][j] = self.good_turing(next_tag_counts('*', v))
            # Context (*, *, w): first tag of a sentence, no STOP slot.
            self.transition_prob[nt][nt] = self.good_turing(
                next_tag_counts('*', '*', with_stop=False))

        # ====== prediction and output ======
        def predict(self):
            """Tag every test sentence, print accuracy and write the result file.

            Words not seen in training are replaced by 'UNK' before decoding.
            All per-token lists get a '###' entry after each sentence so they
            stay aligned with each other.
            """
            word_sequence = []
            novel_sequence = []
            predict_result = []
            true_result = []

            print(len(self.test_sent_lst), " sentences total.")
            for i, test_sent in enumerate(self.test_sent_lst):
                if i % 10 == 0:
                    print("process ", i, " sentence")
                pairs = [self._split_token(line.strip()) for line in test_sent]
                words = [wrd for wrd, _ in pairs]
                labels = [tag for _, tag in pairs]
                mapped = [wrd if wrd in self.all_words else 'UNK' for wrd in words]

                word_sequence.extend(words)
                word_sequence.append("###")

                predict_result.extend(self.viterbi(mapped))
                predict_result.append("###")

                true_result.extend(labels)
                true_result.append("###")

                novel_sequence.extend(mapped)
                novel_sequence.append("###")

            self.evaluation(predict_result, true_result, novel_sequence, word_sequence)
            self.save_result(predict_result, word_sequence)

        def evaluation(self, predict_result, true_result, novel_sequence, word_sequence):
            """Print overall, known-word and novel-word tagging accuracy.

            The four lists are aligned token by token. Positions whose true tag
            is '###' are sentence separators and are skipped. A token is
            "known" when its entry in ``novel_sequence`` (the UNK-mapped form)
            equals the original word, otherwise "novel". A line is printed only
            for categories that contain at least one token.
            """
            cnt_known = cnt_novel = 0
            cnt_known_right = cnt_novel_right = 0
            aligned = zip(predict_result, true_result, novel_sequence, word_sequence)
            for predicted, gold, mapped, word in aligned:
                if gold == '###':
                    continue
                correct = predicted == gold
                if mapped == word:
                    cnt_known += 1
                    cnt_known_right += correct
                else:
                    cnt_novel += 1
                    cnt_novel_right += correct

            if cnt_known+cnt_novel != 0:
                print("accuracy: ", round((cnt_known_right+cnt_novel_right)*100/(cnt_known+cnt_novel), 2), "%")
            if cnt_known != 0:
                print("known word accuracy: ", round((cnt_known_right)*100/(cnt_known), 2), "%")
            if cnt_novel != 0:
                print("novel word accuracy: ", round((cnt_novel_right)*100/(cnt_novel), 2), "%")

        def save_result(self, predict_result, word_sequence, output_path="test-output"):
            """Write one ``word/tag`` line per token to ``output_path``.

            Args:
                predict_result: predicted tags, aligned with ``word_sequence``.
                word_sequence: original words (with '###' separators).
                output_path: destination file; defaults to ``test-output`` in
                    the current working directory.
            """
            with open(output_path, "w", encoding="utf8") as f:
                f.writelines(
                    word + '/' + tag + '\n'
                    for word, tag in zip(word_sequence, predict_result)
                )
            print("result saved.")

        # ====== Viterbi decoding ======
        def viterbi(self, sent):
            """Return the highest-scoring tag sequence for ``sent``.

            Args:
                sent: list of words, each of which must be a key of
                    ``word2num`` (``predict`` maps unseen words to 'UNK').

            Returns:
                A list of tag strings, one per word; ``[]`` for an empty
                sentence.

            The dynamic program keeps, for every tag pair (u, v), the best
            score of a tag prefix ending in u, v. Each layer is divided by its
            largest entry before it is extended: that leaves every argmax
            unchanged but stops the product of many small probabilities from
            underflowing to 0.0 on long sentences.
            """
            n = len(sent)
            if n == 0:
                return []

            nt = self.tags_cnt
            trans = self.transition_prob
            emit = self.emission_prob
            obs = [self.word2num[wrd] for wrd in sent]

            # A one-word sentence has no tag pair; score each tag directly.
            if n == 1:
                best_score, best_tag = -100000, None
                for v in range(nt):
                    score = trans[nt][nt][v] * emit[v][obs[0]] * trans[nt][v][nt]
                    if score > best_score:
                        best_score, best_tag = score, v
                return [self.num2tag[best_tag]]

            # Sentence start: scores for the tags (u, v) of the first two words.
            layer = [
                [
                    trans[nt][nt][u] * emit[u][obs[0]]
                    * trans[nt][u][v] * emit[v][obs[1]]
                    for v in range(nt)
                ]
                for u in range(nt)
            ]

            # back[k-1][u][v]: best tag for word k-1 when words k, k+1 carry u, v.
            back = []
            for k in range(1, n - 1):
                self._rescale(layer)
                nxt = [[0] * nt for _ in range(nt)]
                ptr = [[-1] * nt for _ in range(nt)]
                for u in range(nt):
                    for v in range(nt):
                        emission = emit[v][obs[k + 1]]
                        best_score, best_tag = -100000, -1
                        for w in range(nt):
                            score = layer[w][u] * trans[w][u][v] * emission
                            if score > best_score:
                                best_score, best_tag = score, w
                        nxt[u][v] = best_score
                        ptr[u][v] = best_tag
                back.append(ptr)
                layer = nxt

            # Sentence end: add the STOP transition and pick the best final pair.
            y = [None] * n
            best_score = -100000
            for u in range(nt):
                for v in range(nt):
                    score = layer[u][v] * trans[u][v][nt]
                    if score > best_score:
                        best_score = score
                        y[-1] = v
                        y[-2] = u

            # Follow the back-pointers from the end of the sentence to the start.
            for k in range(n - 3, -1, -1):
                y[k] = back[k][y[k + 1]][y[k + 2]]

            return [self.num2tag[t] for t in y]

        @staticmethod
        def _rescale(layer):
            """Divide a Viterbi layer in place by its largest entry (if positive)."""
            peak = max((max(row) for row in layer if row), default=0)
            if peak > 0:
                for row in layer:
                    for idx, value in enumerate(row):
                        row[idx] = value / peak

        # ====== smoothing ======
        def good_turing(self, counts):
            """Turn a vector of counts into a normalised, Good-Turing-smoothed vector.

            Args:
                counts: list of non-negative integer counts.

            Returns:
                A list of the same length. If all counts are 0 the result is
                all zeros; otherwise the entries sum to 1.

            For r below ``min(max(counts), 8)`` the adjusted count is
            ``(r + 1) * N[r+1] / N[r]`` when both N[r] and N[r+1] are non-zero
            (N[r] = number of entries with count r); every other r keeps its
            raw count.
            """
            if sum(counts) == 0:
                return [0] * len(counts)

            top = max(counts)
            freq_of_freq = [0] * (top + 1)  # how many entries occur exactly r times
            for r in counts:
                freq_of_freq[r] += 1

            adjusted = list(range(top + 1))  # default: keep the raw count
            for r in range(min(top, 8)):
                if freq_of_freq[r] != 0 and freq_of_freq[r + 1] != 0:
                    adjusted[r] = (r + 1) * freq_of_freq[r + 1] / freq_of_freq[r]

            scores = [adjusted[c] for c in counts]
            total = sum(scores)
            return [s / total for s in scores]  # normalise
            
    if __name__ == "__main__":
        # Run the whole pipeline (load, train, tag + evaluate) and report the
        # wall-clock time it took, truncated to whole seconds.
        began_at = time.time()

        tagger = POSTagging(args.train, args.test)
        tagger.train()
        tagger.predict()

        elapsed_seconds = int(time.time() - began_at)
        print("time cost: ", elapsed_seconds, " seconds")
    
  • 相关阅读:
    蓝桥杯历届试题 打印十字图 文字图形
    Cuckoo Hashing
    2006 飞行员配对(二分图最大匹配)
    Bad Hair Day(求数组中元素和它后面离它最近元素之间的元素个数)
    2019CCPC江西省赛
    字典树系统学习
    ac自动机学习
    项目管理(把与某点相邻的边分为两类,使复杂度降为 O(n^(3/2)))
    Ultra-QuickSort(离散化)
    Chika and Friendly Pairs(莫队+树状数组+离散化+预处理上下界)
  • 原文地址:https://www.cnblogs.com/yanqiang/p/14067252.html
Copyright © 2011-2022 走看看