    BiLSTM Multi-Emotion Sentiment Analysis of Chinese Weibo

    Data

    The data comes from a GitHub project, ChineseNlpCorpus, which collects quite a few Chinese datasets/corpora for natural language processing.

    Download: Baidu Netdisk
    Overview: more than 360,000 sentiment-labelled Sina Weibo posts covering 4 emotions; roughly 200,000 are labelled joy, and anger, disgust, and depression have about 50,000 each
    Source: Sina Weibo
    Original dataset: a Weibo sentiment-analysis dataset collected from the web; the exact author and origin are unknown
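
    Before splitting the data it helps to sanity-check the class balance. A minimal sketch (assuming the simplifyweibo_4_moods.csv layout with label and review columns, as used by the split script below):

    import pandas as pd

    # Load the raw corpus and print the label distribution
    # (0 = joy, 1 = anger, 2 = disgust, 3 = depression, per the dataset description)
    data = pd.read_csv('simplifyweibo_4_moods.csv')
    print(data['label'].value_counts())
    print(data['review'].head())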

    Preprocessing

    Splitting into training and test sets

    The first 10,000 rows of each emotion class are taken as the test set; the remaining rows are used as the training set.

    import pandas as pd
    import openpyxl
    from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE
    
    
    def test():
    
        file='simplifyweibo_4_moods.csv'
        data=pd.read_csv(file)
        label=data.get('label')
        review = data.get('review')
        train_review=[]
        train_label=[]
        test_review=[]
        test_label=[]
        # Count how many samples of each class have been routed to the test set
        counts = [0, 0, 0, 0]
        for i in range(len(review)):
            lab = int(label[i])
            line = ILLEGAL_CHARACTERS_RE.sub(r'', str(review[i]))
            # The first 10000 samples of each emotion go to the test set,
            # everything after that goes to the training set
            if counts[lab] < 10000:
                counts[lab] += 1
                test_label.append(lab)
                test_review.append(line)
            else:
                train_label.append(lab)
                train_review.append(line)
        # Write the train and test splits to separate xlsx files
        for path, labels, reviews in [("train.xlsx", train_label, train_review),
                                      ("test.xlsx", test_label, test_review)]:
            wb = openpyxl.Workbook()
            sheet = wb.active
            sheet.cell(1, 1, "label")
            sheet.cell(1, 2, "review")
            for i in range(len(reviews)):
                sheet.cell(i + 2, 1, labels[i])
                sheet.cell(i + 2, 2, reviews[i])
            wb.save(path)
    
    
    if __name__ == '__main__':
        test()

    This produces the corresponding train.xlsx and test.xlsx files.
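
    A quick check that the splits look as expected (a sketch, assuming both files were written to the working directory):

    import pandas as pd

    # Read the splits back and confirm their sizes and per-class counts
    train = pd.read_excel('train.xlsx')
    test = pd.read_excel('test.xlsx')
    print(len(train), len(test))           # the test set should hold 10000 rows per class
    print(test['label'].value_counts())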

     

    Generating the vocabulary

    Punctuation, special symbols, English letters, digits and the like are of no use for this task, so we filter them out.
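
    One way to do this is a single regular expression that keeps only Chinese characters (a sketch of the idea; the code below instead strips a fixed list of punctuation and then removes stopwords):

    import re

    def keep_chinese_only(text):
        # Drop everything that is not a CJK character
        # (hypothetical helper, stricter than the filtering used below)
        return re.sub(r'[^\u4e00-\u9fa5]', '', text)

    print(keep_chinese_only('今天天气真好!!!123 abc'))  # -> 今天天气真好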

    Removing stopwords

    def tokenlize(sentence):
        """
        Tokenize a piece of text.
        :param sentence: str
        :return: [str,str,str]
        """
        # Strip common punctuation and control characters before segmentation
        filters = r'[!"#$%&()*+,\-./:;<=>?@\[\\\]^_`{|}~\t\n\x97\x96]'
        sentence = re.sub(filters, "", sentence)
        # Segment with jieba (precise mode)
        sentence = jieba.cut(sentence, cut_all=False)
        sentence = ' '.join(sentence)
        result = [i for i in sentence.split(" ") if len(i) > 0]
        # Drop stopwords
        result = movestopwords(result)
        return result
    
    def stopwordslist(filepath):
        # Load the stopword list, one word per line
        stopwords = [line.strip() for line in open(filepath, 'r', encoding='utf-8').readlines()]
        return stopwords


    # Remove stopwords from a tokenized sentence
    def movestopwords(sentence):
        stopwords = stopwordslist('data/stopwords.txt')  # path to the stopword list
        outstr = []
        for word in sentence:
            if word not in stopwords and word not in ('\t', '\n'):
                outstr.append(word)
        return outstr

    Designing the vocabulary class

    """
    文本序列化
    """
    
    
    class Vocab:
        UNK_TAG = "<UNK>"  # 表示未知字符
        PAD_TAG = "<PAD>"  # 填充符
        PAD = 0
        UNK = 1
    
        def __init__(self):
            self.dict = {  # 保存词语和对应的数字
                self.UNK_TAG: self.UNK,
                self.PAD_TAG: self.PAD
            }
            self.count = {}  # 统计词频的
    
        def fit(self, sentence):
            """
            接受句子,统计词频
            :param sentence:[str,str,str]
            :return:None
            """
            for word in sentence:
                self.count[word] = self.count.get(word, 0) + 1  # 所有的句子fit之后,self.count就有了所有词语的词频
    
        def build_vocab(self, min_count=1, max_count=None, max_features=None):
            """
            Build the vocabulary according to the given constraints.
            :param min_count: minimum word frequency
            :param max_count: maximum word frequency
            :param max_features: maximum vocabulary size
            :return:
            """
            if min_count is not None:
                self.count = {word: count for word, count in self.count.items() if count >= min_count}
            if max_count is not None:
                self.count = {word: count for word, count in self.count.items() if count <= max_count}
            if max_features is not None:
                # keep only the max_features most frequent tokens
                self.count = dict(sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_features])

            for word in self.count:
                self.dict[word] = len(self.dict)  # assign the next free id to each token

            # invert the mapping: id -> token
            self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))
    
        def transform(self, sentence, max_len=None):
            """
            Convert a tokenized sentence into a sequence of ids, padded/truncated to max_len.
            :param sentence: [str,str,str]
            :return: [int,int,int]
            """
            if max_len is not None:
                if len(sentence) > max_len:
                    sentence = sentence[:max_len]
                else:
                    sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))  # pad with PAD

            return [self.dict.get(i, self.UNK) for i in sentence]

        def inverse_transform(self, incides):
            """
            Convert a sequence of ids back into tokens.
            :param incides: [int,int,int]
            :return: [str,str,str]
            """
            return [self.inverse_dict.get(i, self.UNK_TAG) for i in incides]

        def __len__(self):
            return len(self.dict)
    
    # # Debug code
    # if __name__ == '__main__':
    #     sentences = [["今天", "天气", "很", "好"],
    #                  ["今天", "去", "吃", "什么"]]
    #     ws = Vocab()
    #     for sentence in sentences:
    #         # count word frequencies
    #         ws.fit(sentence)
    #     # build the vocabulary
    #     ws.build_vocab(min_count=1)
    #     print(ws.dict)
    #     # turn a sentence into a sequence of ids
    #     ret = ws.transform(["好", "好", "好", "好", "好", "好", "好", "热", "呀"], max_len=13)
    #     print(ret)
    #     # turn the ids back into tokens
    #     ret = ws.inverse_transform(ret)
    #     print(ret)
    #     pass

    Dataset

    # -*-coding:utf-8-*-
    import os
    import pickle
    import re
    import zipfile
    import jieba
    
    from torch.utils.data import Dataset, DataLoader
    from tqdm import tqdm
    import pandas as pd
    
    class ImdbDataset(Dataset):
        def __init__(self, train=True):
            # Use the split produced by the preprocessing step
            if train:
                url = 'data/train.xlsx'
            else:
                url = "data/test.xlsx"
            data = pd.read_excel(url)
            sentence = data.get('review')
            label = data.get('label')
            self.sentence_list = sentence
            self.label_list = label

        def __getitem__(self, idx):
            line_text = self.sentence_list[idx]
            # Tokenize the review text
            review = tokenlize(str(line_text))
            # Fetch the corresponding label
            label = int(self.label_list[idx])
            return review, label

        def __len__(self):
            return len(self.sentence_list)
    
    
    def tokenlize(sentence):
        """
        Tokenize a piece of text.
        :param sentence: str
        :return: [str,str,str]
        """
        # Strip common punctuation and control characters before segmentation
        filters = r'[!"#$%&()*+,\-./:;<=>?@\[\\\]^_`{|}~\t\n\x97\x96]'
        sentence = re.sub(filters, "", sentence)
        # Segment with jieba (precise mode)
        sentence = jieba.cut(sentence, cut_all=False)
        sentence = ' '.join(sentence)
        result = [i for i in sentence.split(" ") if len(i) > 0]
        # Drop stopwords
        result = movestopwords(result)
        return result

    def stopwordslist(filepath):
        # Load the stopword list, one word per line
        stopwords = [line.strip() for line in open(filepath, 'r', encoding='utf-8').readlines()]
        return stopwords


    # Remove stopwords from a tokenized sentence
    def movestopwords(sentence):
        stopwords = stopwordslist('data/stopwords.txt')  # path to the stopword list
        outstr = []
        for word in sentence:
            if word not in stopwords and word not in ('\t', '\n'):
                outstr.append(word)
        return outstr
    
    
    # Debug helpers below
    def collate_fn(batch):
        """
        Re-pack a batch of samples.
        :param batch: list of __getitem__ results
        :return: tuple (reviews, labels)
        """
        reviews, labels = zip(*batch)

        return reviews, labels
    
    
    
    if __name__ == "__main__":
        from 情感分析.imdb_sentiment.vocab import Vocab
        imdb_dataset = ImdbDataset(True)
        my_dataloader = DataLoader(imdb_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
        for review,label in my_dataloader:
            vocab_model = pickle.load(open("./models/vocab.pkl", "rb"))
            print(review[0])
            result = vocab_model.transform(review[0], 30)
            print(result)
            break

    Building the vocabulary

    # -*-coding:utf-8-*-
    import pickle
    
    from tqdm import tqdm
    
    from 情感分析.weibo_many_emotion import dataset
    # from 情感分析.imdb_sentiment.vocab import Vocab
    from torch.utils.data import DataLoader
    
    class Vocab:
        UNK_TAG = "<UNK>"  # token for unknown words
        PAD_TAG = "<PAD>"  # padding token
        PAD = 0
        UNK = 1

        def __init__(self):
            self.dict = {  # token -> id mapping
                self.UNK_TAG: self.UNK,
                self.PAD_TAG: self.PAD
            }
            self.count = {}  # token frequency counts

        def fit(self, sentence):
            """
            Take a tokenized sentence and update the word-frequency counts.
            :param sentence: [str,str,str]
            :return: None
            """
            for word in sentence:
                self.count[word] = self.count.get(word, 0) + 1  # after fitting every sentence, self.count holds the frequency of each token
    
        def build_vocab(self, min_count=1, max_count=None, max_features=None):
            """
            Build the vocabulary according to the given constraints.
            :param min_count: minimum word frequency
            :param max_count: maximum word frequency
            :param max_features: maximum vocabulary size
            :return:
            """
            if min_count is not None:
                self.count = {word: count for word, count in self.count.items() if count >= min_count}
            if max_count is not None:
                self.count = {word: count for word, count in self.count.items() if count <= max_count}
            if max_features is not None:
                # keep only the max_features most frequent tokens
                self.count = dict(sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_features])

            for word in self.count:
                self.dict[word] = len(self.dict)  # assign the next free id to each token

            # invert the mapping: id -> token
            self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))
    
        def transform(self, sentence, max_len=None):
            """
            Convert a tokenized sentence into a sequence of ids, padded/truncated to max_len.
            :param sentence: [str,str,str]
            :return: [int,int,int]
            """
            if max_len is not None:
                if len(sentence) > max_len:
                    sentence = sentence[:max_len]
                else:
                    sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))  # pad with PAD

            return [self.dict.get(i, self.UNK) for i in sentence]

        def inverse_transform(self, incides):
            """
            Convert a sequence of ids back into tokens.
            :param incides: [int,int,int]
            :return: [str,str,str]
            """
            return [self.inverse_dict.get(i, self.UNK_TAG) for i in incides]

        def __len__(self):
            return len(self.dict)
    
    def collate_fn(batch):
        """
        Re-pack a batch of samples.
        :param batch: list of __getitem__ results
        :return: tuple (reviews, labels)
        """
        reviews, labels = zip(*batch)

        return reviews, labels
    
    
    
    def get_dataloader(train=True):
        imdb_dataset = dataset.ImdbDataset(train)
        my_dataloader = DataLoader(imdb_dataset, batch_size=200, shuffle=True, collate_fn=collate_fn)
        return my_dataloader
    
    
    if __name__ == '__main__':
    
        ws = Vocab()
        dl_train = get_dataloader(True)
        dl_test = get_dataloader(False)
        for reviews, label in tqdm(dl_train, total=len(dl_train)):
            for sentence in reviews:
                ws.fit(sentence)
        for reviews, label in tqdm(dl_test, total=len(dl_test)):
            for sentence in reviews:
                ws.fit(sentence)
        ws.build_vocab()
        print(len(ws))
    
        pickle.dump(ws, open("./models/vocab.pkl", "wb"))

    Model training

    # -*-coding:utf-8-*-
    import pickle
    
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from torch.optim import Adam
    from torch.utils.data import DataLoader
    from tqdm import tqdm
    
    from 情感分析.weibo_many_emotion import dataset
    from 情感分析.中文情感分类.vocab import Vocab
    
    train_batch_size = 512
    test_batch_size = 128
    voc_model = pickle.load(open("./models/vocab.pkl", "rb"))
    sequence_max_len = 100
    
    
    def collate_fn(batch):
        """
        Convert a batch of tokenized reviews into padded id tensors.
        :param batch: list of __getitem__ results
        :return: tuple of LongTensors (reviews, labels)
        """
        reviews, labels = zip(*batch)
        reviews = torch.LongTensor([voc_model.transform(i, max_len=sequence_max_len) for i in reviews])
        labels = torch.LongTensor(labels)
        return reviews, labels
    
    
    def get_dataloader(train=True):
        imdb_dataset = dataset.ImdbDataset(train)
        batch_size = train_batch_size if train else test_batch_size
        return DataLoader(imdb_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    
    
    class ImdbModel(nn.Module):
        def __init__(self):
            super(ImdbModel, self).__init__()
            self.embedding = nn.Embedding(num_embeddings=len(voc_model), embedding_dim=200, padding_idx=voc_model.PAD)
            self.lstm = nn.LSTM(input_size=200, hidden_size=64, num_layers=2, batch_first=True, bidirectional=True,
                                dropout=0.1)
            self.fc1 = nn.Linear(64 * 2, 64)
            self.fc2 = nn.Linear(64, 4)

        def forward(self, input):
            """
            :param input: [batch_size, max_len]
            :return: log-probabilities over the 4 classes
            """
            input_embeded = self.embedding(input)  # [batch_size, max_len, 200]

            output, (h_n, c_n) = self.lstm(input_embeded)  # h_n: [4, batch_size, hidden_size]
            # Concatenate the final hidden states of the forward and backward directions
            out = torch.cat([h_n[-1, :, :], h_n[-2, :, :]], dim=-1)  # out: [batch_size, hidden_size*2]

            # First fully connected layer + ReLU
            out_fc1 = self.fc1(out)
            out_fc1_relu = F.relu(out_fc1)

            # Output layer
            out_fc2 = self.fc2(out_fc1_relu)  # out: [batch_size, 4]
            return F.log_softmax(out_fc2, dim=-1)
    
    
    def device():
        if torch.cuda.is_available():
            return torch.device('cuda')
        else:
            return torch.device('cpu')
    
    
    def train(imdb_model, epoch):
        """
        Train the model.
        :param imdb_model:
        :param epoch: number of epochs
        :return:
        """
        train_dataloader = get_dataloader(train=True)

        optimizer = Adam(imdb_model.parameters())
        for i in range(epoch):
            bar = tqdm(train_dataloader, total=len(train_dataloader))
            for idx, (data, target) in enumerate(bar):
                optimizer.zero_grad()
                data = data.to(device())
                target = target.to(device())
                output = imdb_model(data)
                loss = F.nll_loss(output, target)
                loss.backward()
                optimizer.step()
                bar.set_description("epoch:{}  idx:{}   loss:{:.6f}".format(i, idx, loss.item()))
        torch.save(imdb_model, 'lstm_model.pkl')
    
    
    def test(imdb_model):
        """
        Evaluate the model on the test set.
        :param imdb_model:
        :return:
        """
        test_loss = 0
        correct = 0
        imdb_model.eval()
        test_dataloader = get_dataloader(train=False)
        with torch.no_grad():
            for data, target in tqdm(test_dataloader):
                data = data.to(device())
                target = target.to(device())
                output = imdb_model(data)
                test_loss += F.nll_loss(output, target, reduction='sum').item()
                pred = output.data.max(1, keepdim=True)[1]  # index of the max log-probability, [batch_size, 1]
                correct += pred.eq(target.data.view_as(pred)).sum()
        test_loss /= len(test_dataloader.dataset)
        print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
            test_loss, correct, len(test_dataloader.dataset),
            100. * correct / len(test_dataloader.dataset)))
    
    
    def xlftest():
        import numpy as np
        model = torch.load('lstm_model.pkl')
        model.to(device())
        from 情感分析.weibo_many_emotion.dataset import tokenlize
        lines = ['哈哈哈开心', '真是无语,你们怎么搞的', '小姐姐,祝你生日快乐', '你他妈的有病']
        for line in lines:
            print(line)
            # Tokenize and map to ids with the saved vocabulary
            review = tokenlize(line)
            vocab_model = pickle.load(open("./models/vocab.pkl", "rb"))
            result = vocab_model.transform(review, sequence_max_len)
            data = torch.LongTensor(result).to(device())
            data = torch.reshape(data, (1, sequence_max_len)).to(device())
            output = model(data)
            print(output.data)
            pred = output.data.max(1, keepdim=True)[1]  # index of the max log-probability, [batch_size, 1]
            print(pred.item())
            if pred.item() == 0:
                print("喜悦")  # joy
            elif pred.item() == 1:
                print("愤怒")  # anger
            elif pred.item() == 2:
                print("厌恶")  # disgust
            elif pred.item() == 3:
                print("低落")  # depression
    
    if __name__ == '__main__':
        # imdb_model = ImdbModel().to(device())
        # train(imdb_model,20)
        # test(imdb_model)
        xlftest()
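
    One caveat: torch.save(imdb_model, 'lstm_model.pkl') pickles the whole module, so loading it later requires the exact same class definition and import path. A sketch of the usual state_dict alternative (an assumption, not what the code above does):

    # Save only the weights
    torch.save(imdb_model.state_dict(), 'lstm_model_state.pt')

    # Later: rebuild the architecture, then load the weights
    model = ImdbModel().to(device())
    model.load_state_dict(torch.load('lstm_model_state.pt', map_location=device()))
    model.eval()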

    Test results:

    Accuracy over the four classes is only around 40%. Part of the reason is that the dataset itself is fairly noisy; the model also still needs hyperparameter tuning.
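
    To see where those errors come from, a per-class breakdown is more informative than overall accuracy. A minimal sketch using sklearn (an assumption; sklearn is not used elsewhere in this project), reusing the model, dataloader and device helpers defined above:

    from sklearn.metrics import classification_report, confusion_matrix

    def evaluate_per_class(imdb_model):
        # Collect predictions and gold labels over the whole test set
        imdb_model.eval()
        preds, golds = [], []
        with torch.no_grad():
            for data, target in get_dataloader(train=False):
                output = imdb_model(data.to(device()))
                preds.extend(output.argmax(dim=-1).cpu().tolist())
                golds.extend(target.tolist())
        # Per-class precision/recall/F1 and the confusion matrix
        print(classification_report(golds, preds, target_names=["joy", "anger", "disgust", "depression"]))
        print(confusion_matrix(golds, preds))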

     

    Original post: https://www.cnblogs.com/xiaofengzai/p/15695983.html