zoukankan      html  css  js  c++  java
  • pytorch seq2seq闲聊机器人

    cut_sentence.py

    """
    实现句子的分词
    注意点:
    1. 实现单个字分词
    
    2. 实现按照词语分词
    2.1 加载词典
    
    3. 使用停用词
    """
    
    import string
    import jieba
    import jieba.posseg as psg
    import logging
    
    # Path to the stop-word list (one word per line).
    stopwords_path = "../corpus/stopwords.txt"

    # Load stop words once at import time. Use a context manager so the
    # file handle is closed instead of leaked (the original relied on GC).
    with open(stopwords_path, encoding="utf-8") as f:
        stopwords = [i.strip() for i in f.readlines()]

    # Silence jieba's startup logging.
    jieba.setLogLevel(logging.INFO)

    # Load the user dictionary so domain keywords are kept as single tokens.
    jieba.load_userdict("../corpus/keywords.txt")


    # Characters that are glued into one token when splitting by single
    # characters (ascii letters, so e.g. "python" stays a single token).
    continue_words = string.ascii_lowercase
    
    def _cut_sentence_by_word(sentence):
        """
        Split a sentence into single characters, keeping runs of ascii
        letters together, e.g. "python可以做人工智能么?" ->
        ["python", "可", "以", ...]
        :param sentence: str
        :return: [str, str, str]
        """
        tokens = []
        letter_buffer = []  # pending run of ascii letters
        for ch in sentence:
            if ch in continue_words:
                letter_buffer.append(ch)
                continue
            if letter_buffer:
                tokens.append("".join(letter_buffer))
                letter_buffer = []
            tokens.append(ch)
        # flush a trailing run of letters
        if letter_buffer:
            tokens.append("".join(letter_buffer))
        return tokens
    
    
    def _cut_sentence(sentence,use_stopwords,use_seg):
        """
        Split a sentence into words with jieba.
        :param sentence: str
        :param use_stopwords: bool, drop stop words when True
        :param use_seg: bool, also return POS tags when True
        :return: [str, ...] or [(word, flag), ...]
        """
        if use_seg:
            tokens = [(pair.word, pair.flag) for pair in psg.cut(sentence)]
            if use_stopwords:
                tokens = [pair for pair in tokens if pair[0] not in stopwords]
        else:
            tokens = jieba.lcut(sentence)
            if use_stopwords:
                tokens = [word for word in tokens if word not in stopwords]
        return tokens
    
    def cut(sentence,by_word=False,use_stopwords=False,use_seg=False):
        """
        Public tokenization entry point wrapping the helpers above.
        :param sentence: str
        :param by_word: bool, split into single characters when True
        :param use_stopwords: bool, remove stop words (word mode only)
        :param use_seg: bool, return POS tags (word mode only)
        :return: [(str,seg),str]
        """
        lowered = sentence.lower()
        if by_word:
            return _cut_sentence_by_word(lowered)
        return _cut_sentence(lowered, use_stopwords, use_seg)
    

    word_sequence.py

    """
    文本序列化
    """
    
    class WordSequence:
        """Bidirectional mapping between words and integer ids (text serialization)."""
        UNK_TAG = "<UNK>"  # unknown word
        PAD_TAG = "<PAD>"  # padding token
        SOS_TAG = "<SOS>"  # start of sequence
        EOS_TAG = "<EOS>"  # end of sequence
        PAD = 0
        UNK = 1
        SOS = 2
        EOS = 3

        def __init__(self):
            # word -> id, pre-seeded with the four special tokens
            self.dict = {
                self.UNK_TAG:self.UNK,
                self.PAD_TAG:self.PAD,
                self.SOS_TAG:self.SOS,
                self.EOS_TAG:self.EOS
            }
            self.count = {}  # word -> frequency, accumulated by fit()

        def fit(self,sentence):
            """
            Accumulate word frequencies from one tokenized sentence.
            :param sentence:[str,str,str]
            :return:None
            """
            for word in sentence:
                # after fitting all sentences, self.count holds every word's frequency
                self.count[word] = self.count.get(word,0) + 1

        def build_vocab(self,min_count=5,max_count=None,max_features=None):
            """
            Build the vocabulary from the accumulated counts.
            :param min_count: minimum frequency to keep a word (None = no lower bound)
            :param max_count: maximum frequency to keep a word
            :param max_features: keep only the N most frequent words
            :return: None
            """
            if min_count is not None:
                self.count = {word:count for word,count in self.count.items() if count >= min_count}
            if max_count is not None:
                self.count = {word:count for word,count in self.count.items() if count <= max_count}
            if max_features is not None:
                # BUG FIX: in Python 3 sorted() only accepts `key` as a
                # keyword argument; passing the lambda positionally raised
                # TypeError. [(k,v),(k,v)....] ---> {k:v,k:v}
                self.count = dict(sorted(self.count.items(),key=lambda x:x[-1],reverse=True)[:max_features])

            for word in self.count:
                self.dict[word] = len(self.dict)  # assign the next free id

            # reverse mapping: id -> word
            self.inverse_dict = dict(zip(self.dict.values(),self.dict.keys()))

        def transform(self,sentence,max_len=None,add_eos=False):
            """
            Convert a tokenized sentence into a list of ids.
            :param sentence: [str,str,str]
            :param max_len: pad/truncate to this length (None = keep length as-is)
            :param add_eos: append EOS (kept within max_len when one is given)
            :return: [int,int,int]
            """
            if add_eos and max_len is not None:
                max_len = max_len-1  # reserve one slot for EOS

            # BUG FIX: guard against max_len=None — the original compared
            # len(sentence) > None and crashed with TypeError.
            if max_len is not None:
                if len(sentence) > max_len:
                    sentence = sentence[:max_len]
                else:
                    sentence = sentence + [self.PAD_TAG] *(max_len- len(sentence))  # pad with PAD

            if add_eos:
                if self.PAD_TAG in sentence:
                    # insert EOS before the first PAD so it stays inside max_len
                    index = sentence.index(self.PAD_TAG)
                    sentence.insert(index,self.EOS_TAG)
                else:
                    sentence += [self.EOS_TAG]

            # unknown words map to UNK (named constant instead of magic 1)
            return [self.dict.get(i,self.UNK) for i in sentence]

        def inverse_transform(self,incides):
            """
            Convert ids back to text, stopping at the first EOS.
            :param incides: [int,int,int]
            :return: str, the decoded words joined into one string
            """
            result = []
            for i in incides:
                temp = self.inverse_dict.get(i, self.UNK_TAG)
                if temp == self.EOS_TAG:
                    break
                result.append(temp)
            return "".join(result)

        def __len__(self):
            # vocabulary size including the four special tokens
            return len(self.dict)
    
    if __name__ == '__main__':
        # smoke test: build a vocabulary and round-trip one sentence
        sentences = [["今天","天气","很","好"],
                     ["今天","去","吃","什么"]]
        ws = WordSequence()
        for sentence in sentences:
            ws.fit(sentence)
        ws.build_vocab(min_count=1)
        print(ws.dict)
        ret = ws.transform(["好","好","好","好","好","好","好","热","呀"],max_len=3)
        print(ret)
        print(ws.inverse_transform(ret))
    

      

    dataset.py

    """
    准备数据集
    """
    import random
    from tqdm import tqdm
    import config
    import torch
    from torch.utils.data import DataLoader,Dataset
    
    
    
    #1. split the corpus into train/test files
    def chatbot_data_split():
        """
        Randomly split the question/answer corpus: ~20% of the pairs go to
        the test files, the rest to the train files.
        NOTE(review): the output files are opened in append mode, so running
        this twice duplicates data — delete the outputs before re-running.
        """
        # context managers close every handle (the original leaked the two
        # read handles and relied on GC/close calls for the writers)
        with open("../corpus/chatbot/input.txt",encoding="utf-8") as f:
            input_lines = f.readlines()
        with open("../corpus/chatbot/target.txt",encoding="utf-8") as f:
            target_lines = f.readlines()
        with open("../corpus/chatbot/train_input.txt","a",encoding="utf-8") as f_train_input, \
             open("../corpus/chatbot/train_target.txt","a",encoding="utf-8") as f_train_target, \
             open("../corpus/chatbot/test_input.txt","a",encoding="utf-8") as f_test_input, \
             open("../corpus/chatbot/test_target.txt","a",encoding="utf-8") as f_test_target:
            for question, answer in tqdm(zip(input_lines, target_lines), total=len(input_lines)):
                if random.random() > 0.8:
                    # ~20% of pairs go to the test split
                    f_test_input.write(question)
                    f_test_target.write(answer)
                else:
                    f_train_input.write(question)
                    f_train_target.write(answer)
    
    
    
    #2. 准备dataset
    
    class ChatDataset(Dataset):
        """Chatbot dataset: one tokenized question/answer pair per index."""

        def __init__(self,train=True):
            # choose the train or the test split
            if train:
                input_path = "../corpus/chatbot/train_input.txt"
                target_path = "../corpus/chatbot/train_target.txt"
            else:
                input_path = "../corpus/chatbot/test_input.txt"
                target_path = "../corpus/chatbot/test_target.txt"
            self.input_data = open(input_path,encoding="utf-8").readlines()
            self.target_data = open(target_path,encoding="utf-8").readlines()
            assert len(self.input_data) == len(self.target_data),"input target长度不一致!!!"

        def __getitem__(self, idx):
            question = self.input_data[idx].strip().split()
            answer = self.target_data[idx].strip().split()
            # real (unpadded) lengths, capped at the configured maxima
            input_len = min(len(question), config.chatbot_input_max_len)
            target_len = min(len(answer), config.chatbot_target_max_len)

            input_ids = config.input_ws.transform(question,max_len=config.chatbot_input_max_len)
            target_ids = config.target_ws.transform(answer,max_len=config.chatbot_target_max_len,add_eos=True)
            return input_ids,target_ids,input_len,target_len

        def __len__(self):
            return len(self.input_data)
    
    
    
    # 3. 准备dataloader
    def collate_fn(batch):
        """
        Batch a list of __getitem__ results.
        :param batch: [(input, target, input_len, target_len), ...]
        :return: the four fields as LongTensors, sorted by descending
                 input length (pack_padded_sequence in the encoder
                 requires the batch sorted this way)
        """
        ordered = sorted(batch, key=lambda sample: sample[-2], reverse=True)
        inputs, targets, input_lens, target_lens = zip(*ordered)
        return (torch.LongTensor(inputs),
                torch.LongTensor(targets),
                torch.LongTensor(input_lens),
                torch.LongTensor(target_lens))
    
    
    def get_dataloader(train=True):
        """Build a shuffled DataLoader over the train or the test split."""
        if train:
            batch_size = config.chatbot_train_batch_size
        else:
            batch_size = config.chatbot_test_batch_size
        dataset = ChatDataset(train)
        return DataLoader(dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
    
    if __name__ == '__main__':
        # smoke test: fetch and print the first batch
        loader = get_dataloader()
        for batch_idx, (batch_input, batch_target, batch_input_len, batch_target_len) in enumerate(loader):
            print(batch_idx)
            print(batch_input)
            print(batch_target)
            print(batch_input_len)
            print(batch_target_len)
            break
    

    config.py

    """
    项目配置
    """
    import pickle
    import torch
    
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # device = ("cpu")
    
    ################# classify 相关的配置 ###############
    predict_ratio = 0.98  #预测可能性的阈值
    
    ################# chatbot相关的配置 #################
    chatbot_train_batch_size = 400
    chatbot_test_batch_size = 500
    
    input_ws = pickle.load(open("../chatbot/models/ws_input.pkl","rb"))
    target_ws = pickle.load(open("../chatbot/models/ws_target.pkl","rb"))
    chatbot_input_max_len = 20
    chatbot_target_max_len = 30
    
    chatbot_encoder_embedding_dim = 300
    chatbot_encoder_hidden_size = 128
    chatbot_encoder_number_layer = 2
    chatbot_encoder_bidirectional = True
    chatbot_encoder_dropout = 0.3
    
    chatbot_decoder_embedding_dim = 300
    chatbot_decoder_hidden_size = 128*2
    chatbot_decoder_number_layer = 1
    chatbot_decoder_dropout = 0
    

      

      

    encoder.py

    """
    进行编码
    """
    
    import torch.nn as nn
    from torch.nn.utils.rnn import pad_packed_sequence,pack_padded_sequence
    import config
    import torch
    
    
    class Encoder(nn.Module):
        """GRU encoder: embeds the padded input batch and encodes it."""

        def __init__(self):
            super(Encoder,self).__init__()
            self.embedding = nn.Embedding(
                num_embeddings=len(config.input_ws),
                embedding_dim=config.chatbot_encoder_embedding_dim,
                padding_idx=config.input_ws.PAD,
            )
            # 2 bidirectional layers, hidden_size 128 per direction
            self.gru = nn.GRU(
                input_size=config.chatbot_encoder_embedding_dim,
                hidden_size=config.chatbot_encoder_hidden_size,
                num_layers=config.chatbot_encoder_number_layer,
                batch_first=True,
                bidirectional=config.chatbot_encoder_bidirectional,
                dropout=config.chatbot_encoder_dropout,
            )

        def forward(self, input,input_len):
            """
            :param input: [batch, seq_len] LongTensor, sorted by descending
                true length (pack_padded_sequence requires this ordering)
            :param input_len: [batch] true lengths
            :return: (padded outputs [batch, seq_len, 128*2],
                      encoder_hidden [1, batch, 128*2])
            """
            embedded = self.embedding(input)
            # pack so the GRU skips padding positions
            packed = pack_padded_sequence(embedded, input_len, batch_first=True)
            output, hidden = self.gru(packed)
            # unpack back into a padded tensor
            output_paded, seq_len = pad_packed_sequence(output, batch_first=True, padding_value=config.input_ws.PAD)
            # concatenate the top layer's forward and backward final states
            # into one sentence vector: [1, batch_size, 128*2]
            encoder_hidden = torch.cat([hidden[-2], hidden[-1]], dim=-1).unsqueeze(0)
            return output_paded, encoder_hidden
    

      decoder.py

    """
    实现解码器
    """
    import torch.nn as nn
    import config
    import torch
    import torch.nn.functional as F
    import numpy as np
    import random
    
    
    class Decoder(nn.Module):
        """GRU decoder producing one target token per time step."""

        def __init__(self):
            super(Decoder,self).__init__()
            self.embedding = nn.Embedding(
                num_embeddings=len(config.target_ws),
                embedding_dim=config.chatbot_decoder_embedding_dim,
                padding_idx=config.target_ws.PAD,
            )
            # consumes a hidden state shaped [1, batch_size, hidden_size]
            self.gru = nn.GRU(
                input_size=config.chatbot_decoder_embedding_dim,
                hidden_size=config.chatbot_decoder_hidden_size,
                num_layers=config.chatbot_decoder_number_layer,
                bidirectional=False,
                batch_first=True,
                dropout=config.chatbot_decoder_dropout,
            )
            # projects each GRU output onto the target vocabulary
            self.fc = nn.Linear(config.chatbot_decoder_hidden_size,len(config.target_ws))

        def forward(self, encoder_hidden,target):
            """
            Decode a whole batch for training.
            :param encoder_hidden: [1, batch_size, hidden_size]
            :param target: [batch_size, max_len] ground-truth ids
            :return: (decoder_outputs [batch, max_len, vocab], decoder_hidden)
            """
            batch_size = encoder_hidden.size(1)
            decoder_hidden = encoder_hidden  # hidden state for step 0
            # first input: one SOS token per sequence, [batch_size, 1]
            decoder_input = torch.LongTensor([[config.target_ws.SOS]]*batch_size).to(config.device)
            # per-step log-probabilities, [batch_size, max_len, vocab_size]
            decoder_outputs = torch.zeros([batch_size,config.chatbot_target_max_len,len(config.target_ws)]).to(config.device)

            # decide once per batch: feed back the model's own predictions
            # (True) or the ground truth (teacher forcing, False)
            use_own_prediction = random.random() > 0.5
            for t in range(config.chatbot_target_max_len):
                decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
                decoder_outputs[:, t, :] = decoder_output_t
                if use_own_prediction:
                    # greedy: next input is this step's argmax
                    value, index = decoder_output_t.max(dim=-1)
                    decoder_input = index.unsqueeze(-1)  # [batch_size, 1]
                else:
                    # teacher forcing: next input is the true token
                    decoder_input = target[:, t].unsqueeze(-1)
            return decoder_outputs, decoder_hidden

        def forward_step(self,decoder_input,decoder_hidden):
            '''
            Run a single decoding time step.
            :param decoder_input: [batch_size, 1]
            :param decoder_hidden: [1, batch_size, hidden_size]
            :return: (log-probs [batch_size, vocab_size], new hidden state)
            '''
            embedded = self.embedding(decoder_input)
            # out: [batch_size, 1, hidden_size]
            out, decoder_hidden = self.gru(embedded, decoder_hidden)
            squeezed = out.squeeze(dim=1)  # drop the length-1 time dimension
            out_fc = F.log_softmax(self.fc(squeezed), dim=-1)  # [batch_size, vocab_size]
            return out_fc, decoder_hidden

        def evaluate(self,encoder_hidden):
            """
            Greedy decoding for inference (no ground truth available).
            :param encoder_hidden: [1, batch_size, hidden_size]
            :return: (decoder_outputs [batch, max_len, vocab],
                      predict_result ndarray [batch, max_len])
            """
            batch_size = encoder_hidden.size(1)
            decoder_hidden = encoder_hidden
            decoder_input = torch.LongTensor([[config.target_ws.SOS]] * batch_size).to(config.device)
            decoder_outputs = torch.zeros([batch_size, config.chatbot_target_max_len, len(config.target_ws)]).to(
                config.device)

            predict_result = []
            for t in range(config.chatbot_target_max_len):
                decoder_output_t, decoder_hidden = self.forward_step(decoder_input, decoder_hidden)
                decoder_outputs[:, t, :] = decoder_output_t
                # greedy pick; collect one [batch] row per step
                value, index = decoder_output_t.max(dim=-1)
                predict_result.append(index.cpu().detach().numpy())
                decoder_input = index.unsqueeze(-1)  # [batch_size, 1]
            # [max_len, batch] -> [batch, max_len]: one row per prediction
            predict_result = np.array(predict_result).transpose()
            return decoder_outputs, predict_result
    

      seq2seq.py

    """
    完成seq2seq模型
    """
    import torch.nn as nn
    from chatbot.encoder import Encoder
    from chatbot.decoder import Decoder
    
    
    class Seq2Seq(nn.Module):
        """Encoder and decoder wired together into one model."""

        def __init__(self):
            super(Seq2Seq,self).__init__()
            self.encoder = Encoder()
            self.decoder = Decoder()

        def forward(self, input,input_len,target):
            """Training pass: encode, then decode against the target."""
            _, encoder_hidden = self.encoder(input, input_len)
            decoder_outputs, _ = self.decoder(encoder_hidden, target)
            return decoder_outputs

        def evaluate(self,input,input_len):
            """Inference pass: encode, then decode greedily."""
            _, encoder_hidden = self.encoder(input, input_len)
            return self.decoder.evaluate(encoder_hidden)
    

      train.py

    """
    进行模型的训练
    """
    import torch
    import torch.nn.functional as F
    from chatbot.seq2seq import Seq2Seq
    from torch.optim import Adam
    from chatbot.dataset import get_dataloader
    from tqdm import tqdm
    import config
    import numpy as np
    import pickle
    from matplotlib import pyplot as plt
    # from eval import eval
    
    # Model and optimizer are created at import time (module-level state
    # shared by train()).
    model = Seq2Seq().to(config.device)

    optimizer = Adam(model.parameters())

    # Running history of per-batch losses; train() also pickles it to disk.
    loss_list = []
    
    def train(epoch):
        """
        Train the seq2seq model for one epoch, checkpointing every 100 batches.
        :param epoch: int, current epoch number (used only for the progress bar)
        """
        data_loader = get_dataloader(train=True)
        bar = tqdm(data_loader,total=len(data_loader))

        for idx,(input,target,input_len,target_len) in enumerate(bar):
            input = input.to(config.device)
            target = target.to(config.device)
            input_len = input_len.to(config.device)
            optimizer.zero_grad()
            decoder_outputs = model(input,input_len,target) #[batch_Size,max_len,vocab_size]
            # BUG FIX: padding in `target` must be masked with the TARGET
            # vocabulary's PAD id, not the input vocabulary's.
            loss = F.nll_loss(decoder_outputs.view(-1,len(config.target_ws)),target.view(-1),ignore_index=config.target_ws.PAD)
            loss.backward()
            optimizer.step()
            loss_list.append(loss.item())
            bar.set_description("epoch:{} idx:{} loss:{:.6f}".format(epoch,idx,np.mean(loss_list)))

            # periodic checkpoint of model, optimizer and loss history
            if idx%100 == 0:
                torch.save(model.state_dict(),"../chatbot/models/model.pkl")
                torch.save(optimizer.state_dict(),"../chatbot/models/optimizer.pkl")
                # context manager so the pickle file handle is not leaked
                with open("../chatbot/models/loss_list.pkl","wb") as f:
                    pickle.dump(loss_list,f)
    
    
    if __name__ == '__main__':
        # run 5 training epochs
        for epoch_idx in range(5):
            train(epoch_idx)
            # eval()

        # plt.figure(figsize=(50,8))
        # plt.plot(range(len(loss_list)),loss_list)
        # plt.show()
    

      eval.py

    """
    进行模型的评估
    """
    
    import torch
    import torch.nn.functional as F
    from chatbot.dataset import get_dataloader
    from tqdm import tqdm
    import config
    import numpy as np
    import pickle
    from chatbot.seq2seq import Seq2Seq
    
    def eval():
        """
        Evaluate the trained model on the test split and report the mean NLL loss.
        NOTE(review): this name shadows the builtin eval(); kept unchanged for
        backward compatibility with existing callers.
        """
        model = Seq2Seq().to(config.device)
        # load the checkpoint first, then switch to inference mode
        model.load_state_dict(torch.load("./models/model.pkl"))
        model.eval()

        loss_list = []
        data_loader = get_dataloader(train=False)
        bar = tqdm(data_loader,total=len(data_loader),desc="当前进行评估")
        with torch.no_grad():
            for idx,(input,target,input_len,target_len) in enumerate(bar):
                input = input.to(config.device)
                target = target.to(config.device)
                input_len = input_len.to(config.device)

                decoder_outputs,predict_result = model.evaluate(input,input_len) #[batch_Size,max_len,vocab_size]
                # BUG FIX: mask padding in `target` with the TARGET
                # vocabulary's PAD id, not the input vocabulary's.
                loss = F.nll_loss(decoder_outputs.view(-1,len(config.target_ws)),target.view(-1),ignore_index=config.target_ws.PAD)
                loss_list.append(loss.item())
                bar.set_description("idx:{} loss:{:.6f}".format(idx,np.mean(loss_list)))
        print("当前的平均损失为:",np.mean(loss_list))
    
    
    def interface():
        """
        Interactive chat loop: read a sentence from stdin, tokenize it by
        character, run greedy decoding and print the reply. Loops forever.
        """
        from chatbot.cut_sentence import cut
        import config
        # load the trained model
        model = Seq2Seq().to(config.device)
        model.load_state_dict(torch.load("./models/model.pkl"))
        model.eval()

        while True:
            origin_input = input("me>>:")
            tokens = cut(origin_input, by_word=True)
            # BUG FIX: cap the reported length at the configured maximum.
            # The tensor below is padded/truncated to chatbot_input_max_len,
            # and pack_padded_sequence requires length <= padded length, so
            # a long sentence crashed the original.
            real_len = min(len(tokens), config.chatbot_input_max_len)
            input_len = torch.LongTensor([real_len]).to(config.device)
            _input = torch.LongTensor([config.input_ws.transform(tokens,max_len=config.chatbot_input_max_len)]).to(config.device)

            outputs,predict = model.evaluate(_input,input_len)
            result = config.target_ws.inverse_transform(predict[0])
            print("chatbot>>:",result)
    
    
    
    
    
    if __name__ == '__main__':
        # start the interactive chat loop
        interface()
    

      

    多思考也是一种努力,做出正确的分析和选择,因为我们的时间和精力都有限,所以把时间花在更有价值的地方。
  • 相关阅读:
    剑指offer编程题66道题 26-35
    剑指offer编程题66道题 1-25
    springboot的自动配置
    用智能的编译器来防错
    实现迭代器的捷径
    结束C#2的讲解:最后的一些特性
    进入快速委托通道
    可空类型
    用泛型实现参数化类型
    C#1所搭建的核心基础
  • 原文地址:https://www.cnblogs.com/LiuXinyu12378/p/12377703.html
Copyright © 2011-2022 走看看