  • NLP: Simple Text Sentiment Classification with BERT

    Following the example in "When BERT Meets Keras" (https://spaces.ac.cn/archives/6736), this setup reaches 95.5%+ accuracy. It uses the keras-bert library (installable with pip install keras-bert):
    https://github.com/CyberZHG/keras-bert/blob/master/README.zh-CN.md

    Example implementation

    # ! -*- coding:utf-8 -*-
    
    import json
    import numpy as np
    import pandas as pd
    from random import choice
    from keras_bert import load_trained_model_from_checkpoint, Tokenizer
    import codecs
    
    maxlen = 100
    config_path = 'model/bert_config.json'
    checkpoint_path = 'model/bert_model.ckpt'
    dict_path = 'model/vocab.txt'
    
    token_dict = {}
    
    with codecs.open(dict_path, 'r', 'utf8') as reader:
        for line in reader:
            token = line.strip()
            token_dict[token] = len(token_dict)
    
    
    class OurTokenizer(Tokenizer):
    
        def __init__(self, token_dict):
            super(OurTokenizer, self).__init__(token_dict)
    
        def _tokenize(self, text):
            R = []
            for c in text:
                if c in self._token_dict:
                    R.append(c)
                elif self._is_space(c):
                    R.append('[unused1]')  # represent whitespace with the untrained [unused1] token
                else:
                    R.append('[UNK]')  # map all remaining characters to [UNK]
            return R
    
    
    tokenizer = OurTokenizer(token_dict)
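    # quick sanity check (illustrative only; exact tokens depend on vocab.txt):
    # tokenizer.tokenize(u'今天天气不错')
    # -> ['[CLS]', '今', '天', '天', '气', '不', '错', '[SEP]']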
    
    neg = pd.read_excel('neg.xls', header=None)
    pos = pd.read_excel('pos.xls', header=None)
    
    data = []
    
    for d in neg[0]:
        data.append((d, 0))
    
    for d in pos[0]:
        data.append((d, 1))
    
    # split the data into training and validation sets at a 9:1 ratio
    random_order = list(range(len(data)))
    np.random.shuffle(random_order)
    train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
    valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]
    
    
    def seq_padding(X, padding=0):
        L = [len(x) for x in X]
        ML = max(L)
        return np.array([
            np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
        ])
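    # e.g. seq_padding([[1, 2], [3]]) -> array([[1, 2], [3, 0]])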
    
    
    class data_generator:
    
        def __init__(self, data, batch_size=32):
            self.data = data
            self.batch_size = batch_size
            self.steps = len(self.data) // self.batch_size
            if len(self.data) % self.batch_size != 0:
                self.steps += 1
    
        def __len__(self):
            return self.steps
    
        def __iter__(self):
            while True:
                idxs = list(range(len(self.data)))
                np.random.shuffle(idxs)
                X1, X2, Y = [], [], []
                for i in idxs:
                    d = self.data[i]
                    text = d[0][:maxlen]
                    x1, x2 = tokenizer.encode(first=text)
                    y = d[1]
                    X1.append(x1)
                    X2.append(x2)
                    Y.append([y])
                    if len(X1) == self.batch_size or i == idxs[-1]:
                        X1 = seq_padding(X1)
                        X2 = seq_padding(X2)
                        Y = seq_padding(Y)
                        yield [X1, X2], Y
                        X1, X2, Y = [], [], []
    
    
    from keras.layers import *
    from keras.models import Model
    from keras.optimizers import Adam
    
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)
    
    # freeze all BERT layers; only the classification head is trained
    for l in bert_model.layers:
        l.trainable = False
    
    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))
    
    x = bert_model([x1_in, x2_in])
    
    x = Lambda(lambda x: x[:, 0])(x)  # take the [CLS] vector as the sentence representation
    p = Dense(1, activation='sigmoid')(x)
    
    model = Model([x1_in, x2_in], p)
    model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(1e-5),  # use a sufficiently small learning rate
        metrics=['accuracy']
    )
    model.summary()
    
    train_D = data_generator(train_data)
    valid_D = data_generator(valid_data)
    
    # note: test and test_D are defined but never used below
    test = [train_data[0]]
    test_D = data_generator(test)
    
    model.fit_generator(
        train_D.__iter__(),
        steps_per_epoch=len(train_D),
        epochs=1,
        validation_data=valid_D.__iter__(),
        validation_steps=len(valid_D)
    )
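    # note: newer Keras / tf.keras deprecates fit_generator; there,
    # model.fit() accepts generators directly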
     
    # save the full model (architecture + weights)
    model.save('model.h5')
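
    For reference, a minimal in-memory inference sketch (the review text is a hypothetical example, and the model above must have been trained first):

    text = u'这个产品很好用'  # hypothetical positive review
    x1, x2 = tokenizer.encode(first=text[:maxlen])
    # wrap the single sample in batch arrays; the sigmoid output is P(positive)
    prob = model.predict([np.array([x1]), np.array([x2])])[0][0]
    print('positive probability: %.4f' % prob)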
    

    Problems with the original example

    After the model is saved, reloading it raises errors about unknown custom layers and activation functions; keras-bert's custom objects are not registered with Keras by default.
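
    The fix, used in full below, is to pass keras-bert's custom objects when loading:

    from keras.models import load_model
    from keras_bert import get_custom_objects

    model = load_model('model.h5', custom_objects=get_custom_objects())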

    Solution

    # ! -*- coding:utf-8 -*-
    
    import json
    import numpy as np
    import pandas as pd
    from random import choice
    from keras_bert import load_trained_model_from_checkpoint, Tokenizer, get_custom_objects
    import re, os
    import codecs
    from keras.models import load_model
    
    maxlen = 100
    config_path = 'model/bert_config.json'
    checkpoint_path = 'model/bert_model.ckpt'
    dict_path = 'model/vocab.txt'
    
    token_dict = {}
    
    with codecs.open(dict_path, 'r', 'utf8') as reader:
        for line in reader:
            token = line.strip()
            token_dict[token] = len(token_dict)
    
    
    class OurTokenizer(Tokenizer):
    
        def __init__(self, token_dict):
            super(OurTokenizer, self).__init__(token_dict)
    
        def _tokenize(self, text):
            R = []
            for c in text:
                if c in self._token_dict:
                    R.append(c)
                elif self._is_space(c):
                    R.append('[unused1]')  # represent whitespace with the untrained [unused1] token
                else:
                    R.append('[UNK]')  # map all remaining characters to [UNK]
            return R
    
    
    tokenizer = OurTokenizer(token_dict)
    
    neg = pd.read_excel('neg.xls', header=None)
    pos = pd.read_excel('pos.xls', header=None)
    
    data = []
    
    for d in neg[0]:
        data.append((d, 0))
    
    for d in pos[0]:
        data.append((d, 1))
    
    # split the data into training and validation sets at a 9:1 ratio
    random_order = list(range(len(data)))
    np.random.shuffle(random_order)
    train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
    valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]
    
    
    def seq_padding(X, padding=0):
        L = [len(x) for x in X]
        ML = max(L)
        return np.array([
            np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
        ])
    
    
    class data_generator:
    
        def __init__(self, data, batch_size=32):
            self.data = data
            self.batch_size = batch_size
            self.steps = len(self.data) // self.batch_size
            if len(self.data) % self.batch_size != 0:
                self.steps += 1
    
        def __len__(self):
            return self.steps
    
        def __iter__(self):
            while True:
                idxs = list(range(len(self.data)))
                np.random.shuffle(idxs)
                X1, X2, Y = [], [], []
                for i in idxs:
                    d = self.data[i]
                    text = d[0][:maxlen]
                    x1, x2 = tokenizer.encode(first=text)
                    y = d[1]
                    X1.append(x1)
                    X2.append(x2)
                    Y.append([y])
                    if len(X1) == self.batch_size or i == idxs[-1]:
                        X1 = seq_padding(X1)
                        X2 = seq_padding(X2)
                        Y = seq_padding(Y)
                        yield [X1, X2], Y
                        X1, X2, Y = [], [], []
    
    
    from keras.layers import *
    from keras.models import Model
    import keras.backend as K
    from keras.optimizers import Adam
    
    bert_model = load_trained_model_from_checkpoint(config_path, checkpoint_path, seq_len=None)
    
    # freeze all BERT layers; only the classification head is trained
    for l in bert_model.layers:
        l.trainable = False
    
    x1_in = Input(shape=(None,))
    x2_in = Input(shape=(None,))
    
    x = bert_model([x1_in, x2_in])
    
    print(bert_model.layers)  # debug: list BERT's layers
    
    x = Lambda(lambda x: x[:, 0])(x)  # take the [CLS] vector as the sentence representation
    p = Dense(1, activation='sigmoid')(x)
    
    model = Model([x1_in, x2_in], p)
    model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(1e-5),  # use a sufficiently small learning rate
        metrics=['accuracy']
    )
    model.summary()
    train_D = data_generator(train_data)
    valid_D = data_generator(valid_data)
    
    '''
    model.fit_generator(
        train_D.__iter__(),
        steps_per_epoch=len(train_D),
        epochs=5,
        validation_data=valid_D.__iter__(),
        validation_steps=len(valid_D)
    )
    
    model.save('save_path.h5')
    '''
    
    
    # generator that tokenizes the dataset into padded (indices, segments, labels) arrays
    class data_token_generator:
    
        def __init__(self, data, batch_size=32):
            self.data = data
            self.batch_size = batch_size
            self.steps = len(self.data) // self.batch_size
            if len(self.data) % self.batch_size != 0:
                self.steps += 1
                
        def __len__(self):
            return self.steps
    
        def get_data(self):
            idxs = list(range(len(self.data)))
            np.random.shuffle(idxs)
            X1, X2, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                text = d[0][:maxlen]
                print(text)
                x1, x2 = tokenizer.encode(first=text)
                y = d[1]
                X1.append(x1)
                X2.append(x2)
                Y.append([y])
            X1 = seq_padding(X1)
            X2 = seq_padding(X2)
            Y = seq_padding(Y)
            return X1, X2, Y
    
    
    # get_custom_objects() supplies keras-bert's custom layers and activations
    # so Keras can rebuild the saved computation graph when loading
    new_model = load_model('save_path.h5', custom_objects=get_custom_objects())
    test_T = data_token_generator(valid_data[0:10])
    X_test1, X_test2, Y_test = test_T.get_data()
    print(Y_test)
    print(new_model.predict([X_test1, X_test2]))
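
    As a quick check, a hedged follow-up that rounds the sigmoid outputs at 0.5 into hard labels for comparison with Y_test:

    pred = (new_model.predict([X_test1, X_test2]) > 0.5).astype(int)
    print(list(zip(Y_test.flatten().tolist(), pred.flatten().tolist())))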
    

    My implementation

    Instead of fine-tuning BERT end to end, this version uses BERT purely as a feature extractor: bert_model.predict precomputes the token embeddings once, and a small binary classifier is then trained on the [CLS] vectors.

    # ! -*- coding:utf-8 -*-
    import numpy as np
    import pandas as pd
    from random import choice
    from keras_bert import load_trained_model_from_checkpoint, Tokenizer, get_checkpoint_paths
    import codecs
    from keras.layers import *
    from keras.models import Model
    from keras.optimizers import Adam
    
    # maximum length (in characters) of a review text
    maxlen = 100
    dict_path = 'model/vocab.txt'
    token_dict = {}
    EPOCHS = 30
    BATCH_SIZE = 128
    
    # build the token dictionary from the BERT vocab file
    with codecs.open(dict_path, 'r', 'utf8') as reader:
        for line in reader:
            token = line.strip()
            # print(token, len(token_dict))
            token_dict[token] = len(token_dict)
    
    
    # define the tokenizer
    class OurTokenizer(Tokenizer):
    
        def _tokenize(self, text):
            R = []
            for c in text:
                if c in self._token_dict:
                    R.append(c)
                elif self._is_space(c):
                    R.append('[unused1]')  # represent whitespace with the untrained [unused1] token
                else:
                    R.append('[UNK]')  # map all remaining characters to [UNK]
            return R
    
    
    # instantiate the tokenizer
    tokenizer = OurTokenizer(token_dict)
    
    # load the datasets
    neg = pd.read_excel('neg.xls', header=None)
    pos = pd.read_excel('pos.xls', header=None)
    
    data = []
    
    for d in neg[0]:
        data.append((d, 0))
    
    for d in pos[0]:
        data.append((d, 1))
    
    # split the data into training and validation sets at a 9:1 ratio
    random_order = list(range(len(data)))
    np.random.shuffle(random_order)
    train_data = [data[j] for i, j in enumerate(random_order) if i % 10 != 0]
    valid_data = [data[j] for i, j in enumerate(random_order) if i % 10 == 0]
    
    
    # pad each token sequence to the length of the longest one in the batch
    def seq_padding(X, padding=0):
        L = [len(x) for x in X]
        ML = max(L)
        # note: unlike the earlier scripts, this returns a plain Python list;
        # callers wrap it in np.array where needed
        t = [
            np.concatenate([x, [padding] * (ML - len(x))]) if len(x) < ML else x for x in X
        ]
        return t
    
    
    # generator that converts the dataset into (BERT embedding, label) arrays
    class data_token_generator:
    
        def __init__(self, data, batch_size=32, print_text=False):
            self.data = data
            self.batch_size = batch_size
            self.steps = len(self.data) // self.batch_size
            self.print_text = print_text
            if len(self.data) % self.batch_size != 0:
                self.steps += 1
                
            # path to the Chinese BERT checkpoint directory (model/)
            paths = get_checkpoint_paths('model')
            # load the pre-trained Chinese BERT model
            self.bert_model = load_trained_model_from_checkpoint(paths.config, paths.checkpoint, seq_len=None)

            # note: BERT is only used via predict() below, so this trainable
            # flag has no effect; the embeddings are fixed features
            for l in self.bert_model.layers:
                l.trainable = True
                
        def __len__(self):
            return self.steps
    
        def get_data(self):
            data_x = []
            data_y = []
            idxs = list(range(len(self.data)))
            # shuffle the sample order
            np.random.shuffle(idxs)
            indices, segments, Y = [], [], []
            for i in idxs:
                d = self.data[i]
                # truncate the review text to maxlen
                text = d[0][:maxlen]
                if self.print_text:
                    print(text)
                # encode into token indices and segment ids
                indice, segment = tokenizer.encode(first=text)
                y = d[1]
                # collect the sample into the current batch
                indices.append(indice)
                segments.append(segment)
                Y.append([y])
                # once a full batch (or the last sample) is reached, run BERT
                if len(indices) == self.batch_size or i == idxs[-1]:
                    indices = seq_padding(indices)
                    segments = seq_padding(segments)
                    Y = seq_padding(Y)
                    # compute BERT embeddings for the whole batch
                    x = self.bert_model.predict([np.array(indices), np.array(segments)])
                    
                    j_idxs = list(range(len(x)))
                    for j in j_idxs:
                        data_x.append(x[j])
                        data_y.append(Y[j])
                    
                    print(len(data_y))  # progress: samples embedded so far
                    indices, segments, Y = [], [], []
    
            return np.array(data_x), np.array(data_y)
    
        
    # define the binary classification head
    x_in = Input(shape=(None, 768))
    x = Lambda(lambda x: x[:, 0])(x_in)  # take the [CLS] vector
    p = Dense(1, activation='sigmoid')(x)
    
    model = Model(x_in, p)
    model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(1e-5),  # use a sufficiently small learning rate
        metrics=['accuracy']
    )
    # print the model architecture
    model.summary()
    
    # start training
    print('Training -----------')
    
    train_T = data_token_generator(train_data)
    train_x, train_y = train_T.get_data()
    valid_T = data_token_generator(valid_data)
    validation_data = valid_T.get_data()
    model.fit(
        train_x,
        train_y,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        validation_data=validation_data
    )
     
    model.save('new_model.h5')    
    
    # reload the saved model and verify its predictions
    import keras
    
    test_T = data_token_generator(valid_data[0:10], print_text=True)
    X_test, Y_test = test_T.get_data()
    print(Y_test)
    new_model = keras.models.load_model('new_model.h5')
    y = new_model.predict(X_test)
    print(y)
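
    As a follow-up, a minimal evaluation sketch (assuming the script above has just run; model.evaluate is standard Keras) to turn raw probabilities into an accuracy figure:

    # reuse the precomputed validation embeddings from above
    val_x, val_y = validation_data
    loss, acc = new_model.evaluate(val_x, val_y, verbose=0)
    print('validation accuracy: %.4f' % acc)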
    

    With the HIT (Harbin Institute of Technology) Chinese BERT weights, accuracy is around 80%.

    Related dependencies

    Chinese pre-trained weights

  • Original post: https://www.cnblogs.com/gmhappy/p/11863936.html