Corpus link: https://pan.baidu.com/s/1Cm88-t2wSORPea7QV4V-Ag
Extraction code: l3vg
The corpus consists of pos.txt and neg.txt. Each line of a file is one complete sentence, and the average sentence length is about 20 words (computed beforehand with a short script and used to set the SENTENCE_LIMIT_SIZE hyperparameter below).
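The length-counting script itself is not shown in this post; a minimal sketch of how the average could be computed (assuming the files live in the same corpurs/ directory used in the loading code below) is:

# Rough estimate of the average sentence length (one sentence per line)
lengths = []
for filename in ("corpurs/pos.txt", "corpurs/neg.txt"):
    with open(filename, encoding='mac_roman') as f:
        lengths += [len(line.split()) for line in f if line.strip()]

print(sum(lengths) / len(lengths))   # roughly 20 for this corpus

Import the required packages first: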
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
import tqdm
import random
import torch
from torch import nn, optim

random.seed(53113)
np.random.seed(53113)
torch.manual_seed(53113)

# Hyperparameters
SENTENCE_LIMIT_SIZE = 20    # fixed sentence length (≈ average sentence length of the corpus)
BATCH_SIZE = 128            # number of samples per batch
LEARNING_RATE = 1e-3        # initial learning rate
EMBEDDING_SIZE = 200        # word-vector dimension
1. Loading the data
def read(filename):
    with open(filename, encoding='mac_roman') as f:
        text = f.read().lower()
    return text

pos_text, neg_text = read("corpurs/pos.txt"), read("corpurs/neg.txt")
total_text = pos_text + ' ' + neg_text   # merge the two corpora
2. Building the vocabulary and mappings
Building the vocabulary usually means tokenizing the text and de-duplicating the tokens. Counting word frequencies shows that many words occur only once; such words inflate the vocabulary and add noise, so only words that occur more than once in the corpus are kept. Two special tokens, <pad> and <unk>, are added at the front: <pad> is used to pad sentences, and <unk> replaces words that do not appear in the vocabulary.
text = total_text.split()
vocab = [w for w, f in Counter(text).most_common() if f > 1]   # keep words that occur more than once
vocab = ['<pad>', '<unk>'] + vocab

token_to_word = {i: word for i, word in enumerate(vocab)}        # id   -> word
word_to_token = {word: i for i, word in token_to_word.items()}   # word -> id

VOCAB_SIZE = len(token_to_word)   # vocabulary size: 10382
3. Converting the text
Use the mapping to convert the raw text into integer ids. To give every sentence the same length, the length is fixed at 20:
- sentences longer than 20 words are truncated;
- sentences shorter than 20 words are padded with <pad>.
def convert_text_to_token(sentence, word_to_token_map=word_to_token, limit_size=SENTENCE_LIMIT_SIZE):
    """
    Convert a single sentence into a list of token ids using the word-to-id map.

    @param sentence: the sentence, a str
    @param word_to_token_map: mapping from word to id
    @param limit_size: maximum sentence length; longer sentences are truncated,
                       shorter ones are padded
    return: the sentence as a list of token ids
    """
    # ids of the <unk> and <pad> tokens
    unk_id = word_to_token_map["<unk>"]
    pad_id = word_to_token_map["<pad>"]

    # map each word to its id; words not in the vocabulary get the <unk> id
    tokens = [word_to_token_map.get(word, unk_id) for word in sentence.lower().split()]

    if len(tokens) < limit_size:                              # pad
        tokens.extend([pad_id] * (limit_size - len(tokens)))
    else:                                                     # truncate
        tokens = tokens[:limit_size]

    return tokens
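A quick sanity check on a made-up sentence (the actual ids depend on the vocabulary built above, so only the length and the padding are guaranteed):

sample = convert_text_to_token("this movie is great")
print(len(sample))    # 20 -- short sentences are padded with the <pad> id (0)
print(sample[-3:])    # [0, 0, 0] -- the trailing positions are padding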
Now convert the pos and neg texts:
pos_tokens = [convert_text_to_token(sentence) for sentence in pos_text.split('\n') if sentence.strip()]
neg_tokens = [convert_text_to_token(sentence) for sentence in neg_text.split('\n') if sentence.strip()]

# convert to numpy arrays for easier handling
pos_tokens = np.array(pos_tokens)
neg_tokens = np.array(neg_tokens)
total_tokens = np.concatenate((pos_tokens, neg_tokens), axis=0)   # (10662, 20)

pos_targets = np.ones((pos_tokens.shape[0]))     # positive label: 1
neg_targets = np.zeros((neg_tokens.shape[0]))    # negative label: 0
total_targets = np.concatenate((pos_targets, neg_targets), axis=0).reshape(-1, 1)   # (10662, 1)
4. Loading pretrained word vectors
4.1 Option 1: load GloVe vectors with gensim
def load_embedding_model():
    """ Load the GloVe vectors
    Return:
        wv_from_bin: all 400000 embeddings, each of length 200
    """
    import gensim.downloader as api
    wv_from_bin = api.load("glove-wiki-gigaword-200")
    print("Loaded vocab size %i" % len(wv_from_bin.vocab.keys()))   # gensim < 4.0 API
    return wv_from_bin

model = load_embedding_model()
4.2 Option 2: convert GloVe to word2vec format and load it with gensim
from gensim.test.utils import datapath, get_tmpfile
from gensim.models import KeyedVectors
from gensim.scripts.glove2word2vec import glove2word2vec

# note: the vector dimension must match EMBEDDING_SIZE (200 here); with glove.6B.100d you would need EMBEDDING_SIZE = 100
glove_file = datapath('F:/python_DemoCode/PytorchEx/.vector_cache/glove.6B.100d.txt')                        # input file
word2vec_glove_file = get_tmpfile("F:/python_DemoCode/PytorchEx/.vector_cache/glove.6B.100d.word2vec.txt")   # output file
glove2word2vec(glove_file, word2vec_glove_file)   # convert GloVe format to word2vec format

model = KeyedVectors.load_word2vec_format(word2vec_glove_file)   # load the converted file
Use either of the pretrained models above to build our own word embedding matrix:
static_embeddings = np.zeros([VOCAB_SIZE, EMBEDDING_SIZE])
for word, token in tqdm.tqdm(word_to_token.items()):
    # fill the row with the pretrained vector if the word is in the pretrained vocabulary
    if word in model.vocab.keys():
        static_embeddings[token, :] = model[word]
    elif word == '<pad>':   # the padding token keeps an all-zero vector
        static_embeddings[token, :] = np.zeros(EMBEDDING_SIZE)
    else:                   # words without a pretrained vector are initialized uniformly in [-0.1, 0.1)
        static_embeddings[token, :] = 0.2 * np.random.random(EMBEDDING_SIZE) - 0.1

print(static_embeddings.shape)   # (10382, 200), i.e. (vocab_size, embedding_dim)
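Note that `model.vocab` only exists in gensim versions before 4.0; in gensim ≥ 4.0 it was replaced by `key_to_index`. A version-tolerant variant of the filling loop (a small sketch building on the variables defined above) is:

static_embeddings = np.zeros([VOCAB_SIZE, EMBEDDING_SIZE])
for word, token in word_to_token.items():
    if word in model:          # the `in` operator works on KeyedVectors in both gensim 3.x and 4.x
        static_embeddings[token, :] = model[word]
    elif word != '<pad>':      # <pad> keeps its all-zero row
        static_embeddings[token, :] = 0.2 * np.random.random(EMBEDDING_SIZE) - 0.1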
5. Splitting the dataset
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(total_tokens, total_targets, test_size=0.2)
print(X_train.shape, y_train.shape)   # (8529, 20) (8529, 1)
6. Generating batches
def get_batch(x, y, batch_size=BATCH_SIZE, shuffle=True):
    assert x.shape[0] == y.shape[0], "x and y must contain the same number of samples"

    if shuffle:
        shuffled_index = np.random.permutation(range(x.shape[0]))
        x = x[shuffled_index]
        y = y[shuffled_index]

    n_batches = int(x.shape[0] / batch_size)   # number of complete batches

    for i in range(n_batches):
        x_batch = x[i * batch_size: (i + 1) * batch_size]
        y_batch = y[i * batch_size: (i + 1) * batch_size]

        yield x_batch, y_batch
7. The CNN model
After the input token sequence passes through the embedding layer, each word is mapped to its word vector (200-dimensional in this example; 6-dimensional in the illustration), so a sentence becomes a seq_len × embedding_dim matrix (here seq_len is the fixed sentence length of 20). We can treat this matrix as an image with channel = 1, height = seq_len and width = embedding_dim, and slide convolutional filters over it.
Because several filter sizes are used (filter size = 3, 4, 5), the convolution and pooling are done separately per size. For each filter, the convolution slides along the height dimension to capture local relations between neighboring words, producing one column vector of responses per filter; after a ReLU activation, max-pooling keeps only the strongest response of each column vector. Since there are 100 filters of each size, flattening the pooled outputs gives a 100 × 3 = 300-dimensional vector, which feeds the fully connected layer.
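To make the shapes concrete, here is a small sketch of what a single filter size (kernel = 3) does to one batch, using the same 20 × 200 setup as in this post:

import torch
from torch import nn

x = torch.randn(128, 1, 20, 200)                # (batch, channel, seq_len, embedding_dim)
conv = nn.Conv2d(1, 100, kernel_size=(3, 200))  # 100 filters of height 3 spanning the full embedding width
pool = nn.MaxPool2d((20 - 3 + 1, 1))            # pool over all 18 positions

h = torch.relu(conv(x))   # -> [128, 100, 18, 1]: one column of 18 responses per filter
h = pool(h)               # -> [128, 100, 1, 1]: keep only the strongest response per filter
print(h.shape)

# concatenating the outputs for kernel sizes 3, 4 and 5 and flattening gives [128, 300]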
Tip: about nn.ModuleList
When the constructor __init__ needs to hold submodules in a list, tuple or dict, consider nn.ModuleList. It is designed to store an arbitrary number of nn.Module objects so that their parameters are registered with the parent module (and therefore visible to the optimizer).
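A minimal toy example of the difference (hypothetical modules, only to show parameter registration):

from torch import nn

class PlainList(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = [nn.Linear(10, 10) for _ in range(3)]                 # plain list: NOT registered

class WithModuleList(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([nn.Linear(10, 10) for _ in range(3)])  # registered as submodules

print(len(list(PlainList().parameters())))       # 0 -- the optimizer would never see these weights
print(len(list(WithModuleList().parameters())))  # 6 -- 3 x (weight + bias)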
class TextCNN(nn.Module):
    # output_size is the number of classes (2 here: 0 and 1); three kernel sizes (3, 4, 5), 100 filters per size
    def __init__(self, vocab_size, embedding_dim, output_size, filter_num=100, kernel_lst=(3, 4, 5), dropout=0.5):
        super(TextCNN, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Sequential(
                # in_channels=1, out_channels=filter_num, kernel size (kernel, embedding_dim)
                nn.Conv2d(1, filter_num, (kernel, embedding_dim)),
                nn.ReLU(),
                nn.MaxPool2d((SENTENCE_LIMIT_SIZE - kernel + 1, 1)))
            for kernel in kernel_lst])
        self.fc = nn.Linear(filter_num * len(kernel_lst), output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.embedding(x)   # [128, 20, 200]     (batch, seq_len, embedding_dim)
        x = x.unsqueeze(1)      # [128, 1, 20, 200]  (batch, channel_num, seq_len, embedding_dim)
        out = [conv(x) for conv in self.convs]

        out = torch.cat(out, dim=1)     # [128, 300, 1, 1]
        out = out.view(x.size(0), -1)   # [128, 300]
        out = self.dropout(out)
        logit = self.fc(out)            # [128, 2]
        return logit
This figure helps to understand the overall process:
Instantiate the CNN model and replace the randomly initialized embedding weights with the pretrained ones:
cnn = TextCNN(VOCAB_SIZE, EMBEDDING_SIZE, 2)   # embedding_dim = 200, output_size = 2
cnn.embedding.weight.data.copy_(torch.FloatTensor(static_embeddings))
Inspect the model:
print(cnn)
TextCNN(
  (embedding): Embedding(10382, 200)
  (convs): ModuleList(
    (0): Sequential(
      (0): Conv2d(1, 100, kernel_size=(3, 200), stride=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=(18, 1), stride=(18, 1), padding=0, dilation=1, ceil_mode=False)
    )
    (1): Sequential(
      (0): Conv2d(1, 100, kernel_size=(4, 200), stride=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=(17, 1), stride=(17, 1), padding=0, dilation=1, ceil_mode=False)
    )
    (2): Sequential(
      (0): Conv2d(1, 100, kernel_size=(5, 200), stride=(1, 1))
      (1): ReLU()
      (2): MaxPool2d(kernel_size=(16, 1), stride=(16, 1), padding=0, dilation=1, ceil_mode=False)
    )
  )
  (fc): Linear(in_features=300, out_features=2, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
)
Define the optimizer and the cross-entropy loss:
optimizer = optim.Adam(cnn.parameters(), lr=LEARNING_RATE)
criteon = nn.CrossEntropyLoss()
8. The training function
First, define a helper that computes the accuracy:
def binary_acc(preds, y):
    correct = torch.eq(preds, y).float()
    acc = correct.sum() / len(correct)
    return acc
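For example, on a toy batch of three predictions:

preds = torch.tensor([1, 0, 1])
labels = torch.tensor([1, 1, 1])
print(binary_acc(preds, labels))   # tensor(0.6667): two of the three predictions are correct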
The training function:
def train(cnn, optimizer, criteon):
    avg_acc = []
    cnn.train()   # switch to training mode

    # iterate over the batches: x_batch.shape = (128, 20), y_batch.shape = (128, 1)
    for x_batch, y_batch in get_batch(X_train, y_train):
        x_batch = torch.LongTensor(x_batch)   # convert to tensors first, otherwise the cross-entropy loss raises an error
        y_batch = torch.LongTensor(y_batch)

        y_batch = y_batch.squeeze()   # torch.Size([128])
        pred = cnn(x_batch)           # torch.Size([128, 2])

        # torch.max(pred, dim=1)[1] is the index of the larger logit in each row, i.e. the predicted class
        acc = binary_acc(torch.max(pred, dim=1)[1], y_batch)   # accuracy of this batch
        avg_acc.append(acc.item())

        loss = criteon(pred, y_batch)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return np.array(avg_acc).mean()
9. The evaluation function
Similar to the training function, but without the backward pass.
def evaluate(cnn, criteon):
    avg_acc = []
    cnn.eval()   # switch to evaluation mode

    with torch.no_grad():
        for x_batch, y_batch in get_batch(X_test, y_test):
            x_batch = torch.LongTensor(x_batch)
            y_batch = torch.LongTensor(y_batch)

            y_batch = y_batch.squeeze()   # torch.Size([128])
            pred = cnn(x_batch)           # torch.Size([128, 2])

            acc = binary_acc(torch.max(pred, dim=1)[1], y_batch)
            avg_acc.append(acc.item())

    return np.array(avg_acc).mean()
Train for several epochs:
cnn_train_acc, cnn_test_acc = [], []

for epoch in range(50):
    train_acc = train(cnn, optimizer, criteon)
    print('epoch={}, train accuracy={}'.format(epoch, train_acc))
    test_acc = evaluate(cnn, criteon)
    print('epoch={}, test accuracy={}'.format(epoch, test_acc))
    cnn_train_acc.append(train_acc)
    cnn_test_acc.append(test_acc)

plt.plot(cnn_train_acc)
plt.plot(cnn_test_acc)
plt.ylim(ymin=0.5, ymax=1.01)
plt.title("The accuracy of CNN model")
plt.legend(["train", "test"])
Because no regularization is applied, the model overfits.
To mitigate this, add the weight_decay parameter (an L2 penalty) when constructing the optimizer:
optimizer = optim.Adam(cnn.parameters(), lr=LEARNING_RATE, weight_decay=0.01)
Reference (a TensorFlow implementation): https://zhuanlan.zhihu.com/p/37978321