zoukankan      html  css  js  c++  java
  • transformer-encoder用于问答中的意图识别

    一.利用transformer-encoder进行文本分类,用于在问答中的意图识别。

    二.结构图

    三.程序(完整程序:https://github.com/jiangnanboy/intent_classification/tree/master/transformer_encoder

    import os
    import torch
    from torchtext import data,datasets
    from torchtext.data import Iterator, BucketIterator
    from torchtext.vocab import Vectors
    from torch import nn,optim
    import torch.nn.functional as F
    import pandas as pd
    import pickle
    
    DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    intent_classification_path = os.path.abspath(os.path.join(os.getcwd(), '../..'))
    # 训练数据路径
    train_data = os.path.join(intent_classification_path,'classification_data/knowledge_point_qa_data.csv')
    # 读取数据
    train_data = pd.read_csv(train_data)
    # 按字分    
    tokenize =lambda x: x.split(' ')
    
    TEXT = data.Field(
                        sequential=True,
                        tokenize=tokenize,
                        lower=True,
                        use_vocab=True,
                        pad_token='<pad>',
                        unk_token='<unk>',
                        batch_first=True,
                        fix_length=20)
    
    LABEL = data.Field(
                        sequential=False,
                        use_vocab=False)
    
    # 获取训练或测试数据集
    def get_dataset(csv_data, text_field, label_field, test=False):
        fields = [('id', None), ('text', text_field), ('label', label_field)]
        examples = []
        if test: #测试集,不加载label
            for text in csv_data['text']:
                examples.append(data.Example.fromlist([None, text, None], fields))
        else: # 训练集
            for text, label in zip(csv_data['text'], csv_data['label']):
                examples.append(data.Example.fromlist([None, text, label], fields))
        return examples, fields
    
    train_examples,train_fields = get_dataset(train_data, TEXT, LABEL)
    
    train = data.Dataset(train_examples, train_fields)
    # 预训练数据
    #pretrained_embedding = os.path.join(os.getcwd(), 'sgns.sogou.char')
    #vectors = Vectors(name=pretrained_embedding)
    # 构建词典
    #TEXT.build_vocab(train, min_freq=1, vectors = vectors)
    
    TEXT.build_vocab(train, min_freq=1)
    words_path = os.path.join(os.getcwd(), 'words.pkl')
    with open(words_path, 'wb') as f_words:
        pickle.dump(TEXT.vocab, f_words)
        
    BATCH_SIZE = 163
    # 构建迭代器
    train_iter = BucketIterator(
                                dataset=train,
                                batch_size=BATCH_SIZE,
                                shuffle=True,
                                sort_within_batch=False)
    
    
    '''
        1.输入是序列中token的embedding与位置embedding
        2.token的embedding与其位置embedding相加,得到一个vector(这个向量融合了token与position信息)
        3.在2之前,token的embedding乘上一个scale(防止点积变大,造成梯度过小)向量[sqrt(emb_dim)],这个假设为了减少embedding中的变化,没有这个scale,很难稳定的去训练model。
        4.加入dropout
        5.通过N个encoder layer,得到Z。此输出Z被传入一个全连接层作分类。
        src_mask对于非<pad>值为1,<pad>为0。为了计算attention而遮挡<pad>这个无意义的token。与source 句子shape一致。
    '''
    class TransformerEncoder(nn.Module):
        def __init__(self, input_dim, output_dim, emb_dim, n_layers, n_heads, pf_dim, dropout, position_length, pad_idx):
            super(TransformerEncoder, self).__init__()
            
            self.pad_idx = pad_idx
            self.scale = torch.sqrt(torch.FloatTensor([emb_dim])).to(DEVICE)
    
            # 词的embedding
            self.token_embedding = nn.Embedding(input_dim, emb_dim)
            # 对词的位置进行embedding
            self.position_embedding = nn.Embedding(position_length, emb_dim)
            # encoder层,有几个encoder层,每个encoder有几个head
            self.layers = nn.ModuleList([EncoderLayer(emb_dim, n_heads, pf_dim, dropout) for _ in range(n_layers)])
            self.dropout = nn.Dropout(dropout)
            self.fc = nn.Linear(emb_dim, output_dim)
    
        def mask_src_mask(self, src):
            # src=[batch_size, src_len]
    
            # src_mask=[batch_size, 1, 1, src_len]
            src_mask = (src != self.pad_idx).unsqueeze(1).unsqueeze(2)
            return src_mask
        
        def forward(self, src):
            # src=[batch_size, seq_len]
            # src_mask=[batch_size, 1, 1, seq_len]
            src_mask = self.mask_src_mask(src)
            
            batch_size = src.shape[0]
            src_len = src.shape[1]
    
            # 构建位置tensor -> [batch_size, seq_len],位置序号从(0)开始到(src_len-1)
            position = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(DEVICE)
    
            # 对词和其位置进行embedding -> [batch_size,seq_len,embdim]
            token_embeded = self.token_embedding(src) * self.scale
            position_embeded = self.position_embedding(position)
    
            # 对词和其位置的embedding进行按元素加和 -> [batch_size, seq_len, embdim]
            src = self.dropout(token_embeded + position_embeded)
    
            for layer in self.layers:
                src = layer(src, src_mask)
    
            # [batch_size, seq_len, emb_dim] -> [batch_size, output_dim]
            src = src.permute(0, 2, 1)
            src = torch.sum(src, dim=-1)
            src = self.fc(src)
            return src
    
    '''
    encoder layers:
        1.将src与src_mask传入多头attention层(multi-head attention)
        2.dropout
        3.使用残差连接后传入layer-norm层(输入+输出后送入norm)后得到的输出
        4.输出通过前馈网络feedforward层
        5.dropout
        6.一个残差连接后传入layer-norm层后得到的输出喂给下一层
        注意:
            layer之间不共享参数
            多头注意力层用到的是多个自注意力层self-attention
    '''
    class EncoderLayer(nn.Module):
        def __init__(self, emb_dim, n_heads, pf_dim, dropout):
            super(EncoderLayer, self).__init__()
            # 注意力层后的layernorm
            self.self_attn_layer_norm = nn.LayerNorm(emb_dim)
            # 前馈网络层后的layernorm
            self.ff_layer_norm = nn.LayerNorm(emb_dim)
            # 多头注意力层
            self.self_attention = MultiHeadAttentionLayer(emb_dim, n_heads, dropout)
            # 前馈层
            self.feedforward = FeedforwardLayer(emb_dim, pf_dim, dropout)
    
            self.dropout = nn.Dropout(dropout)
    
        def forward(self, src, src_mask):
            #src=[batch_size, seq_len, emb_dim]
            #src_mask=[batch_size, 1, 1, seq_len]
    
            # self-attention
            # _src=[batch size, query_len, emb_dim]
            _src, _ = self.self_attention(src, src, src, src_mask)
    
            # dropout, 残差连接以及layer-norm
            # src=[batch_size, seq_len, emb_dim]
            src = self.self_attn_layer_norm(src + self.dropout(_src))
    
            # 前馈网络
            # _src=[batch_size, seq_len, emb_dim]
            _src = self.feedforward(src)
    
            # dropout, 残差连接以及layer-norm
            # src=[batch_size, seq_len, emb_dim]
            src = self.ff_layer_norm(src + self.dropout(_src))
    
            return src
    '''
    多头注意力层的计算:
        1.q,k,v的计算是通过线性层fc_q,fc_k,fc_v
        2.对query,key,value的emb_dim split成n_heads
        3.通过计算Q*K/scale计算energy
        4.利用mask遮掩不需要关注的token
        5.利用softmax与dropout
        6.5的结果与V矩阵相乘
        7.最后通过一个前馈fc_o输出结果
    注意:Q,K,V的长度一致
    '''
    class MultiHeadAttentionLayer(nn.Module):
        def __init__(self, emb_dim, n_heads, dropout):
            super(MultiHeadAttentionLayer, self).__init__()
            assert emb_dim % n_heads == 0
            self.emb_dim = emb_dim
            self.n_heads = n_heads
            self.head_dim = emb_dim//n_heads
    
            self.fc_q = nn.Linear(emb_dim, emb_dim)
            self.fc_k = nn.Linear(emb_dim, emb_dim)
            self.fc_v = nn.Linear(emb_dim, emb_dim)
    
            self.fc_o = nn.Linear(emb_dim, emb_dim)
    
            self.dropout = nn.Dropout(dropout)
    
            self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(DEVICE)
    
        def forward(self, query, key, value, mask=None):
            # query=[batch_size, query_len, emb_dim]
            # key=[batch_size, key_len, emb_dim]
            # value=[batch_size, value_len, emb_dim]
            batch_size = query.shape[0]
    
            # Q=[batch_size, query_len, emb_dim]
            # K=[batch_size, key_len, emb_dim]
            # V=[batch_size, value_len, emb_dim]
            Q = self.fc_q(query)
            K = self.fc_k(key)
            V = self.fc_v(value)
    
            '''
            view与reshape的异同:
            
            torch的view()与reshape()方法都可以用来重塑tensor的shape,区别就是使用的条件不一样。view()方法只适用于满足连续性条件的tensor,并且该操作不会开辟新的内存空间,
            只是产生了对原存储空间的一个新别称和引用,返回值是视图。而reshape()方法的返回值既可以是视图,也可以是副本,当满足连续性条件时返回view,
            否则返回副本[ 此时等价于先调用contiguous()方法在使用view() ]。因此当不确能否使用view时,可以使用reshape。如果只是想简单地重塑一个tensor的shape,
            那么就是用reshape,但是如果需要考虑内存的开销而且要确保重塑后的tensor与之前的tensor共享存储空间,那就使用view()。
            '''
    
            # Q=[batch_size, n_heads, query_len, head_dim]
            # K=[batch_size, n_heads, key_len, head_dim]
            # V=[batch_size, n_heads, value_len, head_dim]
            Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
            K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
            V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
    
            # 注意力打分矩阵 [batch_size, n_heads, query_len, head_dim] * [batch_size, n_heads, head_dim, key_len] = [batch_size, n_heads, query_len, key_len]
            energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
    
            if mask is not None:
                energy = energy.masked_fill(mask == 0, -1e10)
            # [batch_size, n_heads, query_len, key_len]
            attention = torch.softmax(energy , dim = -1)
    
            # [batch_size, n_heads, query_len, key_len]*[batch_size, n_heads, value_len, head_dim]=[batch_size, n_heads, query_len, head_dim]
            x = torch.matmul(self.dropout(attention), V)
    
            # [batch_size, query_len, n_heads, head_dim]
            x = x.permute(0, 2, 1, 3).contiguous()
    
            # [batch_size, query_len, emb_dim]
            x = x.view(batch_size, -1, self.emb_dim)
    
            # [batch_size, query_len, emb_dim]
            x = self.fc_o(x)
    
            return x, attention
    
    '''
    前馈层
    '''
    class FeedforwardLayer(nn.Module):
        def __init__(self, emb_dim, pf_dim, dropout):
            super(FeedforwardLayer, self).__init__()
            self.fc_1 = nn.Linear(emb_dim, pf_dim)
            self.fc_2 = nn.Linear(pf_dim, emb_dim)
    
            self.dropout = nn.Dropout(dropout)
    
        def forward(self, x):
            # x=[batch_size, seq_len, emb_dim]
    
            # x=[batch_size, seq_len, pf_dim]
            x = self.dropout(torch.relu(self.fc_1(x)))
    
            # x=[batch_size, seq_len, emb_dim]
            x = self.fc_2(x)
    
            return x
    from torch.utils.tensorboard import SummaryWriter
    writer = SummaryWriter(os.getcwd()+'/log', comment='transformer-encoder')
    
    # 训练
    input_dim = len(TEXT.vocab) 
    output_dim = 9
    emb_dim = 256
    n_layers = 3
    n_heads = 8
    pf_dim = 512
    dropout = 0.5
    position_length = 20
    
    # <pad>
    pad_index = TEXT.vocab.stoi[TEXT.pad_token]
    
    # 构建model
    model = TransformerEncoder(input_dim, output_dim, emb_dim, n_layers, n_heads, pf_dim, dropout, position_length, pad_index).to(DEVICE)
    # 利用预训练模型初始化embedding,requires_grad=True,可以fine-tune
    # model.embedding.weight.data.copy_(TEXT.vocab.vectors)
    # 训练模式
    model.train()
    # 优化和损失
    optimizer = torch.optim.Adam(model.parameters(),lr=0.001, weight_decay=0.01)
    # optimizer = torch.optim.SGD(model.parameters(),lr=0.001, momentum=0.9, nesterov=True)
    criterion = nn.CrossEntropyLoss()
    
    with writer:
        for iter in range(300):
            for i, batch in enumerate(train_iter):
                train_text = batch.text
                train_label = batch.label
                train_text = train_text.to(DEVICE)
                train_label = train_label.to(DEVICE)
                out = model(train_text)
                loss = criterion(out, train_label)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                if (iter+1) % 10 == 0:
                        print ('iter [{}/{}], Loss: {:.4f}'.format(iter+1, 300, loss.item()))
                #writer.add_graph(model, input_to_model=train_text,verbose=False)
                writer.add_scalar('loss',loss.item(),global_step=iter+1)
        writer.flush()
        writer.close()
                
    model_path = os.path.join(os.getcwd(), "model.h5")
    torch.save(model.state_dict(), model_path)

  • 相关阅读:
    objectMediator
    vi
    string regex
    ar
    widget class in class
    Makefile 语法分析 第三部分
    在Makefile中的 ".PHONY "是做什么的?
    】openssl移植Android使用及其相关经验分享
    精品Android源码推荐,看了绝不后悔
    Makefile 语法分析 第三部分
  • 原文地址:https://www.cnblogs.com/little-horse/p/14313100.html
Copyright © 2011-2022 走看看