zoukankan      html  css  js  c++  java
  • tensorflow的写诗代码分析【转】

    本文转载自:https://dongzhixiao.github.io/2018/07/21/so-hot/

        今天周六,早晨出门吃饭,全身汗湿透。天气真的是太热了!我决定一天不出门,在屋子里面休息!
    晚上,腾飞给我说了他暑假的计划,决定去长沙、成都去转一圈,并邀请我去,还顺便叫我晚上去吃饭。
    最后我们就一起吃了一顿饭,不过我估计我休息的时间是下下周,因此可能不能和他一起去了。
    

    今天总结一下本周学习到的知识:

    周一

    在进行神经网络序列输入的时候,发现了一个很好的文件代码用来数据预处理。

    注意:后面使用“数据单元”代表数据的一个最小单元,比如训练英文数据就可以代表一个字符——’a’,训练中文数据就可以代表一个汉字——’王’etc

    文件名字叫做read_utils.py。该文件中实现了一个类TextConverter和一个工具函数batch_generator

    该文件是一个工具类,用于把一个输入文件根据编码输出对应的一批一批的数据用于RNN/LSTM之类的文本处理神经网络训练, 用法是先使用TextConverter类编码所有的内容为数据单元对应数字,然后使用batch_generator函数将编码好的数字分批返回 比如:

    text = f.read()  #f是open后得到的文件指针
    converter = TextConverter(text)
    arr = converter.text_to_arr(text)
    g = batch_generator(arr,num_seqs,num_steps)   #如果输入本来就是编码好的数据,则直接使用这个函数即可
    

    下面让我们来一个一个学习一下。

    TextConverter类

    TextConverter类是用来将传入的文件中所有数据 首先,我们看看该类的构造函数。

    class TextConverter(object):
        def __init__(self, text=None, max_vocab=5000, filename=None):
            if filename is not None:
                with open(filename, 'rb') as f:
                    self.vocab = pickle.load(f)
            else:
                vocab = set(text)    #存储读取文件中的数据单元所有类型的集合,比如英文文件会是:{'
    ','A','b',...,'
    '}
                print(len(vocab))    #打印数据单元的种类的数目
                # max_vocab_process
                vocab_count = {}     #存储每一个数据单元在整个读入的文本中出现的次数的字典
                for word in vocab:
                    vocab_count[word] = 0
                for word in text:
                    vocab_count[word] += 1
                vocab_count_list = []    #存储元组(数据单元,对应数量)组成的列表,然后按照数量的大小排序,比如[('a',100),('d',20),...,('x',3)]
                for word in vocab_count:
                    vocab_count_list.append((word, vocab_count[word]))
                vocab_count_list.sort(key=lambda x: x[1], reverse=True)
                if len(vocab_count_list) > max_vocab:      #根据传入的最大数量的数据单元数截断前max_vocab大的数据单元,基本上不可能,除非遇到汉字之类的文本
                    vocab_count_list = vocab_count_list[:max_vocab]
                vocab = [x[0] for x in vocab_count_list]
                self.vocab = vocab     #vocab仅仅存储数据单元按照出现数量从大到小的列表,例如:['a','d',...,'x']
    
            self.word_to_int_table = {c: i for i, c in enumerate(self.vocab)}    # 数据单元到数字字典{' ':0,'e':1,...,'c':20,...}
            self.int_to_word_table = dict(enumerate(self.vocab))         # 数字到数据单元字典{0:‘ ’,1:'e',...,20:'c',..}
    

    可以看出,该构造函数的输入是(文本内容,最大词限制,文件名)。可以看出最后一个关键字参数filename是用来判断文件是否为空,从而直接读取 不用进入后面的处理环节,这个地方跟后面的保存模块对应的:

        def save_to_file(self, filename):      #仅仅存储数据单元按照出现数量从大到小的列表到指定文件filename处,例如:['a','d',...,'x']
            with open(filename, 'wb') as f:
                pickle.dump(self.vocab, f)
    

    保存后,以后就可以直接使用这个词表了。 如果没有传入文件名,则说明需要进行后续的处理,我们仔细看一下后面的代码,发现实际上做的工作就是:

    • 找到所有数据中“数据单元”
    • 遍历文件记录每个“数据单元”出现的次数,根据次数大小对“数据单元”排序
    • 根据传入参数max_vocab截断数据单元,只去前max_vocab个“数据单元”
    • 将留下的数据单元一一映射到自然数0,1,2…上

    注意传入的text是一个列表或者列表生成器之类的数据结构,因为后面的代码把它这样子用了(比如去text的集合,用for迭代text等)。

    在构造函数中已经实现了“数据单元”到自然数列的映射,因此互相转换的函数就显而易见了,如下所示:

        def word_to_int(self, word):    #返回数据单元对应的整数
            if word in self.word_to_int_table:
                return self.word_to_int_table[word]
            else:
                return len(self.vocab)    #如果出现了没有出现的词,则变为<unk>对应的标记
    
        def int_to_word(self, index):    #返回整数对应的数据单元
            if index == len(self.vocab):
                return '<unk>'          #没有出现的词被标记为unknown的缩写
            elif index < len(self.vocab):
                return self.int_to_word_table[index]
            else:
                raise Exception('Unknown index!')
    

    由上面的函数可知,在映射的时候如果词没有出现在词表中,则标记为<unk>返回,这个是非常重要的一个处理,因为在实际进行数据 输入的时候,由于截断引起的超出数据记录的词,或者在进行测试集的时候很有可能出现这种情况!

    既然有了单个“数据单元”和自然数的映射,多个“数据单元”组成的列表当然也能相互转化:

        def text_to_arr(self, text):     #将输入的text根据word_to_int返回得到对应的编码数,并构成np.ndarray并返回,例如:输入' a
    ',则返回类似array([ 0, 0, 4, 10])
            arr = []
            for word in text:
                arr.append(self.word_to_int(word))
            return np.array(arr)
    
        def arr_to_text(self, arr):    #输入列表类型的数据,返回对应的数据单元的组合
            words = []
            for index in arr:
                words.append(self.int_to_word(index))
            return "".join(words)
    

    batch_generator

    有了数据编码的类,下面就需要一个样本生成的函数了。 根据输入的数据(这个输入一般就是全部样本组成的文本,并且已经根据所有数据单元编码成为了数字列表), 返回对应的生成器,满足输入的序列个数和序列长度

    def batch_generator(arr, n_seqs, n_steps):   #根据输入的arr(这个输入一般就是全部样本组成的文本,并且已经根据所有数据单元编码成为了数字列表),返回对应的生成器,满足输入的序列个数和序列长度
        arr = copy.copy(arr)
        batch_size = n_seqs * n_steps         #计算没次输入需要使用的数据单元
        n_batches = int(len(arr) / batch_size)   #一共可以得到多少组输入数据
        arr = arr[:batch_size * n_batches]     #直接忽略了后面不能构成一组输入的数据!
        arr = arr.reshape((n_seqs, -1))
        while True:
            np.random.shuffle(arr)     #将所有行打乱顺序
            for n in range(0, arr.shape[1], n_steps):
                x = arr[:, n:n + n_steps]          #每次选择对应n_seqs行,n_steps列的数据
                y = np.zeros_like(x)    #返回跟x同形状的n维数组,数据全部都是0
                y[:, :-1], y[:, -1] = x[:, 1:], x[:, 0]
                yield x, y
    

    可以看出,该函数根据输入的所有训练数据,和对应的序列一批的个数(n_seqs)和每个输入的序列的长度(n_steps),然后 通过生成器函数不断的迭代取出来数据用于训练。每一个输入和输出的序列刚错开一位,比如:

    #如果输入的x是[[48 49 50]
    #             [ 0  1  2]]
    # 则输出的y是[[49 50 48]
    #            [ 1  2  0]]
    

    周二

    文件名字叫做model.py。该文件中实现了一个类CharRNN和一个工具函数pick_top_n

    CharRNN

    下面介绍模型类,这个模型使用的是TensorFlow模块,然后进行网络的搭建,首先看构造函数:

    class CharRNN:
        def __init__(self, num_classes, num_seqs=64, num_steps=50,
                     lstm_size=128, num_layers=2, learning_rate=0.001,
                     grad_clip=5, sampling=False, train_keep_prob=0.5, use_embedding=False, embedding_size=128):
            if sampling is True:
                num_seqs, num_steps = 1, 1
            else:
                num_seqs, num_steps = num_seqs, num_steps
    
            self.num_classes = num_classes
            self.num_seqs = num_seqs                 #序列个数
            self.num_steps = num_steps               #序列长度
            self.lstm_size = lstm_size
            self.num_layers = num_layers
            self.learning_rate = learning_rate
            self.grad_clip = grad_clip
            self.train_keep_prob = train_keep_prob
            self.use_embedding = use_embedding
            self.embedding_size = embedding_size
    
            tf.reset_default_graph()
            self.build_inputs()    #构建输入层
            self.build_lstm()      #构建LSTM层
            self.build_loss()      #构建损失函数
            self.build_optimizer() #构建优化器
            self.saver = tf.train.Saver()  #保存设置
            #下面测试,增加总结
            tf.summary.scalar('loss',self.loss)
            for var in tf.trainable_variables():
                tf.summary.histogram(var.op.name, var)
            self.merge_summary = tf.summary.merge_all()
    
            self.train_writer = tf.summary.FileWriter('./model')
            self.train_writer.add_graph(tf.get_default_graph())
    

    可以看出,该构造函数根据输入的参数,搭建了一个R输入-R输出的神经网络,隐状态用的是LSTM模型。 首先先保存各个输入的设定,然后分别构建各个层和优化保存相关的设置,我们一个一个看:

        def build_inputs(self):
            with tf.name_scope('inputs'):
                self.inputs = tf.placeholder(tf.int32, shape=(
                    self.num_seqs, self.num_steps), name='inputs')
                self.targets = tf.placeholder(tf.int32, shape=(
                    self.num_seqs, self.num_steps), name='targets')
                self.keep_prob = tf.placeholder(tf.float32, name='keep_prob')
    
                # 对于中文,需要使用embedding层
                # 英文字母没有必要用embedding层
                if self.use_embedding is False:
                    self.lstm_inputs = tf.one_hot(self.inputs, self.num_classes)
                else:
                    with tf.device("/cpu:0"):
                        embedding = tf.get_variable('embedding', [self.num_classes, self.embedding_size])
                        self.lstm_inputs = tf.nn.embedding_lookup(embedding, self.inputs)
    

    上面的函数就是输入层,可以看出,根据输入的参数embedding来确定输入层是否增加一个嵌入层,显然,如果数据的词表 比较大,比如中文,就需要嵌入层降维,如果比较小,就可以不用嵌入层。

    然后是LSTM层:

        def build_lstm(self):
            # 创建单个cell并堆叠多层
            def get_a_cell(lstm_size, keep_prob):
                lstm = tf.nn.rnn_cell.BasicLSTMCell(lstm_size)
                drop = tf.nn.rnn_cell.DropoutWrapper(lstm, output_keep_prob=keep_prob)
                return drop
    
            with tf.name_scope('lstm'):
                cell = tf.nn.rnn_cell.MultiRNNCell(
                    [get_a_cell(self.lstm_size, self.keep_prob) for _ in range(self.num_layers)]
                )
                self.initial_state = cell.zero_state(self.num_seqs, tf.float32)
    
                # 通过dynamic_rnn对cell展开时间维度
                self.lstm_outputs, self.final_state = tf.nn.dynamic_rnn(cell, self.lstm_inputs, initial_state=self.initial_state)
    
                # 通过lstm_outputs得到概率
                seq_output = tf.concat(self.lstm_outputs, 1)
                x = tf.reshape(seq_output, [-1, self.lstm_size])
    
                with tf.variable_scope('softmax'):
                    softmax_w = tf.Variable(tf.truncated_normal([self.lstm_size, self.num_classes], stddev=0.1))
                    softmax_b = tf.Variable(tf.zeros(self.num_classes))
    
                self.logits = tf.matmul(x, softmax_w) + softmax_b
                self.proba_prediction = tf.nn.softmax(self.logits, name='predictions')
    

    可以看出,LSTM层使用的是多层,层数根据参数self.num_layers确定LSTM的隐层的层数。然后得到输出使用的是softmax激活函数,可以 得到输出的每一个类别的概率。

    之后是损失和优化:

     def build_loss(self):
            with tf.name_scope('loss'):
                y_one_hot = tf.one_hot(self.targets, self.num_classes)
                y_reshaped = tf.reshape(y_one_hot, self.logits.get_shape())
                loss = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=y_reshaped)
                self.loss = tf.reduce_mean(loss)
                
                
        def build_optimizer(self):
            # 使用clipping gradients
            tvars = tf.trainable_variables()
            grads, _ = tf.clip_by_global_norm(tf.gradients(self.loss, tvars), self.grad_clip)
            train_op = tf.train.AdamOptimizer(self.learning_rate)
            self.optimizer = train_op.apply_gradients(zip(grads, tvars))
    

    损失使用的就是一般常用的交叉熵损失,优化则使用的是比较著名的自适应优化器adam

    之后就可以开始训练了:

        def train(self, batch_generator, max_steps, save_path, save_every_n, log_every_n):
            self.session = tf.Session()
            with self.session as sess:
                sess.run(tf.global_variables_initializer())
                # Train network
                step = 0
                new_state = sess.run(self.initial_state)
                for x, y in batch_generator:
                    step += 1
                    start = time.time()
                    feed = {self.inputs: x,
                            self.targets: y,
                            self.keep_prob: self.train_keep_prob,
                            self.initial_state: new_state}
                    batch_loss, new_state, _ , train_summary = sess.run([self.loss,
                                                         self.final_state,
                                                         self.optimizer,
                                                         self.merge_summary],
                                                        feed_dict=feed)
    
                    end = time.time()
                    # control the print lines
                    if step % log_every_n == 0:
                        print('step: {}/{}... '.format(step, max_steps),
                              'loss: {:.4f}... '.format(batch_loss),
                              '{:.4f} sec/batch'.format((end - start)))
                        self.train_writer.add_summary(train_summary, step)
                    if (step % save_every_n == 0):
                        self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step)
    
                    if step >= max_steps:
                        break
                self.saver.save(sess, os.path.join(save_path, 'model'), global_step=step)
    

    可以看出,训练就是根据前面搭建的网络和生成的样本,往里面不断的喂数据。然后将结果不断保存。

    训练好模型后,我们就可以读取保存好的模型:

        def load(self, checkpoint):
            self.session = tf.Session()
            self.saver.restore(self.session, checkpoint)
            print('Restored from: {}'.format(checkpoint))
    

    读取了保存好模型中的各种参数后,就看一通过这个网络生成样本:

        def sample(self, n_samples, prime, vocab_size):  #n_samples:一共输出多少个基本单元;prime:开始的几个基本单元;vocab_size:一共有多少个类型的基本单元+1(未知数据编码)
            samples = [c for c in prime]
            sess = self.session
            new_state = sess.run(self.initial_state)
            preds = np.ones((vocab_size, ))  # for prime=[]
            for c in prime:    #根据输入的“基本单元”的多少,不断更新状态,直到最后的输入为止!真好的实现!
                x = np.zeros((1, 1))
                # 输入单个字符
                x[0, 0] = c
                feed = {self.inputs: x,
                        self.keep_prob: 1.,
                        self.initial_state: new_state}         #每次输入时更新状态即可达到连续的效果,对应LSTM状态是元组(c,h)
                preds, new_state = sess.run([self.proba_prediction, self.final_state],
                                            feed_dict=feed)
    
            c = pick_top_n(preds, vocab_size)
            # 添加字符到samples中
            samples.append(c)
    
            # 不断生成字符,直到达到指定数目
            for i in range(n_samples):
                x = np.zeros((1, 1))
                x[0, 0] = c
                feed = {self.inputs: x,
                        self.keep_prob: 1.,
                        self.initial_state: new_state}
                preds, new_state = sess.run([self.proba_prediction, self.final_state],
                                            feed_dict=feed)
    
                c = pick_top_n(preds, vocab_size)
                samples.append(c)
    
            return np.array(samples)
    

    注意这个函数是根据输入的前几个自然数序列(已经通过“基本单元”映射为自然数了),预测下一个输出的对应自然数。 其中第一个for循环出色的使用了权重共享的思想,使用sample这个函数的时候使得在构造函数时sample这个参数为True。 然后一个一个的将“基本单元”映射后的自然数输入,这样每次仅更新隐状态输出的状态参数。 之后第二个for循环依次生成后续的一个一个自然数。

    pick_top_n

    在上一小节的最后一个sample函数中,用到了pick_top_n函数,这个函数的内容如下:

    def pick_top_n(preds, vocab_size, top_n=5):
        p = np.squeeze(preds)    #squeeze函数从数组的形状中删除单维度条目,即把shape中为1的维度去掉
        # 将除了top_n个预测值的位置都置为0
        p[np.argsort(p,kind = 'mergesort')[:-top_n]] = 0      #argsort函数可以按照给定方法排序
        # 归一化概率
        p = p / np.sum(p)
        # 随机选取一个字符
        c = np.random.choice(vocab_size, 1, p=p)[0]
        return c
    

    可以看出,该函数通过输入的各个序列的概率,然后根据n取得概率前几个最大的概率,之后通过这些概率进行归一化,然后得到留下来 的数字序列对应的概率分布律,最后通过np.random.choice按照各个字符的分布律来随机选择一个字符并返回。

    周三

    预测和精度

    今天,通过前两天的代码的学习,我今天将我需要用到的数据序列通过read_utils.py预处理,之后放到model.py里面进行训练。 之后设置了20000步的训练,结果发现可以成功运行并根据输入生成一系列新的输出,但是我希望能够直接得到下一个字符的概率,因此 可以按照如下的方式进行实现:

    也可以通过这个网络预测下一个出现的“数据单元”的概率:

        def prediction_next_n(self,prime,vocab_size,next_n =3 , **k):  #prime:开始的几个基本单元;vocab_size:一共有多少个类型的基本单元+1(未知数据编码)
    # samples = [c for c in prime]
            sess = self.session
            new_state = sess.run(self.initial_state)
            preds = np.ones((vocab_size,))  # for prime=[]
            for c in prime:  # 根据输入的“基本单元”的多少,不断更新状态,直到最后的输入为止!真好的实现!
                x = np.zeros((1, 1))
                # 输入单个字符
                x[0, 0] = c
                feed = {self.inputs: x,
                        self.keep_prob: 1.,
                        self.initial_state: new_state}  # 每次输入时更新状态即可达到连续的效果,对应LSTM状态是元组(c,h)
                preds, new_state = sess.run([self.proba_prediction, self.final_state],
                                            feed_dict=feed)
    
            p = np.squeeze(preds)    #squeeze函数从数组的形状中删除单维度条目,即把shape中为1的维度去掉
            # 将next_n个最大的概率的位置得到
            next_n_num = np.argsort(p,kind = 'mergesort')[-next_n:]  #argsort函数可以按照给定方法排序
            #返回的应该是标号和对应的概率值
            s_p_d = []
            for i in next_n_num:
                s_p_d.append((i,p[i]))
            return s_p_d
    

    返回的这个各个自然数的概率,就可以进行预测生成新的数据对应的结果了。

    为了后续的测试,我需要得到精度,因此实现一个计算精度的函数:

        def get_accuracy(self,dualList,vocab_size,next_n =3):   #输入的序列满足有开始的标记,没有结尾的标记
            success_num = 0    
            for one_session in dualList:
                if one_session[-1] in self.prediction_next_n(one_session[:-1],vocab_size,next_n):
                    success_num = success_num + 1
            print(success_num,len(dualList))    
            print('精度是:%.4f' % (success_num/len(dualList)) )
    

    TensorBoard的使用

    为了将所有数据都显示出来,我使用了TensorBoard进行显示。

  • 相关阅读:
    原型模式
    简单工厂模式与工厂方法模式
    监听器 Listener
    代理模式
    装饰模式
    软件设计的原则
    事务的特性和隔离级别
    JDBC事务(三)ThreadLocal绑定Connection
    JDBC事务(二)转账示例
    JDBC事务(一)
  • 原文地址:https://www.cnblogs.com/zzb-Dream-90Time/p/9788643.html
Copyright © 2011-2022 走看看