  • Implementing face detection with MTCNN

    MTCNN (Multi-task Cascaded Convolutional Networks)

    Difficulties with MTCNN

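    • The images provided by datasets such as WIDER FACE are not training samples that MTCNN can consume directly. Several scripts (available on GitHub) are needed to convert them into a dataset MTCNN accepts. To make data loading more efficient, the data is then converted to the tfrecord format.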

    Building the pos, neg, and part data for PNet

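    • Data sources: WIDER FACE and Cascade

      • Bounding-box data comes from WIDER FACE, which has no landmark annotations
      • Landmark data comes from Cascade; each Cascade image contains a single face, annotated with its bounding box and landmark coordinates
    • Data generation steps

      • Variables

        • w: width of the ground truth
        • h: height of the ground truth
        • width: width of the image
        • height: height of the image
        • nx: x of the bounding box
        • ny: y of the bounding box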
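      • Generating neg data

        • For every sample in WIDER FACE we generate a box of random size, size = np.random.randint(12, min(width, height) / 2). The lower bound is 12 because PNet takes 12x12 input, so every crop must be at least 12 pixels on a side. The box's x and y are chosen so that the box never leaves the image, which means the later resize is a pure scaling. We then compute the IoU between this box and every ground truth in the image; if IoU <= 0.3 the crop is labelled neg. In the scripts others have written, 50 neg samples are generated per WIDER FACE image: a while loop keeps running while the neg count is below 50, performs the steps above, and simply continues when IoU > 0.3, since at this stage we only care about neg and ignore the other sample types. That fixes the number of neg samples per image, which is not ideal. So later, while iterating over every gt in the image, we also compute an offset relative to each individual gt (with size still drawn randomly by the same formula) and try about 5 more boxes per gt. Unlike the while loop, these 5 are single attempts: a box is added to neg if IoU <= 0.3 and skipped otherwise. Note that for neg the IoU is compared against all gts in the image, and the largest value must stay below the 0.3 threshold. Each neg sample is saved as img plus label=0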
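      • Generating pos and part data

        • pos and part generation happens inside the same for loop as the neg generation above, so it starts after the neg data has been produced. An image can have several gts, and we iterate over them with for. For each ground truth provided by WIDER FACE we again generate a box, as for neg, but sized differently: size = npr.randint(int(min(w, h) * 0.8), int(np.ceil(max(w, h) * 1.25))), which yields boxes both somewhat smaller and somewhat larger than the ground truth. The aim is for the box to overlap the ground truth. We compare the box's IoU with this gt: IoU > 0.65 is pos, IoU > 0.4 is part, and anything else is ignored, since neg was already handled earlier. For pos and part we save img, label=1, and the bbox offset. The offset is normalized by the box size: (nx1-x1)/size, (ny1-y1)/size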
      • Generating landmarks

        • Use the Cascade dataset, which provides the landmarks. They must be normalized too, relative to the gt box: (landX-gtX)/w, (landY-gtY)/h
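      • Mixing neg, pos, part, and landmark

        • PNet's input contains neg, pos, part, and landmark samples together
        • When mixing, keep neg:pos:part:landmark at 3:1:1:1. The neg set is likely to be very large, so we pick a count base: if len(neg) > base*3, take only base*3 of the neg samples, otherwise neg would dominate; the other types then contribute base samples each. Use npr.choice to draw a shuffled selection from each of neg, pos, part, and landmark. All selected pos, neg, part, and landmark entries are then written out, producing the .txt label file used for reading them back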
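The labelling rules and the 3:1:1:1 cap described above can be sketched in a few lines. This is a minimal standard-library illustration, not the scripts the post refers to: the helper names (`iou`, `label_crop`, `cap_neg`) and the (x1, y1, x2, y2) box layout are assumptions made for the example, the crop is compared against the best-matching gt for every label (the post compares pos/part against a single gt), and `random.sample` stands in for `npr.choice`. The thresholds are the ones quoted in the text.

```python
import random


def iou(box, gt):
    """IoU of two boxes given as (x1, y1, x2, y2)."""
    iw = max(0.0, min(box[2], gt[2]) - max(box[0], gt[0]))
    ih = max(0.0, min(box[3], gt[3]) - max(box[1], gt[1]))
    inter = iw * ih
    area_box = (box[2] - box[0]) * (box[3] - box[1])
    area_gt = (gt[2] - gt[0]) * (gt[3] - gt[1])
    return inter / float(area_box + area_gt - inter)


def label_crop(box, gts):
    """Label a crop by its largest IoU with any ground truth in the image."""
    best = max(iou(box, gt) for gt in gts)
    if best > 0.65:
        return 'pos'
    if best > 0.4:
        return 'part'
    if best <= 0.3:
        return 'neg'
    return None  # 0.3 < IoU <= 0.4: the crop is discarded


def cap_neg(neg, base, rng):
    """Keep at most 3 * base neg samples, drawn without replacement."""
    if len(neg) <= 3 * base:
        return list(neg)
    return rng.sample(neg, 3 * base)


gts = [(40, 40, 100, 100)]
print(label_crop((40, 40, 100, 100), gts))   # identical to the gt
print(label_crop((60, 40, 120, 100), gts))   # shifted right by a third of the width
print(label_crop((0, 0, 20, 20), gts))       # no overlap at all
print(len(cap_neg(list(range(100)), 10, random.Random(0))))
```

With `base = 10`, a pool of 100 neg crops is cut down to 30, while pos, part, and landmark would each contribute 10.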

    Training code

    • model.py
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    
    import os
    import sys
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    import numpy as np
    import tensorflow as tf
    import tensorflow.contrib.slim as slim
    from lib import util
    
    
    class Net(object):
    
    
        def __init__(self, is_training, learning_rate, num_epochs, im_size):
            self.num_epochs = num_epochs
            self.learning_rate = learning_rate
            self.im_size = im_size
            self.X = None
            self.Y_cls = None
            self.Y_box = None
            self.Y_landmark = None
            self.cls = None
            self.box = None
            self.landmark = None
            self.build()
            if is_training:
                self.loss = self.loss_layer()
                self.global_step = tf.Variable(1, name='global_step', trainable=False)
                self.optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(self.loss, global_step=self.global_step)
    
    
        def loss_layer(self):
            cls_loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=self.Y_cls, logits=self.cls))  * self.weights['cls'] 
            box_loss = tf.reduce_mean(tf.square(self.box - self.Y_box)) * self.weights['box']
            landmark_loss = tf.reduce_mean(tf.square(self.landmark - self.Y_landmark)) * self.weights['landmark']
            return cls_loss + box_loss + landmark_loss
    
    
        def build(self):
            pass
    
    
    class PNet(Net):
    
    
        def __init__(self, is_training=True, learning_rate=0.0001, num_epochs=300):
            self.weights = {'cls': 1.0, 'box': 0.5, 'landmark': 0.5}
            super(PNet, self).__init__(is_training, learning_rate, num_epochs, 12)
       
    
        def build(self):
            with tf.variable_scope('pnet', reuse=False):
                with tf.name_scope('input'):
                    fname_queue = tf.train.string_input_producer(['../tfrecords/pnet_data.tfrecords'], num_epochs=self.num_epochs)
                    self.X, self.Y_box, self.Y_cls, self.Y_landmark = util.read_tfrecord(fname_queue, self.im_size, 128)
                
                with slim.arg_scope([slim.conv2d], activation_fn=util.prelu, 
                        weights_initializer=slim.xavier_initializer(),
                        biases_initializer=tf.zeros_initializer(),
                        weights_regularizer=slim.l2_regularizer(0.0005),
                        padding='VALID'):
                    net = slim.conv2d(self.X, 10, 3, stride=1, scope='conv_1')
                    net = slim.max_pool2d(net, [2, 2], stride=2, scope='pool_1', padding='SAME')
                    net = slim.conv2d(net, 16, 3, stride=1, scope='conv_2')
                    net = slim.conv2d(net, 32, 3, stride=1, scope='conv_3')
                    cls = slim.conv2d(net, 2, 1, stride=1, scope='cls_fc', activation_fn=None)
                    cls = tf.reshape(cls, [-1, 2])
                    box = slim.conv2d(net, 4, 1, stride=1, scope='box_fc', activation_fn=None)
                    box = tf.reshape(box, [-1, 4])
                    landmark = slim.conv2d(net, 10, 1, stride=1, scope='landmark_fc', activation_fn=None)
                    landmark = tf.reshape(landmark, [-1, 10])
                    self.cls = cls
                    self.box = box
                    self.landmark = landmark
    
            util.add_var_to_summary()
    
    
    class RNet(Net):
    
    
        def __init__(self, is_training=True, learning_rate=0.0001, num_epochs=200):
            self.weights = {'cls': 1.0, 'box': 0.5, 'landmark': 0.5}
            super(RNet, self).__init__(is_training, learning_rate, num_epochs, 24)
       
    
        def build(self):
            with tf.variable_scope('rnet', reuse=False):
                with tf.name_scope('input'):
                    fname_queue = tf.train.string_input_producer(['../tfrecords/rnet_data.tfrecords'], num_epochs=self.num_epochs)
                    self.X, self.Y_box, self.Y_cls, self.Y_landmark = util.read_tfrecord(fname_queue, self.im_size, 128)
                
                with slim.arg_scope([slim.conv2d], activation_fn=util.prelu, 
                        weights_initializer=slim.xavier_initializer(),
                        biases_initializer=tf.zeros_initializer(),
                        weights_regularizer=slim.l2_regularizer(0.0005),
                        padding='VALID'):
                    net = slim.conv2d(self.X, 28, 3, stride=1, scope='conv_1')
                    net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_1', padding='SAME')
                    net = slim.conv2d(net, 48, 3, stride=1, scope='conv_2')
                    net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_2')
                    net = slim.conv2d(net, 64, 2, stride=1, scope='conv_3')
                    net = slim.flatten(net)
                    net = slim.fully_connected(net, 128, scope='fc_1')
                    cls = slim.fully_connected(net, 2, activation_fn=None, scope='cls_fc1')
                    box = slim.fully_connected(net, 4, activation_fn=None, scope='box_fc1')
                    landmark = slim.fully_connected(net, 10, activation_fn=None, scope='landmark_fc1')
                    self.cls = cls
                    self.box = box
                    self.landmark = landmark
    
            util.add_var_to_summary()
    
    
    class ONet(Net):
    
    
        def __init__(self, is_training=True, learning_rate=0.0001, num_epochs=100):
            self.weights = {'cls': 1.0, 'box': 0.5, 'landmark': 1.0}
            super(ONet, self).__init__(is_training, learning_rate, num_epochs, 48)
       
    
        def build(self):
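            # NOTE: ONet reuses the scope name 'rnet' (likely a copy-paste leftover). The ONet node
            # names in train.py's tmp_dict ('rnet/cls_fc/BiasAdd', ...) rely on it, so rename both together.
            with tf.variable_scope('rnet', reuse=False):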
                with tf.name_scope('input'):
                    fname_queue = tf.train.string_input_producer(['../tfrecords/onet_data.tfrecords'], num_epochs=self.num_epochs)
                    self.X, self.Y_box, self.Y_cls, self.Y_landmark = util.read_tfrecord(fname_queue, self.im_size, 64)
                
                with slim.arg_scope([slim.conv2d], activation_fn=util.prelu, 
                        weights_initializer=slim.xavier_initializer(),
                        biases_initializer=tf.zeros_initializer(),
                        weights_regularizer=slim.l2_regularizer(0.0005),
                        padding='VALID'):
                    net = slim.conv2d(self.X, 32, 3, stride=1, scope='conv_1')
                    net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_1', padding='SAME')
                    net = slim.conv2d(net, 64, 3, stride=1, scope='conv_2')
                    net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_2')
                    net = slim.conv2d(net, 64, 3, stride=1, scope='conv_3')
                    net = slim.max_pool2d(net, [2, 2], stride=2, scope='pool_3')
                    net = slim.conv2d(net, 128, [2, 2], stride=1, scope='conv_4')
    
                    net = slim.flatten(net)
                    net = slim.fully_connected(net, 256, scope='fc_1')
                    cls = slim.fully_connected(net, 2, activation_fn=None, scope='cls_fc')
                    box = slim.fully_connected(net, 4, activation_fn=None, scope='box_fc')
                    landmark = slim.fully_connected(net, 10, activation_fn=None, scope='landmark_fc')
                    self.cls = cls
                    self.box = box
                    self.landmark = landmark
            util.add_var_to_summary()
    
    
    def main(argv=None):
        pnet = PNet(True)
    
    
    if __name__ == '__main__':
        main()
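To see why the three networks take 12, 24, and 48 pixel inputs, it can help to trace the spatial size through each layer. The sketch below is an illustrative calculation only: `out_size`, `trace`, and the layer lists are hypothetical helpers transcribed by hand from the `build` methods above. It uses the usual TensorFlow rules (VALID: floor((n - k) / s) + 1, SAME: ceil(n / s)) and assumes slim's default VALID padding for `max_pool2d` where no padding is given.

```python
import math


def out_size(n, kernel, stride, padding):
    """Spatial output size of a conv/pool layer under TensorFlow's padding rules."""
    if padding == 'SAME':
        return int(math.ceil(n / float(stride)))
    return (n - kernel) // stride + 1  # VALID


# (kernel, stride, padding) for each conv/pool layer, in the order they are applied.
PNET = [(3, 1, 'VALID'), (2, 2, 'SAME'), (3, 1, 'VALID'), (3, 1, 'VALID')]
RNET = [(3, 1, 'VALID'), (3, 2, 'SAME'), (3, 1, 'VALID'), (3, 2, 'VALID'), (2, 1, 'VALID')]
ONET = [(3, 1, 'VALID'), (3, 2, 'SAME'), (3, 1, 'VALID'), (3, 2, 'VALID'),
        (3, 1, 'VALID'), (2, 2, 'VALID'), (2, 1, 'VALID')]


def trace(n, layers):
    """Return the spatial size after the input and after every layer."""
    sizes = [n]
    for kernel, stride, padding in layers:
        n = out_size(n, kernel, stride, padding)
        sizes.append(n)
    return sizes


print(trace(12, PNET))  # [12, 10, 5, 3, 1]
print(trace(24, RNET))  # [24, 22, 11, 9, 4, 3]
print(trace(48, ONET))  # [48, 46, 23, 21, 10, 8, 4, 3]
```

PNet ends at 1x1 on a 12x12 crop, and since it has no fully connected layer it can be run over a larger image at detection time. RNet and ONet flatten a 3x3 map (3*3*64 = 576 and 3*3*128 = 1152 values) before their fully connected layers.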
    
    • util.py (utilities)
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    
    import os
    import numpy as np
    import tensorflow as tf
    import tensorflow.contrib.slim as slim
    
    
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    
    
    def prelu(X):
        alpha = tf.get_variable('alpha', shape=X.get_shape()[-1], dtype=tf.float32, initializer=tf.constant_initializer(0.25)) 
        pos = tf.nn.relu(X)
        neg = alpha * (X - abs(X)) * 0.5
        return pos + neg
    
    
    def read_tfrecord(fname_queue, im_size, batch_size=128):
        reader = tf.TFRecordReader()
        _, serialized_example = reader.read(fname_queue)
        features = tf.parse_single_example(serialized_example, features={
            'cls_raw':tf.FixedLenFeature([], tf.string),
            'bbox_raw':tf.FixedLenFeature([], tf.string),
            'landmark_raw':tf.FixedLenFeature([], tf.string),
            'im_raw':tf.FixedLenFeature([], tf.string)
        })
        im = (tf.cast(tf.decode_raw(features['im_raw'], tf.uint8), tf.float32) - 127.5) / 128.0
        im.set_shape([im_size * im_size * 3])
        im = tf.reshape(im, [im_size, im_size, 3])
    
        cls = tf.decode_raw(features['cls_raw'], tf.float32)
        cls.set_shape([2])
    
        bbox = tf.decode_raw(features['bbox_raw'], tf.float32)
        bbox.set_shape([4])
    
        landmark = tf.decode_raw(features['landmark_raw'], tf.float32)
        # The tensor returned by tf.decode_raw has no static shape, so we set it explicitly with .set_shape
        landmark.set_shape([10])
    
        clses, bboxes, landmarks, ims = tf.train.shuffle_batch([cls, bbox, landmark, im], 
                batch_size=batch_size, capacity=1000 + 3 * batch_size, min_after_dequeue=1000)
        return ims, bboxes, clses, landmarks
        
    
    def add_var_to_summary():
        for var in slim.get_model_variables():
            tf.summary.histogram(var.op.name, var)
    
    
    def nms(boxes, threshold):
        x1 = boxes[:, 0]
        y1 = boxes[:, 1]
        x2 = boxes[:, 2]
        y2 = boxes[:, 3]
        score = boxes[:, 4]
        score_sorted = np.argsort(score)
        
        indice = []
        areas = (x2 - x1 + 1) * (y2 - y1 + 1)
        while score_sorted.size > 0:
            i = score_sorted[-1]
            indice.append(i)
            xx1 = np.maximum(x1[i], x1[score_sorted[0:-1]])
            yy1 = np.maximum(y1[i], y1[score_sorted[0:-1]])
            xx2 = np.minimum(x2[i], x2[score_sorted[0:-1]])
            yy2 = np.minimum(y2[i], y2[score_sorted[0:-1]])
            w = np.maximum(0.0, (xx2 - xx1 + 1))
            h = np.maximum(0.0, (yy2 - yy1 + 1))
            inner = w * h
            IoU = inner / (areas[i] + areas[score_sorted[0:-1]] - inner)
            score_sorted = score_sorted[np.where(IoU <= threshold)]
        return np.asarray(indice)
    
    
    def main(argv=None):
        pass
    
    
    if __name__ == '__main__':
        main()
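For a feel of what `nms` does, here is a small standalone re-implementation run on three hand-made boxes. It is a plain-Python sketch written for illustration rather than the NumPy code above, though it follows the same conventions: rows are (x1, y1, x2, y2, score), areas use the +1 pixel convention, and a box survives only if its IoU with the current best box is at most the threshold.

```python
def nms(boxes, threshold):
    """Greedy NMS over rows of (x1, y1, x2, y2, score); returns kept indices, best score first."""
    def area(b):
        return (b[2] - b[0] + 1) * (b[3] - b[1] + 1)

    def iou(a, b):
        w = max(0.0, min(a[2], b[2]) - max(a[0], b[0]) + 1)
        h = max(0.0, min(a[3], b[3]) - max(a[1], b[1]) + 1)
        inter = w * h
        return inter / float(area(a) + area(b) - inter)

    order = sorted(range(len(boxes)), key=lambda i: boxes[i][4])  # ascending score
    keep = []
    while order:
        best = order.pop()  # highest remaining score
        keep.append(best)
        # Drop every remaining box that overlaps the chosen one too much.
        order = [i for i in order if iou(boxes[best], boxes[i]) <= threshold]
    return keep


boxes = [
    (10, 10, 50, 50, 0.9),
    (12, 12, 52, 52, 0.8),      # overlaps the first box heavily (IoU about 0.83)
    (100, 100, 140, 140, 0.7),  # far away from both
]
print(nms(boxes, 0.5))  # [0, 2]
```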
    
    • train.py
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    
    import os
    import sys
    import time
    sys.path.append((os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
    import numpy as np
    import tensorflow as tf
    import tensorflow.contrib.slim as slim
    from tensorflow.python.framework import graph_util
    from core import model
    from core.model import PNet, RNet, ONet
    from lib import util
    
    
    config = tf.ConfigProto()
    config.allow_soft_placement = True
    config.gpu_options.allow_growth = True
    
    
    tmp_dict = {'pnet': ['pnet/cls_fc/BiasAdd', 'pnet/box_fc/BiasAdd', 'pnet/landmark_fc/BiasAdd'],
            'rnet': ['rnet/cls_fc1/BiasAdd', 'rnet/box_fc1/BiasAdd', 'rnet/landmark_fc1/BiasAdd'],
            'onet': ['rnet/cls_fc/BiasAdd', 'rnet/box_fc/BiasAdd', 'rnet/landmark_fc/BiasAdd']}
    
    
    def train(net, weight_dir):
        saver = tf.train.Saver()
        with tf.Session(config=config) as sess:
            tf.summary.scalar('Loss', net.loss)
            merged = tf.summary.merge_all()
            writer = tf.summary.FileWriter('../logger', sess.graph)
    
            init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
    
            sess.run(init_op)
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
            ckpt = tf.train.get_checkpoint_state(weight_dir)
            if ckpt and ckpt.model_checkpoint_path:
                saver.restore(sess, ckpt.model_checkpoint_path)
                time.sleep(1)
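                # Export the heads of the network being trained; the key comes from weight_dir ('../weights/pnet' -> 'pnet')
                output_graph_def = graph_util.convert_variables_to_constants(sess, sess.graph_def, tmp_dict[os.path.basename(os.path.normpath(weight_dir))])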
                with tf.gfile.GFile(os.path.join(weight_dir, 'model.pb'), 'wb') as fd:
                    fd.write(output_graph_def.SerializeToString())
                print('Restore from ' + os.path.join(weight_dir, 'model.ckpt'))
    
            try:
                while not coord.should_stop():
                    _, loss, global_step, result = sess.run([net.optimizer, net.loss, net.global_step, merged])
                    if global_step % 100 == 0:
                        writer.add_summary(result, global_step)
                        print('Step %s loss: %s' % (global_step, loss))
                    if global_step % 1000 == 0:
                        saver.save(sess, os.path.join(weight_dir, 'model.ckpt'), global_step=global_step)
                        print('Save model')
                    if global_step % 2000 == 0:
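                        # Export the heads of the network being trained; the key comes from weight_dir ('../weights/pnet' -> 'pnet')
                        output_graph_def = graph_util.convert_variables_to_constants(sess, sess.graph_def, tmp_dict[os.path.basename(os.path.normpath(weight_dir))])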
                        with tf.gfile.GFile(os.path.join(weight_dir, 'model.pb'), 'wb') as fd:
                            fd.write(output_graph_def.SerializeToString())
    
    
            except tf.errors.OutOfRangeError as e:
                print('Epochs %s, step %s' % (net.num_epochs, global_step))
            finally:
                coord.request_stop()
            coord.join(threads)
    
    
    def main(argv=None):
        train(PNet(is_training=True, learning_rate=0.0001, num_epochs=300), '../weights/pnet')
        # train(RNet(is_training=True, learning_rate=0.0001, num_epochs=200), '../weights/rnet')
        # train(ONet(is_training=True, learning_rate=0.0001, num_epochs=100), '../weights/onet')
        # print('Not train')
    
    
    if __name__ == '__main__':
        main()
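The quantity that `train` minimizes is the weighted sum built in `Net.loss_layer`. For a single sample it can be reproduced by hand. The following is a plain-Python sketch: the function names are made up for illustration, and the real code averages over a batch with TensorFlow ops. The weights are the PNet values from `model.py`.

```python
import math


def softmax_cross_entropy(logits, onehot):
    """Cross entropy of one sample from raw logits, computed stably via log-sum-exp."""
    m = max(logits)
    log_z = m + math.log(sum(math.exp(v - m) for v in logits))
    return -sum(t * (v - log_z) for v, t in zip(logits, onehot))


def mse(pred, target):
    """Mean squared error over the coordinates of one sample."""
    return sum((p - t) ** 2 for p, t in zip(pred, target)) / float(len(pred))


def total_loss(cls_logits, cls_true, box, box_true, lmk, lmk_true, weights):
    """Weighted sum of the classification, box, and landmark terms."""
    return (weights['cls'] * softmax_cross_entropy(cls_logits, cls_true)
            + weights['box'] * mse(box, box_true)
            + weights['landmark'] * mse(lmk, lmk_true))


pnet_weights = {'cls': 1.0, 'box': 0.5, 'landmark': 0.5}
loss = total_loss([0.0, 0.0], [0, 1],                          # undecided classifier, true class "face"
                  [0.0, 0.0, 1.0, 1.0], [0.0, 0.0, 0.8, 0.8],  # box off by 0.2 on two coordinates
                  [0.5] * 10, [0.5] * 10,                      # landmarks predicted exactly
                  pnet_weights)
print(loss)
```

Here the classification term is ln 2, the box term is 0.5 * 0.02, and the landmark term is zero.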
    

    Dataset-building code

    • builddata.py
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    
    import os
    import sys
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    import cv2
    import numpy as np
    import tensorflow as tf
    from lib import processbar
    from lib.processbar import ProcessBar
    
    
    data_dir = '../dataset'
    save_dir = '../tfrecords'
    
    
    def bytes_feature(value):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
    
    
    def write_data(fname, lines, im_size):
        writer = tf.python_io.TFRecordWriter(fname)
        bar = ProcessBar('Reading ' + fname.split('/')[-1], len(lines))
        examples = []
        for line in lines:
            parts = line.split()
            im_path = os.path.join(data_dir, parts[0].replace('\\', '/'))
            im = cv2.imread(im_path)
            height, width, _ = im.shape
            im = im.astype('uint8')
            # The dataset stores bbox as (x1, x2, y1, y2); we store it as (x1, y1, x2, y2)
            bbox = np.array([float(parts[1]), float(parts[3]), float(parts[2]), float(parts[4])], dtype='float32')
            bbox_norm = np.array([bbox[0] / width, bbox[1] / height, bbox[2] / width, bbox[3] / height], dtype='float32')
            landmark = np.array([float(parts[5]), float(parts[6]),
                                float(parts[7]), float(parts[8]),
                                float(parts[9]), float(parts[10]),
                                float(parts[11]), float(parts[12]),
                                float(parts[13]), float(parts[14])], dtype='float32')
    
            landmark_norm = np.array([landmark[0] / width, landmark[1] / height,
                                landmark[2] / width, landmark[3] / height,
                                landmark[4] / width, landmark[5] / height,
                                landmark[6] / width, landmark[7] / height,
                                landmark[8] / width, landmark[9] / height], dtype='float32')
            # Build the positive sample
            im_crop = cv2.resize(im[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2]), :], (im_size, im_size))
            bbox_pos = np.array([0.0, 0.0, 1.0, 1.0], dtype='float32')
            landmark_offset_pos = landmark_norm.copy()
            landmark_offset_pos[[0, 2, 4, 6, 8]] = \
                (landmark_offset_pos[[0, 2, 4, 6, 8]] - bbox_norm[0]) / (bbox_norm[2] - bbox_norm[0])
            landmark_offset_pos[[1, 3, 5, 7, 9]] = \
                (landmark_offset_pos[[1, 3, 5, 7, 9]] - bbox_norm[1]) / (bbox_norm[3] - bbox_norm[1])
            example_pos = tf.train.Example(features=tf.train.Features(feature={
                'cls_raw':bytes_feature(np.array([0, 1], dtype='float32').tostring()),
                'bbox_raw':bytes_feature(bbox_pos.tostring()),
                'landmark_raw':bytes_feature(landmark_offset_pos.tostring()),
                'im_raw':bytes_feature(im_crop.tostring()),
            }))
            examples.append(example_pos)
            
            # Negative samples: the top-left and bottom-right corners of the image
            # Top-left corner
            border = [0.0, 0.0, bbox[0], bbox[1]]
            im_crop = cv2.resize(im[int(border[1]):int(border[3]), int(border[0]):int(border[2]), :], (im_size, im_size))
            bbox_neg = bbox_norm.copy()
            bbox_neg[0] = 0.0
            bbox_neg[1] = 0.0
            bbox_neg[2] = 1.0
            bbox_neg[3] = 1.0
            # Negative crops contain no face, so all 10 landmark values are zero
            landmark_offset_neg = np.zeros(10, dtype='float32')
            
            example_neg = tf.train.Example(features=tf.train.Features(feature={
                'cls_raw':bytes_feature(np.array([1, 0], dtype='float32').tostring()),
                'bbox_raw':bytes_feature(bbox_neg.tostring()),
                'landmark_raw':bytes_feature(landmark_offset_neg.tostring()),
                'im_raw':bytes_feature(im_crop.tostring())
            }))
            examples.append(example_neg)
            # Bottom-right corner
            border = [bbox[2], bbox[3], width, height]
            im_crop = cv2.resize(im[int(border[1]):int(border[3]), int(border[0]):int(border[2]), :], (im_size, im_size))
            example_neg = tf.train.Example(features=tf.train.Features(feature={
                'cls_raw':bytes_feature(np.array([1, 0], dtype='float32').tostring()),
                'bbox_raw':bytes_feature(bbox_neg.tostring()),
                'landmark_raw':bytes_feature(landmark_offset_neg.tostring()),
                'im_raw':bytes_feature(im_crop.tostring())
            }))
            examples.append(example_neg)
            bar.process()
    
        bar.done()
        bar = ProcessBar('Writing ' + fname.split('/')[-1], len(examples))
        for e in examples:
            writer.write(e.SerializeToString())
            bar.process()
        bar.done()
        writer.close()
    """
    
    def write_data(tfFileName, trainImagesDataDesc, im_size):
        writer = tf.python_io.TFRecordWriter(tfFileName)    
        examples = []
        bar = ProcessBar(tfFileName, len(trainImagesDataDesc))
        for line in trainImagesDataDesc:
            bar.process()
            descs = line.split()
            filePath = descs[0]
            filePath = filePath.replace('\\', '/')
            image_file_path = os.path.join(data_dir, filePath)
            im = cv2.imread(image_file_path)
            height, width, _ = im.shape
            im = im.astype('uint8')
            # bb = [float(descs[3]) / h,float(descs[1]) / w, float(descs[4]) / h, float(descs[2]) / w]
            bb = np.array([float(descs[1]),float(descs[3]), float(descs[2]), float(descs[4])], dtype='float32')
            bb_norm = np.array([bb[0] / width, bb[1] / height, bb[2] / width, bb[3] / height], dtype='float32')
            landmark = np.array([float(descs[5]),
                        float(descs[6]),
                        float(descs[7]),
                        float(descs[8]),
                        float(descs[9]),
                        float(descs[10]),
                        float(descs[11]),
                        float(descs[12]),
                        float(descs[13]),
                        float(descs[14])], dtype='float32')
     
            landmark_norm = np.array([float(descs[5])/ width,
                        float(descs[6])/ height,
                        float(descs[7])/ width,
                        float(descs[8])/ height,
                        float(descs[9])/ width,
                        float(descs[10])/ height,
                        float(descs[11])/ width,
                        float(descs[12])/ height,
                        float(descs[13])/ width,
                        float(descs[14])/ height], dtype='float32')
    
            bb_pos = bb_norm.copy()
            bb_pos[0] = 0.0
            bb_pos[1] = 0.0
            bb_pos[2] = 1.0
            bb_pos[3] = 1.0    
    
            im_100 = im[int(bb[1]):int(bb[3]), int(bb[0]):int(bb[2]),:]
            im_100 = cv2.resize(im_100,(im_size,im_size))
            landmark_pos = landmark_norm.copy()
            landmark_pos[[0,2,4,6,8]] = (landmark_pos[[0,2,4,6,8]] - bb_norm[0])/ (bb_norm[2] -bb_norm[0])
            landmark_pos[[1,3,5,7,9]] = (landmark_pos[[1,3,5,7,9]] - bb_norm[1]) / (bb_norm[3] -bb_norm[1])
            example = tf.train.Example(features = tf.train.Features(feature={
                'cls_raw':bytes_feature(np.array([0, 1], dtype='float32').tostring()), # face or not
                'bb_raw':bytes_feature(bb_pos.tostring()), # box
                'landmark_raw':bytes_feature(landmark_pos.tostring()), # landmarks
                'image_raw':bytes_feature(im_100.tostring()) # image data
            }))
            examples.append(example)
            # Then build 2 negative samples:
            # the top-left and bottom-right corners, which contain no face, so landmark is all 0 and bbox is 0 too
            classification = np.array([1,0], dtype='float32')
            cls_raw = classification.tostring()
    
            bb_neg = bb_norm.copy()
            landmark_neg = landmark_norm.copy()
    
            bb_neg[0] = 0.0
            bb_neg[1] = 0.0
            bb_neg[2] = 0.0
            bb_neg[3] = 0.0
            landmark_neg[0] = 0.0
            landmark_neg[1] = 0.0
            landmark_neg[2] = 0.0
            landmark_neg[3] = 0.0
            landmark_neg[4] = 0.0
            landmark_neg[5] = 0.0
            landmark_neg[6] = 0.0
            landmark_neg[7] = 0.0
            # Top-left corner
            x1_ = 0
            y1_ = 0
            x2_ = int(bb[0])
            y2_ = int(bb[1])
    
            im_crop = im[y1_:y2_, x1_:x2_, :]
            im_ = cv2.resize(im_crop,(im_size,im_size))
            example = tf.train.Example(features = tf.train.Features(feature={
                'cls_raw':bytes_feature(np.array([1, 0], dtype='float32').tostring()),
                'bb_raw':bytes_feature(bb_neg.tostring()),
                'landmark_raw':bytes_feature(landmark_neg.tostring()),
                'image_raw':bytes_feature(im_.tostring())
            }))
            examples.append(example)
            # Bottom-right corner
            x1_ = int(bb[2])
            y1_ = int(bb[3])
            x2_ = width
            y2_ = height
            im_crop = im[y1_:y2_, x1_:x2_]
            im_ = cv2.resize(im_crop,(im_size,im_size))
            example = tf.train.Example(features = tf.train.Features(feature={
                'cls_raw':bytes_feature(np.array([1, 0], dtype='float32').tostring()),
                'bb_raw':bytes_feature(bb_neg.tostring()),
                'landmark_raw':bytes_feature(landmark_neg.tostring()),
                'image_raw':bytes_feature(im_.tostring())
            }))
            examples.append(example)
        bar.done() 
        bar = ProcessBar(tfFileName, len(examples))
        for example in examples:
            writer.write(example.SerializeToString())
            bar.process()
        bar.done()
        writer.close()   
    """
    def main(argv=None):
        with open(os.path.join(data_dir, 'trainImageList.txt'), 'r') as fd:
            lines = fd.readlines()
        write_data('../tfrecords/pnet_data.tfrecords', lines, 12)
        write_data('../tfrecords/rnet_data.tfrecords', lines, 24)
        write_data('../tfrecords/onet_data.tfrecords', lines, 48)
        print('Finish writing data')
        
    
    if __name__ == '__main__':
        main()
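The landmark handling in `write_data` boils down to two steps: reorder the annotation from (x1, x2, y1, y2) to (x1, y1, x2, y2), then express each landmark as a fraction of the face box. The sketch below shows this with plain Python lists; `to_xyxy` and `landmarks_relative_to_box` are hypothetical names, and the sample coordinates are invented. `write_data` divides by the image size first, but that factor cancels, so the sketch works in pixels directly.

```python
def to_xyxy(x1, x2, y1, y2):
    """Reorder an annotation stored as (x1, x2, y1, y2) into (x1, y1, x2, y2)."""
    return (x1, y1, x2, y2)


def landmarks_relative_to_box(landmark, bbox):
    """Map [x0, y0, x1, y1, ...] in image pixels to fractions of the box width/height."""
    bx1, by1, bx2, by2 = bbox
    w, h = float(bx2 - bx1), float(by2 - by1)
    out = []
    for i, v in enumerate(landmark):
        if i % 2 == 0:   # even index: x coordinate
            out.append((v - bx1) / w)
        else:            # odd index: y coordinate
            out.append((v - by1) / h)
    return out


bbox = to_xyxy(100, 200, 50, 150)   # a 100x100 face box
rel = landmarks_relative_to_box([125, 75, 175, 75, 150, 100, 130, 125, 170, 125], bbox)
print(bbox)
print(rel)
```

A landmark at the box centre maps to (0.5, 0.5), which is why the positive sample can use the fixed box target [0, 0, 1, 1].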
    

    Detection code

    • mtcnn.py
    
    import os
    import sys
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    import cv2
    import numpy as np
    import tensorflow as tf
    from lib import util
    
    
    def bbreg(boundingbox, reg):
        bb = boundingbox.copy()    
        w = boundingbox[:, 2] - boundingbox[:, 0] + 1
        h = boundingbox[:, 3] - boundingbox[:, 1] + 1
    
        bb[:,0] = boundingbox[:, 0] + reg[:, 0] * w
        bb[:,1] = boundingbox[:, 1] + reg[:, 1] * h
        bb[:,2] = boundingbox[:, 0] + reg[:, 2] * w
        bb[:,3] = boundingbox[:, 1] + reg[:, 3] * h
        return bb
    
    
    def gen_box(imap, reg, scale, t):
        stride = 2
        cellsize = 12
        imap = np.transpose(imap)
        dx1 = np.transpose(reg[:, :, 0])
        dy1 = np.transpose(reg[:, :, 1])
        dx2 = np.transpose(reg[:, :, 2])
        dy2 = np.transpose(reg[:, :, 3])
        y, x = np.where(imap >= t)
        if y.shape[0] == 1:
            dx1 = np.flipud(dx1)
            dy1 = np.flipud(dy1)
            dx2 = np.flipud(dx2)
            dy2 = np.flipud(dy2)
        score = imap[(y, x)]
        reg = np.transpose(np.vstack([dx1[(y, x)], dy1[(y, x)],
                                      dx2[(y, x)], dy2[(y, x)]]))
        if reg.size == 0:
            reg = np.empty((0, 3))
        bb = np.transpose(np.vstack([y, x]))
        q1 = np.fix((stride * bb + 1) / scale)
        q2 = np.fix((stride * bb + cellsize) / scale)
        boundingbox = np.hstack([q1, q2, np.expand_dims(score, 1), reg])
        return boundingbox, reg
    
    
    def detect():
        im = cv2.imread('./test.jpg')
        im = im.astype('uint8')
        w,h,ch = im.shape
    
        with tf.Session() as sess:
            output_graph_def = tf.GraphDef()
            with open('../weights/pnet/model.pb', 'rb') as fd:
                output_graph_def.ParseFromString(fd.read())
                tf.import_graph_def(output_graph_def, name='')
                pyramid = []
                ww,hh,ch = im.shape
                minlenght = min(ww,hh)
                impyramid = im.copy()
                minsize = 20
                m = 12 / 20
                m = 1
                scales = []
                factor_counter = 0
                factor = 0.709
                ml = minlenght
                while ml >= 12 :
                    scalefactor = m * np.power(factor, factor_counter)
                    scales.append(scalefactor)
                    ww = w * scalefactor
                    hh = h * scalefactor
                    impyramid = cv2.resize(im,(int(hh),int(ww)))
                    pyramid.append((impyramid.copy() - 127.5) / 128.0)
                    factor_counter += 1
                    scalefactor = m * np.power(factor, factor_counter)
                    ml = minlenght * scalefactor
                boxes_all = np.empty((0,9))
                print("------------pnet-----------------")
                for j in range(len(scales)):
                    
                    scale = scales[j]
                    py = pyramid[j]
                    feed_value = {'pnet/input/shuffle_batch:3':[py]}
                    _cls, _bb, _landmark = sess.run(['pnet/cls_fc/BiasAdd:0', 'pnet/box_fc/BiasAdd:0', 'pnet/landmark_fc/BiasAdd:0'], feed_value)
                    softmax_ = np.exp(_cls[0,:,:,1]) / (np.exp(_cls[0,:,:,0]) + np.exp(_cls[0,:,:,1]))
                    # gen_box maps the boxes back to their positions in the original input image
                    boxes, _ = gen_box(softmax_,#_cls[0, :, :, 1],
                                           _bb[0, :, :, :],
                                           scale,
                                           0.8)
                    if(len(boxes) > 0):
                        pick = util.nms(boxes.copy(), 0.5)
                        boxes_all = np.append( boxes_all ,boxes[pick,:], axis = 0)
    
                print("boxes picked after pnet: " + str(boxes_all.shape))
                
                numbox = boxes_all.shape[0]
                if numbox > 0:
                    pick = util.nms(boxes_all.copy(), 0.7)
                    boxes_all = boxes_all[pick, :]
                    regw = boxes_all[:, 2] - boxes_all[:, 0]
                    regh = boxes_all[:, 3] - boxes_all[:, 1]
                    # Refine each bb using its regression values
                    qq1 = boxes_all[:, 0] + boxes_all[:, 5] * regw
                    qq2 = boxes_all[:, 1] + boxes_all[:, 6] * regh
                    qq3 = boxes_all[:, 0] + boxes_all[:, 7] * regw
                    qq4 = boxes_all[:, 1] + boxes_all[:, 8] * regh
                    boxes_all = np.transpose(np.vstack([qq1, qq2, qq3, qq4,
                                                        boxes_all[:, 4]]))
                    im_bbr = im.copy()
                    r = 0
                    for box in boxes_all:
                        r += 1
                        cv2.rectangle(im_bbr,
                                    (int(box[0]), int(box[1])),
                                    (int(box[2]), int(box[3])), 
                                    (0,0,255),1)
    
                    print("------------saving p image-----------------")
                    cv2.imwrite( "./test_bbr_p.jpg",im_bbr)      
        
        
        tf.reset_default_graph()
        
       
        with tf.Graph().as_default() as graph:
            with open('../weights/rnet/model.pb', 'rb') as fd:
                output_graph_def = tf.GraphDef()
                output_graph_def.ParseFromString(fd.read())
                tf.import_graph_def(output_graph_def, name='')

                with tf.Session() as sess:
                    batch_size = 128
                    rnet_images = np.zeros([batch_size, 24, 24, 3])
                    v = 0
                    for box in boxes_all:
                        if v >= batch_size:
                            break
                        x1 = int(box[0])                    
                        y1 = int(box[1])
                        x2 = int(box[2])
                        y2 = int(box[3])
                        rnet_images[v, :, :, :] = cv2.resize(im[y1:y2, x1:x2, :], (24, 24))
                        v += 1
                    rnet_images = (rnet_images - 127.5) / 128.0
                    print("------------rnet-----------------")
                    feed_value = {'rnet/input/shuffle_batch:3':rnet_images}
                    cls, bb, landmark = sess.run(['rnet/cls_fc1/BiasAdd:0', 'rnet/box_fc1/BiasAdd:0', 'rnet/landmark_fc1/BiasAdd:0'], feed_value)
                    prob = np.exp(cls[:,1]) / (np.exp(cls[:,0]) + np.exp(cls[:,1]))
                    # Only the first v rows are real crops; the rest of the batch is zero padding
                    idx = np.where(prob[:v] > 0.8)
                    total_boxes = boxes_all[idx,:][0]
                    ipass_box = np.zeros([len(idx[0]),5])
                    for j in range(len(idx[0])):
                        ip = idx[0][j]
                        ipass_box[j][0:4] = bb[ip][0:4]
                        ipass_box[j][4] = prob[ip]
                    
    
                    pick = util.nms(total_boxes.copy(), 0.7)
                    # total_boxes holds the source box coordinates that were fed to RNet;
                    # adjust them once more using RNet's regression offsets
                    total_boxes = bbreg(total_boxes[pick,:].copy(), ipass_box[pick,:])
    
                    im_bbr = im.copy()
                    
                    r = 0
                    for box in total_boxes:
                        # print("box")
                        # print(box)
                        im_bbr = cv2.rectangle(im_bbr,
                                    (int(box[0]),int(box[1])),
                                    (int(box[2]),int(box[3])), 
                                    (0,255,0),1)
    
                            
    
                        r+=1  
                    print("------------saving rnet image-----------------")
                    cv2.imwrite( "./test_bbr_r" +".jpg",im_bbr)        
    
    
        tf.reset_default_graph()
        with tf.Graph().as_default() as graph:
            with open('../weights/onet/model.pb', "rb") as f:
                output_graph_def = tf.GraphDef()
                output_graph_def.ParseFromString(f.read())
                _ = tf.import_graph_def(output_graph_def, name="")

                with tf.Session() as sess:
                    batch = 64
                    onet_images = np.zeros((batch,48,48,3))
                    
                    v = 0
                    for box in total_boxes:
                        if(v >= batch):
                            break
                        x1 =  int(box[0])                    
                        y1 =  int(box[1])
                        x2 = int(box[2])
                        y2 = int(box[3])
                        onet_images[v, :, :, :] = cv2.resize(im[y1:y2, x1:x2, :], (48, 48))
                        v += 1  
                    onet_images = (onet_images - 127.5) / 128.0
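                    # NOTE: the tensor names keep the 'rnet/' prefix used in the original post;
                    # if your ONet graph was exported under its own scope, use that prefix instead
                    feed_value = {'rnet/input/shuffle_batch:3': onet_images}
                    print("------------onet-----------------")
                    cls, bb, landmark = sess.run(['rnet/cls_fc/BiasAdd:0', 'rnet/box_fc/BiasAdd:0', 'rnet/landmark_fc/BiasAdd:0'], feed_value)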
                    prob = np.exp(cls[:,1]) / (np.exp(cls[:,0]) + np.exp(cls[:,1]))
                    # Only the first v rows are real crops; the rest of the batch is zero padding
                    idx = np.where(prob[:v] > 0.95)
                    total_boxes = total_boxes[idx,:][0]
                    bb = bb[idx,:][0]
                    landmark = landmark[idx,:][0]
    
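                    pick = util.nms(total_boxes.copy(), 0.7)
                    # Refine with ONet's own regression output (bb) rather than RNet's ipass_box,
                    # and keep the landmarks aligned with the boxes that survive NMS
                    total_boxes = bbreg(total_boxes[pick,:].copy(), bb[pick,:])
                    landmark = landmark[pick,:]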
    
                    v = 0
                    im_bbr_o = im.copy()
                    for box in total_boxes:
                        im_bbr_o = cv2.rectangle(im_bbr_o,
                                        (int(box[0]),int(box[1])),
                                        (int(box[2]),int(box[3])), 
                                        (0,0,255),1)
                        land = landmark[v]
                        land[[0,2,4,6,8]] = box[0] + land[[0,2,4,6,8]] * (box[2] - box[0])
                        land[[1,3,5,7,9]] = box[1] + land[[1,3,5,7,9]] * (box[3] - box[1])
                        # print(land)
                        for i in range(5):
                            im_bbr_o = cv2.rectangle(im_bbr_o,
                                            (int(land[0 + i * 2]),int(land[1 + i * 2])),
                                            (int(land[0 + i * 2] + 2),int(land[1 + i * 2] + 2)), 
                                            (0,0,0),1)
    
    
                        v += 1
                    cv2.imwrite( "./test_bbr_o"+".jpg",im_bbr_o)   
    
    
    if __name__ == '__main__':
        detect()
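The listing calls `util.nms(boxes, threshold)` after every stage, but the body of that helper is not shown in this part of the post. As a rough illustration of what such a helper typically does, here is a minimal greedy non-maximum suppression sketch in NumPy. It assumes the box layout used above (columns `x1, y1, x2, y2, score`) and returns the indices of the boxes to keep. The name `nms_sketch` is hypothetical, and this is not necessarily how the author's `util.nms` is implemented.

```python
import numpy as np


def nms_sketch(boxes, iou_threshold):
    """Greedy NMS over rows of [x1, y1, x2, y2, score, ...].

    Returns a list of row indices to keep, highest score first.
    """
    boxes = np.asarray(boxes, dtype=np.float64)
    if boxes.shape[0] == 0:
        return []

    x1, y1, x2, y2, score = (boxes[:, i] for i in range(5))
    area = (x2 - x1) * (y2 - y1)
    order = np.argsort(score)[::-1]  # indices sorted by descending score

    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(int(i))
        rest = order[1:]

        # Intersection of the current best box with every remaining box
        iw = np.maximum(0.0, np.minimum(x2[i], x2[rest]) - np.maximum(x1[i], x1[rest]))
        ih = np.maximum(0.0, np.minimum(y2[i], y2[rest]) - np.maximum(y1[i], y1[rest]))
        inter = iw * ih
        union = area[i] + area[rest] - inter
        iou = inter / np.maximum(union, 1e-12)

        # Drop every remaining box that overlaps the kept one too much
        order = rest[iou <= iou_threshold]
    return keep


example = np.array([
    [0, 0, 10, 10, 0.9],
    [1, 1, 11, 11, 0.8],    # IoU with the first box is 81/119, about 0.68
    [20, 20, 30, 30, 0.7],
])
print(nms_sketch(example, 0.5))  # the second box is suppressed
print(nms_sketch(example, 0.7))  # all three boxes survive
```

A lower threshold (0.5 inside each pyramid scale) prunes more aggressively, while the 0.7 passes across scales and after RNet/ONet only remove near-duplicates.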
    
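Two small coordinate transforms recur in the listing: refining a box with its regression offsets, and mapping ONet's normalised landmarks back to image pixels. The sketch below isolates both so they can be checked on their own. It assumes each offset is expressed relative to its own corner and scaled by the box width or height (the convention in the reference MTCNN code; the listing above adds all four offsets to `x1`/`y1`, which is only equivalent if the offsets were defined that way during training), and that the ten landmark values are interleaved as `x0, y0, ..., x4, y4` as in the drawing loop. The function names `refine_boxes` and `landmarks_to_pixels` are hypothetical and do not come from the original code.

```python
import numpy as np


def refine_boxes(boxes, offsets):
    """Shift each corner by its offset, scaled by the box width/height.

    boxes:   (n, 4+) array, columns x1, y1, x2, y2 (extra columns are kept)
    offsets: (n, 4) array, columns dx1, dy1, dx2, dy2 (normalised)
    """
    boxes = np.asarray(boxes, dtype=np.float64)
    offsets = np.asarray(offsets, dtype=np.float64)
    w = boxes[:, 2] - boxes[:, 0]
    h = boxes[:, 3] - boxes[:, 1]

    refined = boxes.copy()
    refined[:, 0] = boxes[:, 0] + offsets[:, 0] * w
    refined[:, 1] = boxes[:, 1] + offsets[:, 1] * h
    refined[:, 2] = boxes[:, 2] + offsets[:, 2] * w
    refined[:, 3] = boxes[:, 3] + offsets[:, 3] * h
    return refined


def landmarks_to_pixels(landmark, box):
    """Map 10 normalised values (x0, y0, ..., x4, y4) to a (5, 2) pixel array."""
    pts = np.asarray(landmark, dtype=np.float64).reshape(5, 2).copy()
    pts[:, 0] = box[0] + pts[:, 0] * (box[2] - box[0])  # x scaled by box width
    pts[:, 1] = box[1] + pts[:, 1] * (box[3] - box[1])  # y scaled by box height
    return pts


box = [10, 20, 30, 60]
print(refine_boxes([box], [[0.1, -0.1, 0.0, 0.2]]))
print(landmarks_to_pixels([0, 0, 0.5, 0.5, 1, 1, 0.25, 0.75, 0, 1], box))
```

Returning a fresh `(5, 2)` array avoids modifying the network output in place, which makes it safe to draw the same landmarks more than once.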
  • Original article: https://www.cnblogs.com/megachen/p/10748291.html