1. TextCNN-based text classification, used for intent recognition in question answering.
2. Architecture diagram
[Figure: TextCNN architecture — embedding layer → three parallel convolution branches (kernel heights 3/4/5, 100 filters each, kernel width equal to the embedding dimension) → max-over-time pooling → concatenation → dropout → fully connected classifier]
import os
import pickle

import pandas as pd
import torch
import torch.nn.functional as F
from torch import nn
from torchtext import data
from torchtext.data import BucketIterator
from torchtext.vocab import Vectors

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

intent_classification_path = os.path.abspath(os.path.join(os.getcwd(), '../..'))
# Path to the training data
train_data = os.path.join(intent_classification_path, 'classification_data/classification_data.csv')
# Read the data
train_data = pd.read_csv(train_data)

# Split character by character (the text column stores space-separated characters)
tokenize = lambda x: x.split(' ')
TEXT = data.Field(
    sequential=True,
    tokenize=tokenize,
    lower=True,
    use_vocab=True,
    pad_token='<pad>',
    unk_token='<unk>',
    batch_first=True,
    fix_length=20)
LABEL = data.Field(
    sequential=False,
    use_vocab=False)

# Build the training or test dataset
def get_dataset(csv_data, text_field, label_field, test=False):
    fields = [('id', None), ('text', text_field), ('label', label_field)]
    examples = []
    if test:  # test set: do not load labels
        for text in csv_data['text']:
            examples.append(data.Example.fromlist([None, text, None], fields))
    else:  # training set
        for text, label in zip(csv_data['text'], csv_data['label']):
            examples.append(data.Example.fromlist([None, text, label], fields))
    return examples, fields

train_examples, train_fields = get_dataset(train_data, TEXT, LABEL)
train = data.Dataset(train_examples, train_fields)

# Pre-trained character embeddings
pretrained_embedding = os.path.join(os.getcwd(), 'sgns.sogou.char')
vectors = Vectors(name=pretrained_embedding)
# Build the vocabulary
TEXT.build_vocab(train, min_freq=1, vectors=vectors)
# Persist the vocabulary for later inference
words_path = os.path.join(os.getcwd(), 'words.pkl')
with open(words_path, 'wb') as f_words:
    pickle.dump(TEXT.vocab, f_words)

BATCH_SIZE = 163
# Build the iterator
train_iter = BucketIterator(
    dataset=train,
    batch_size=BATCH_SIZE,
    shuffle=True,
    sort_within_batch=False)
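Before defining the model, it can help to check what the Field pipeline actually produces. Below is a minimal sanity check (a sketch: the sample query is made up, and the pad index of 1 is the usual torchtext default rather than something guaranteed):

# Sanity check: Field.preprocess tokenizes and lowercases,
# Field.process numericalizes and pads to fix_length=20
sample = '今 天 天 气 怎 么 样'  # illustrative query, characters space-separated as in the CSV
batch = TEXT.process([TEXT.preprocess(sample)])
print(batch.shape)  # torch.Size([1, 20]) because batch_first=True and fix_length=20
print(batch[0])     # character indices followed by <pad> indices (typically 1)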
# Build the classification model
class TextCNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, output_size,
                 filter_num=100, filter_size=(3, 4, 5), dropout=0.5):
        '''
        vocab_size: vocabulary size
        embedding_dim: embedding dimension
        output_size: number of output classes
        filter_num: number of convolution kernels per size
        filter_size (3, 4, 5): three kernel heights (3, 4 and 5); there are
        filter_num kernels of each height, and every kernel is embedding_dim wide
        '''
        super(TextCNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # Conv2d(in_channels, out_channels, kernel_size, stride, padding); stride defaults to 1, padding to 0
        self.convs = nn.ModuleList(
            [nn.Conv2d(1, filter_num, (k, embedding_dim)) for k in filter_size])
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(filter_num * len(filter_size), output_size)

    '''
    The convolution and pooling in forward work out as follows:
    1. Convolution
       Output shape formula (simplified): np.floor((n + 2p - f)/s + 1)
       Input shape: (batch, in_channel, hin, win) = (163, 1, 20, 300); 20 is the sentence length, 300 the embedding size
       Output shape (for k = 3):
           hout = (20 + 2*0 - 1*(3 - 1) - 1)/1 + 1 = 18
           wout = (300 + 2*0 - 1*(300 - 1) - 1)/1 + 1 = 1
       => output: (batch, out_channel, hout, wout) = (163, 100, 18, 1)
    2. max_pool1d pooling
       Simplified formula: np.floor((l + 2p - f)/s + 1)
       Input shape (N, C, L): (163, 100, 18, 1) -> squeeze(3) -> (163, 100, 18)
       Output shape:
           lout = (18 + 2*0 - 18)/18 + 1 = 1  ->  (163, 100, 1)
    '''
    def forward(self, x):
        # x: (batch, seq_len) = (163, 20)
        x = self.embedding(x)  # [batch, word_num, embedding_dim] = [N, H, W] -> (163, 20, 300)
        x = x.unsqueeze(1)  # [batch, channel, word_num, embedding_dim] = [N, C, H, W] -> (163, 1, 20, 300)
        x = [F.relu(conv(x)).squeeze(3) for conv in self.convs]  # len(filter_size) * (N, filter_num, H), H = 18/17/16 for k = 3/4/5
        # MaxPool1d(kernel_size, stride=None, padding=0, dilation=1, return_indices=False, ceil_mode=False); stride defaults to kernel_size
        x = [F.max_pool1d(output, output.shape[2]).squeeze(2) for output in x]  # len(filter_size) * (N, filter_num) -> 3 * (163, 100)
        x = torch.cat(x, 1)  # (N, filter_num * len(filter_size)) -> (163, 100 * 3)
        x = self.dropout(x)
        x = self.fc(x)
        return x
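To confirm the shape bookkeeping in the comments above, a quick smoke test with random indices can be run (a sketch: the vocabulary size of 5000 is a placeholder; 300 and 16 match the embedding size and class count used below):

# Smoke test: feed a fake batch through the model and check the output shape
toy_model = TextCNN(vocab_size=5000, embedding_dim=300, output_size=16)
dummy = torch.randint(0, 5000, (163, 20))  # 163 "sentences" of 20 character ids
logits = toy_model(dummy)
print(logits.shape)  # torch.Size([163, 16]): one score per intent class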
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter(os.getcwd() + '/log', comment='textcnn')

# Training
# Build the model (16 intent classes)
model = TextCNN(len(TEXT.vocab), TEXT.vocab.vectors.shape[1], 16).to(DEVICE)
# Initialize the embedding layer from the pre-trained vectors; requires_grad stays True, so it can be fine-tuned
model.embedding.weight.data.copy_(TEXT.vocab.vectors)
# Training mode
model.train()

# Optimizer and loss
# optimizer = torch.optim.Adam(model.parameters(), lr=0.1, weight_decay=0.01)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9, nesterov=True)
criterion = nn.CrossEntropyLoss()

EPOCHS = 300
for epoch in range(EPOCHS):
    for i, batch in enumerate(train_iter):
        train_text = batch.text.to(DEVICE)
        train_label = batch.label.to(DEVICE)
        out = model(train_text)
        loss = criterion(out, train_label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    if (epoch + 1) % 10 == 0:
        print('epoch [{}/{}], Loss: {:.4f}'.format(epoch + 1, EPOCHS, loss.item()))
        writer.add_scalar('loss', loss.item(), global_step=epoch + 1)
        writer.flush()
writer.close()

model_path = os.path.join(os.getcwd(), "model.h5")
torch.save(model.state_dict(), model_path)
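At inference time, the pickled vocabulary and the saved weights can be loaded back roughly as follows (a sketch: the query and its segmentation are illustrative, and despite the .h5 suffix, model.h5 is just a PyTorch state_dict, not a Keras file):

# Inference sketch, assuming words.pkl and model.h5 were produced by the script above
with open(words_path, 'rb') as f:
    vocab = pickle.load(f)

clf = TextCNN(len(vocab), vocab.vectors.shape[1], 16)
clf.load_state_dict(torch.load(model_path, map_location='cpu'))
clf.eval()

query = '附 近 有 什 么 好 吃 的'  # illustrative query, characters space-separated
tokens = query.split(' ')[:20]
ids = [vocab.stoi.get(t, vocab.stoi['<unk>']) for t in tokens]
ids += [vocab.stoi['<pad>']] * (20 - len(ids))  # pad to fix_length=20
with torch.no_grad():
    pred = clf(torch.tensor([ids]))
print(pred.argmax(dim=1).item())  # index of the predicted intent class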