1216 Seven-Category Emotion Analysis of Poetry

    Seven-category emotion analysis

    Labeling the dataset with an emotion dictionary

    Earlier, we used word2vec to look up the nearest neighbours of each of the seven emotion categories, which gave us an emotion dictionary. We need to score individual poem lines with it in order to judge a poem's overall emotional tendency.

    We therefore re-collected the relevant emotion dictionary together with a weight for each word (its similarity value), and use those weights to rate the emotion of an entire poem.

    The dictionary is laid out as in the table below:
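
    A sketch of the assumed layout of new_emotion.xlsx, matching the 'similar' and 'val' columns the code below reads: one row per emotion, a comma-separated list of related words, and a matching list of weights. The words and values shown here are illustrative, not the real dictionary:

    emotion   similar            val
    悲        哀,愁,泪,...       0.89,0.82,0.78,...
    惧        惊,骇,怖,...       0.86,0.79,0.73,...
    乐        欢,悦,笑,...       0.88,0.81,0.77,...
    怒        愤,恨,嗔,...       0.84,0.78,0.72,...
    思        念,忆,怀,...       0.87,0.80,0.75,...
    喜        欣,庆,贺,...       0.85,0.79,0.74,...
    忧        患,虑,叹,...       0.83,0.77,0.71,...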

     Using the emotion dictionary obtained above, we annotate all 48,330 Tang poems with emotion labels:

    import pandas as pd
    import xlwt
    
    def emotion():
        data=pd.read_excel('new_emotion.xlsx')
        similar=data.get('similar')
        value = data.get('val')
        sad_list=str(similar[0]).split(',')
        fear_list=str(similar[1]).split(',')
        happy_list=str(similar[2]).split(',')
        anger_list=str(similar[3]).split(',')
        think_list=str(similar[4]).split(',')
        like_list=str(similar[5]).split(',')
        worry_list=str(similar[6]).split(',')
    
        sad_val_list = str(value[0]).split(',')
        fear_val_list = str(value[1]).split(',')
        happy_val_list = str(value[2]).split(',')
        anger_val_list = str(value[3]).split(',')
        think_val_list = str(value[4]).split(',')
        like_val_list = str(value[5]).split(',')
        worry_val_list = str(value[6]).split(',')
        return sad_list,fear_list,happy_list,anger_list,think_list,like_list,worry_list,sad_val_list,fear_val_list,happy_val_list,anger_val_list,think_val_list,like_val_list,worry_val_list
    
    def test_sentence(sentence):
        sad_list, fear_list, happy_list, anger_list, think_list, like_list, worry_list,sad_val_list,fear_val_list,happy_val_list,anger_val_list,think_val_list,like_val_list,worry_val_list=emotion()
        sad=fear=happy=anger=think=like=worry=0
        for k in sentence:
            if k in sad_list:
                sad+=float(sad_val_list[sad_list.index(k)])
            elif k in fear_list:
                fear+=float(fear_val_list[fear_list.index(k)])
            elif k in happy_list:
                happy+=float(happy_val_list[happy_list.index(k)])
            elif k in anger_list:
                anger+=float(anger_val_list[anger_list.index(k)])
            elif k in think_list:
                think+=float(think_val_list[think_list.index(k)])
            elif k in like_list:
                like+=float(like_val_list[like_list.index(k)])
            elif k in worry_list:
                worry+=float(worry_val_list[worry_list.index(k)])
        ans=max(sad,fear,happy,anger,think,like,worry)
        score_list=[sad,fear,happy,anger,think,like,worry]
        emotion_list=['悲','惧','乐','怒','思','喜','忧']  # same order as score_list
        i=0
        for i in range(len(score_list)):
            if score_list[i]==ans:
                break
        if ans!=0:
            return emotion_list[i],ans
        else:
            return '',0
    
    def read():
        data=pd.read_excel('tang.xlsx')
        content_list=data.get('content')
        ans_emotion_content=[]
        print(len(content_list))
        for i in range(len(content_list)):
            print(i)  # progress
            content=content_list[i].replace('\n','')
            #print(content)
            ans_content=[]
            content_l=str(content).split('。')  # split the poem at the Chinese full stop
            for k in content_l:
                kk=str(k).split('，')  # then split each piece at the Chinese comma
                for it in kk:
                    if it!='':
                        ans_content.append(it)
            ans_emotion = {}
            for sentence in ans_content:
                #print(sentence)
                emot,score=test_sentence(sentence)
                if emot!='':
                    #print(emot+str(score))
                    if emot not in ans_emotion.keys():
                        ans_emotion[emot] = score
                    else:
                        ans_emotion[emot]+=score
            #print(sorted(ans_emotion.items(), key=lambda item: item[1], reverse=True))
            if len(ans_emotion.items())==0:
                #print('whole-poem emotion: none')
                ans_emotion_content.append('')
            else:
                ans_emotion=dict(sorted(ans_emotion.items(), key=lambda item: item[1], reverse=True))
                for key,value in ans_emotion.items():
                    #print('whole-poem emotion: '+key)
                    ans_emotion_content.append(key)
                    break
        xl = xlwt.Workbook()
        # create a worksheet via the workbook's add_sheet method
        sheet1 = xl.add_sheet('sheet1', cell_overwrite_ok=True)

        sheet1.write(0, 0, "sentence")
        sheet1.write(0, 1, 'label')
        print(len(content_list))
        for i in range(0, len(content_list)):
            sheet1.write(i + 1, 0, content_list[i].replace('\n',''))
            sheet1.write(i + 1, 1, ans_emotion_content[i])
        xl.save("train.xlsx")
    
    
    
    
    if __name__ == '__main__':
        read()
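
    As a quick check, test_sentence can score a single line. Which label and score come back depends entirely on the words and weights in new_emotion.xlsx, so the output below is only illustrative:

    emot, score = test_sentence('举头望明月，低头思故乡')
    print(emot, score)  # e.g. 思 1.42, if longing-related characters carry the most weight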

    Splitting into training and test sets

    We split the source data into a corresponding training set and test set.

    First, count how many poems of each emotion the source data contains:

    test is the number of poems held out for the test set.

    # 乐 (joy)      9440    test: 1000
    # 悲 (sadness) 13784    test: 1000
    # 忧 (worry)    3977    test: 300
    # 思 (longing) 10550    test: 1000
    # 喜 (delight)  6578    test: 500
    # 怒 (anger)    2158    test: 200
    # 惧 (fear)      493    test: 100

    Code for the train/test split:

    import pandas as pd
    import xlwt
    
    
    # 乐 (joy)      9440    test: 1000
    # 悲 (sadness) 13784    test: 1000
    # 忧 (worry)    3977    test: 300
    # 思 (longing) 10550    test: 1000
    # 喜 (delight)  6578    test: 500
    # 怒 (anger)    2158    test: 200
    # 惧 (fear)      493    test: 100
    
    def split_data():
        data=pd.read_excel('train.xlsx')
        sentence_list=data.get('sentence')
        label_list=data.get('label')
        emotion = ['悲', '惧', '乐', '怒', '思', '喜', '忧']
        train_senten=[]
        train_label=[]
        test_senten=[]
        test_label=[]
        k1=k2=k3=k4=k5=k6=k7=0
        for i in range(len(sentence_list)):
            lab=label_list[i]
            sente=sentence_list[i]
            if lab in emotion:
                if lab == '悲':
                    if k1<1000:
                        k1+=1
                        test_senten.append(sente)
                        test_label.append(0)
                    else:
                        train_label.append(0)
                        train_senten.append(sente)
                elif lab == '惧':
                    if k2<100:
                        k2+=1
                        test_senten.append(sente)
                        test_label.append(1)
                    else:
                        train_label.append(1)
                        train_senten.append(sente)
                elif lab == '乐':
                    if k3<1000:
                        k3+=1
                        test_senten.append(sente)
                        test_label.append(2)
                    else:
                        train_label.append(2)
                        train_senten.append(sente)
                elif lab == '怒':
                    if k4<200:
                        k4+=1
                        test_senten.append(sente)
                        test_label.append(3)
                    else:
                        train_label.append(3)
                        train_senten.append(sente)
                elif lab == '思':
                    if k5<1000:
                        k5+=1
                        test_senten.append(sente)
                        test_label.append(4)
                    else:
                        train_label.append(4)
                        train_senten.append(sente)
                elif lab == '喜':
                    if k6<500:
                        k6+=1
                        test_senten.append(sente)
                        test_label.append(5)
                    else:
                        train_label.append(5)
                        train_senten.append(sente)
                elif lab == '忧':
                    if k7<300:
                        k7+=1
                        test_senten.append(sente)
                        test_label.append(6)
                    else:
                        train_label.append(6)
                        train_senten.append(sente)
        # write the test split
        xl = xlwt.Workbook()
        sheet1 = xl.add_sheet('sheet1', cell_overwrite_ok=True)
        sheet1.write(0, 0, "sentence")
        sheet1.write(0, 1, 'label')
        for i in range(0, len(test_senten)):
            sheet1.write(i + 1, 0, test_senten[i])
            sheet1.write(i + 1, 1, test_label[i])
        xl.save("data/test.xlsx")

        # write the training split the same way (the original snippet stopped at the
        # test set; the train path here is assumed to mirror data/test.xlsx)
        xl2 = xlwt.Workbook()
        sheet2 = xl2.add_sheet('sheet1', cell_overwrite_ok=True)
        sheet2.write(0, 0, "sentence")
        sheet2.write(0, 1, 'label')
        for i in range(0, len(train_senten)):
            sheet2.write(i + 1, 0, train_senten[i])
            sheet2.write(i + 1, 1, train_label[i])
        xl2.save("data/train.xlsx")
    
    if __name__ == '__main__':
        split_data()

    The result: the split workbooks are saved under the data/ directory.

     Building the vocabulary

    The main step is segmenting the poems: we split them into individual characters and remove Chinese punctuation.

    # poem segmentation
    import re
    from zhon.hanzi import punctuation  # Chinese punctuation set (this import is assumed; the snippet uses `punctuation` without importing it)

    def tokenlize(sentence):
        """
        Tokenize the text into single characters.
        :param sentence: str
        :return: [str,str,str]
        """

        filters = ['!', '"', '#', '$', '%', '&', '\(', '\)', '\*', '\+', ',', '-', '\.', '/', ':', ';', '<', '=', '>',
                   '\?', '@', '\[', '\\\\', '\]', '^', '_', '`', '\{', '\|', '\}', '~', '\t', '\n', '\x97', '\x96',
                   '。', '，']
        sentence = re.sub("|".join(filters), "", sentence)
        # remove any remaining Chinese punctuation
        punctuation_str = punctuation
        for i in punctuation_str:
            sentence = sentence.replace(i, '')
        # put a space between every character, then split: one token per character
        sentence=' '.join(sentence)
        result = [i for i in sentence.split(" ") if len(i) > 0]
        return result
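
    For example, with the imports above, a line is reduced to one token per character with all punctuation removed:

    print(tokenlize('床前明月光，疑是地上霜。'))
    # ['床', '前', '明', '月', '光', '疑', '是', '地', '上', '霜']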

    Creating the vocabulary:

    # -*-coding:utf-8-*-
    import pickle
    
    from tqdm import tqdm
    
    from 情感分析.诗词情感分析 import dataset
    # from 情感分析.imdb_sentiment.vocab import Vocab
    from torch.utils.data import DataLoader
    
    class Vocab:
        UNK_TAG = "<UNK>"  # marker for unknown tokens
        PAD_TAG = "<PAD>"  # padding marker
        PAD = 0
        UNK = 1

        def __init__(self):
            self.dict = {  # maps token -> index
                self.UNK_TAG: self.UNK,
                self.PAD_TAG: self.PAD
            }
            self.count = {}  # token-frequency counter
    
        def fit(self, sentence):
            """
            Take one tokenized sentence and update the token counts.
            :param sentence: [str,str,str]
            :return: None
            """
            for word in sentence:
                self.count[word] = self.count.get(word, 0) + 1  # after fitting every sentence, self.count holds all token frequencies
    
        def build_vocab(self, min_count=1, max_count=None, max_features=None):
            """
            Build the vocabulary subject to the given constraints.
            :param min_count: minimum token frequency
            :param max_count: maximum token frequency
            :param max_features: maximum vocabulary size
            :return:
            """
            if min_count is not None:
                self.count = {word: count for word, count in self.count.items() if count >= min_count}
            if max_count is not None:
                self.count = {word: count for word, count in self.count.items() if count <= max_count}
            if max_features is not None:
                # [(k,v),(k,v)....] --->{k:v,k:v}
                self.count = dict(sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_features])

            for word in self.count:
                self.dict[word] = len(self.dict)  # assign each token the next free index

            # invert the dict: index -> token
            self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))
    
        def transform(self, sentence, max_len=None):
            """
            Convert a tokenized sentence into a sequence of indices,
            truncated or PAD-padded to max_len.
            :param sentence: [str,str,str]
            :return: [int,int,int]
            """
            if max_len is not None:
                if len(sentence) > max_len:
                    sentence = sentence[:max_len]
                else:
                    sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))  # pad with PAD

            return [self.dict.get(i, self.UNK) for i in sentence]
    
        def inverse_transform(self, indices):
            """
            Convert a sequence of indices back into tokens.
            :param indices: [int,int,int]
            :return: [str,str,str]
            """
            return [self.inverse_dict.get(i, self.UNK_TAG) for i in indices]
    
        def __len__(self):
            return len(self.dict)
    
    def collate_fn(batch):
        """
        Collate a batch of dataset items.
        :param batch: [one __getitem__ result, another, ...]
        :return: tuple (reviews, labels)
        """
        reviews, labels = zip(*batch)

        return reviews, labels
    
    
    
    def get_dataloader(train=True):
        imdb_dataset = dataset.ImdbDataset(train)
        my_dataloader = DataLoader(imdb_dataset, batch_size=200, shuffle=True, collate_fn=collate_fn)
        return my_dataloader
    
    
    if __name__ == '__main__':
    
        ws = Vocab()
        dl_train = get_dataloader(True)
        dl_test = get_dataloader(False)
        for reviews, label in tqdm(dl_train, total=len(dl_train)):
            for sentence in reviews:
                ws.fit(sentence)
        for reviews, label in tqdm(dl_test, total=len(dl_test)):
            for sentence in reviews:
                ws.fit(sentence)
        ws.build_vocab()
        print(len(ws))
    
        pickle.dump(ws, open("./models/vocab.pkl", "wb"))
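
    A minimal illustration of the mapping (a fresh Vocab reserves PAD=0 and UNK=1, so the first fitted characters start at index 2):

    v = Vocab()
    v.fit(['床', '前', '明', '月', '光'])
    v.build_vocab(min_count=1)
    print(v.transform(['床', '前', '夜'], max_len=5))  # [2, 3, 1, 0, 0]: '夜' is unseen -> UNK, then PAD fills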

    BiLSTM model training

    With train_batch_size=128, num_layers=6, and epoch=30, the model reached an accuracy of 77.19%.

    # -*-coding:utf-8-*-
    import pickle
    
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torch.optim import Adam
    from torch.utils.data import DataLoader
    from tqdm import tqdm
    
    from 情感分析.诗词情感分析 import dataset
    from 情感分析.诗词情感分析.vocab import Vocab
    
    train_batch_size = 128
    test_batch_size = 128
    voc_model = pickle.load(open("./models/vocab.pkl", "rb"))
    sequence_max_len = 100
    
    
    def collate_fn(batch):
        """
        Collate a batch of dataset items.
        :param batch: [one __getitem__ result, another, ...]
        :return: tuple (reviews, labels) as LongTensors
        """
        reviews, labels = zip(*batch)
        reviews = torch.LongTensor([voc_model.transform(i, max_len=sequence_max_len) for i in reviews])
        labels = torch.LongTensor(labels)
        return reviews, labels
    
    
    def get_dataloader(train=True):
        imdb_dataset = dataset.ImdbDataset(train)
        batch_size = train_batch_size if train else test_batch_size
        return DataLoader(imdb_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    
    
    class ImdbModel(nn.Module):
        def __init__(self):
            super(ImdbModel, self).__init__()
            self.embedding = nn.Embedding(num_embeddings=len(voc_model), embedding_dim=200, padding_idx=voc_model.PAD)
            self.lstm = nn.LSTM(input_size=200, hidden_size=64, num_layers=6, batch_first=True, bidirectional=True,
                                dropout=0.1)
            self.fc1 = nn.Linear(64 * 2, 64)
            self.fc2 = nn.Linear(64, 7)  # seven emotion classes
    
        def forward(self, input):
            """
            :param input: [batch_size, max_len]
            :return: log-probabilities over the seven classes
            """
            input_embeded = self.embedding(input)  # input_embeded: [batch_size, max_len, 200]

            output, (h_n, c_n) = self.lstm(input_embeded)  # h_n: [num_layers*2, batch_size, hidden_size]
            # concatenate the top layer's final forward and backward hidden states
            out = torch.cat([h_n[-1, :, :], h_n[-2, :, :]], dim=-1)  # out: [batch_size, hidden_size*2]

            # first fully connected layer
            out_fc1 = self.fc1(out)
            # relu
            out_fc1_relu = F.relu(out_fc1)

            # second fully connected layer
            out_fc2 = self.fc2(out_fc1_relu)  # out: [batch_size, 7]
            return F.log_softmax(out_fc2, dim=-1)
    
    
    def device():
        if torch.cuda.is_available():
            return torch.device('cuda')
        else:
            return torch.device('cpu')
    
    
    def train(imdb_model, epoch):
        """
        Train the model.
        :param imdb_model: model to train
        :param epoch: number of epochs
        :return:
        """
        train_dataloader = get_dataloader(train=True)
    
    
        optimizer = Adam(imdb_model.parameters())
        for i in range(epoch):
            bar = tqdm(train_dataloader, total=len(train_dataloader))
            for idx, (data, target) in enumerate(bar):
                optimizer.zero_grad()
                data = data.to(device())
                target = target.to(device())
                output = imdb_model(data)
                loss = F.nll_loss(output, target)
                loss.backward()
                optimizer.step()
                bar.set_description("epcoh:{}  idx:{}   loss:{:.6f}".format(i, idx, loss.item()))
        torch.save(imdb_model, 'lstm_model.pkl')
    
    
    def test(imdb_model):
        """
        Evaluate the model on the test set.
        :param imdb_model:
        :return:
        """
        test_loss = 0
        correct = 0
        imdb_model.eval()
        test_dataloader = get_dataloader(train=False)
        with torch.no_grad():
            for data, target in tqdm(test_dataloader):
                data = data.to(device())
                target = target.to(device())
                output = imdb_model(data)
                test_loss += F.nll_loss(output, target, reduction='sum').item()
                pred = output.data.max(1, keepdim=True)[1]  # index of the max log-probability, [batch_size,1]
                correct += pred.eq(target.data.view_as(pred)).sum()
        test_loss /= len(test_dataloader.dataset)
        print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
            test_loss, correct, len(test_dataloader.dataset),
            100. * correct / len(test_dataloader.dataset)))
    
    
    def xlftest():
        model = torch.load('lstm_model.pkl')
        model.to(device())
        from 情感分析.诗词情感分析.dataset import tokenlize
        # sample lines; categories: 乐, 悲, 忧, 思, 喜, 怒, 惧
        lines=['独在异乡为异客,每逢佳节倍思亲。遥知兄弟登高处,遍插茱萸少一人。 ',
               '昔日龌龊不足夸,今朝放荡思无涯。春风得意马蹄疾,一日看尽长安花。',
               '锄禾日当午,汗滴禾下土。谁知盘中餐,粒粒皆辛苦?',
               '少小离家老大回,乡音无改鬓毛衰。儿童相见不相识,笑问客从何处来。',
               '故人具鸡黍,邀我至田家。绿树村边合,青山郭外斜。开轩面场圃,把酒话桑麻。待到重阳日,还来就菊花。',
               '怒发冲冠,凭栏处'
               ]
        for line in lines:
            print(line)
            review = tokenlize(line)
            vocab_model = pickle.load(open("./models/vocab.pkl", "rb"))
            result = vocab_model.transform(review, sequence_max_len)
            data = torch.LongTensor(result).to(device())
            data = torch.reshape(data, (1, sequence_max_len)).to(device())
            output = model(data)
            pred = output.data.max(1, keepdim=True)[1]  # index of the max log-probability, [1,1]
            # index -> label: ['悲', '惧', '乐', '怒', '思', '喜', '忧']
            if pred.item() == 0:
                print("悲")
            elif pred.item() == 1:
                print("惧")
            elif pred.item() == 2:
                print("乐")
            elif pred.item() == 3:
                print("怒")
            elif pred.item() == 4:
                print("思")
            elif pred.item() == 5:
                print("喜")
            elif pred.item() == 6:
                print("忧")
    
    if __name__ == '__main__':
        # imdb_model = ImdbModel().to(device())
        # train(imdb_model,30)
        # test(imdb_model)
        xlftest()

    The test output is as follows:

    All of the predictions are correct.

    The main shortcoming:

    Because the source data was labeled automatically with the emotion dictionary, some of the labels are wrong, and the trained model inherits those errors. Even so, this is a huge improvement over labeling everything by hand.

    Original article: https://www.cnblogs.com/xiaofengzai/p/15700436.html