  • Implementing face detection with MTCNN

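    MTCNN (Multi-task Cascaded Convolutional Networks)

    The hard part of MTCNN

    • Images from datasets such as WIDER FACE are not training samples that MTCNN can consume directly. They have to be converted into an MTCNN-compatible dataset with a few scripts, which can be found on GitHub. To make data loading more efficient, the converted data is stored in TFRecord format.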

    Generating the pos, neg, and part data for PNet

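    • Data sources: WIDER FACE and Cascade

      • Bounding box data comes from WIDER FACE. WIDER FACE has no landmark annotations, so a second dataset is needed for those
      • Landmark data comes from Cascade. Each Cascade sample contains a single face, annotated with its bounding box and landmark coordinates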
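    • Data generation steps

      • Variables

        • w: width of the ground truth box
        • h: height of the ground truth box
        • width: width of the image
        • height: height of the image
        • nx: x coordinate of the generated box (its top-left corner)
        • ny: y coordinate of the generated box (its top-left corner)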
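      • Generating neg data

        • For each WIDER FACE image, we randomly generate a square box of side size = npr.randint(12, min(width, height) / 2). The lower bound is 12 because PNet takes 12x12 inputs, so every crop must be at least 12 pixels on a side. The top-left corner (nx, ny) is chosen so that the box never extends outside the image, which means the later resize is a pure downscale. We crop the box from the original image and compute its IoU with every ground truth box in the image; if the largest IoU is below 0.3, the crop is labelled neg. The commonly used scripts generate 50 neg samples per WIDER FACE image: a while loop keeps running as long as fewer than 50 negatives have been collected, performs the steps above, and skips (continue) any crop with IoU ≥ 0.3. Only negatives are considered at this stage. Because this fixes the number of negatives per image, which is not ideal, the scripts then iterate over every gt in the image and draw about five more boxes offset from that single gt (the size is still drawn with the formula above). Unlike in the while loop, each of these five is added to neg only if its IoU is below 0.3 and is otherwise discarded. Note that for neg, the IoU is always computed against all gt boxes in the image, and the maximum must be below 0.3. Each neg sample is saved as its image with label = 0.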
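      • Generating pos and part data

        • pos and part data are generated inside the same for loop over gt boxes as the near-gt neg samples above, so they are produced after the neg data. An image can contain several gt boxes; for each gt provided by WIDER FACE we again generate a box, as for neg, but drawn differently: its size is size = npr.randint(int(min(w, h) * 0.8), np.ceil(max(w, h) * 1.25)), so some boxes are slightly smaller and some slightly larger than the ground truth, and it is placed around the gt so that the two overlap. We compute the IoU between this box and that gt: if IoU > 0.65 the crop is pos, else if IoU > 0.4 it is part, otherwise it is ignored (neg has already been handled and is not revisited). For pos and part we save the image, the label (1 for pos; the widely used scripts mark part with -1), and the bbox offsets. The offsets are normalized by the crop size: (x1 - nx1) / size, (y1 - ny1) / size, (x2 - nx2) / size, (y2 - ny2) / size, where (x1, y1, x2, y2) is the gt and (nx1, ny1, nx2, ny2) is the crop, so that gt = crop + offset * size.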
      • Generating landmark data

        • Landmark data comes from the Cascade dataset, which provides the keypoints. They are normalized as well, relative to the gt box: (landX - gtX) / w, (landY - gtY) / h.
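      • Mixing neg, pos, part, and landmark

        • PNet is trained on a mixture of neg, pos, part, and landmark samples.
        • When mixing, keep neg:pos:part:landmark at roughly 3:1:1:1. Because the amount of neg data can be very large, pick a base count: if len(neg) > base * 3, take only base * 3 samples from neg, otherwise negatives would dominate; pos, part, and landmark each contribute base samples. Sample each of neg, pos, part, and landmark with npr.choice, which also shuffles them, write all selected entries of every type, and generate a .txt label file that is read in the next step.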
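The steps above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the GitHub scripts themselves: boxes are assumed to be inclusive pixel coordinates (x1, y1, x2, y2), the helper names (iou, random_neg_crop, label_crop, normalize_landmarks, mix) are hypothetical, random.randint (inclusive on both ends) stands in for npr.randint, and random.sample stands in for npr.choice. Cropping, resizing, and writing image files are left out.

```python
import random


def iou(box, gts):
    """IoU between one box and each gt box; boxes are inclusive (x1, y1, x2, y2)."""
    x1, y1, x2, y2 = box
    area = (x2 - x1 + 1) * (y2 - y1 + 1)
    result = []
    for gx1, gy1, gx2, gy2 in gts:
        iw = max(0, min(x2, gx2) - max(x1, gx1) + 1)
        ih = max(0, min(y2, gy2) - max(y1, gy1) + 1)
        inter = iw * ih
        gt_area = (gx2 - gx1 + 1) * (gy2 - gy1 + 1)
        result.append(inter / float(area + gt_area - inter))
    return result


def random_neg_crop(width, height, gts, rng=random):
    """Draw one square crop inside the image; return it if it qualifies as neg, else None."""
    size = rng.randint(12, min(width, height) // 2)  # assumes min(width, height) >= 24
    nx = rng.randint(0, width - size)   # keeps the crop inside the image
    ny = rng.randint(0, height - size)
    crop = (nx, ny, nx + size - 1, ny + size - 1)
    return crop if max(iou(crop, gts)) < 0.3 else None


def label_crop(crop, gt):
    """Label a crop drawn around one gt as 'pos', 'part' or None, plus its box offsets."""
    nx1, ny1, nx2, ny2 = crop
    x1, y1, x2, y2 = gt
    size = float(nx2 - nx1 + 1)
    # Offsets normalized by the crop size, so that gt = crop + offset * size
    offset = ((x1 - nx1) / size, (y1 - ny1) / size,
              (x2 - nx2) / size, (y2 - ny2) / size)
    overlap = iou(crop, [gt])[0]
    if overlap > 0.65:
        return 'pos', offset
    if overlap > 0.4:
        return 'part', offset
    return None, offset


def normalize_landmarks(points, gt):
    """(landX - gtX) / w and (landY - gtY) / h for each (x, y) landmark."""
    x1, y1, x2, y2 = gt
    w = float(x2 - x1 + 1)
    h = float(y2 - y1 + 1)
    return [((lx - x1) / w, (ly - y1) / h) for lx, ly in points]


def mix(neg, pos, part, landmark, base, rng=random):
    """Keep at most 3 * base neg and base of each other type, then shuffle."""
    keep = rng.sample(neg, 3 * base) if len(neg) > 3 * base else list(neg)
    for group in (pos, part, landmark):
        keep += rng.sample(group, base) if len(group) > base else list(group)
    rng.shuffle(keep)
    return keep
```

For example, a 10x10 crop at (0, 0, 9, 9) around a gt at (3, 0, 12, 9) has IoU 70 / 130 ≈ 0.54, so label_crop returns 'part' with offsets (0.3, 0.0, 0.3, 0.0).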

    Training code

    • model.py
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    
    import os
    import sys
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    import numpy as np
    import tensorflow as tf
    import tensorflow.contrib.slim as slim
    from lib import util
    
    
    class Net(object):
    
    
        def __init__(self, is_training, learning_rate, num_epochs, im_size):
            self.num_epochs = num_epochs
            self.learning_rate = learning_rate
            self.im_size = im_size
            self.X = None
            self.Y_cls = None
            self.Y_box = None
            self.Y_landmark = None
            self.cls = None
            self.box = None
            self.landmark = None
            self.build()
            if is_training:
                self.loss = self.loss_layer()
                self.global_step = tf.Variable(1, name='global_step', trainable=False)
                self.optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(self.loss, global_step=self.global_step)
    
    
        def loss_layer(self):
            cls_loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=self.Y_cls, logits=self.cls))  * self.weights['cls'] 
            box_loss = tf.reduce_mean(tf.square(self.box - self.Y_box)) * self.weights['box']
            landmark_loss = tf.reduce_mean(tf.square(self.landmark - self.Y_landmark)) * self.weights['landmark']
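            # The l2_regularizer declared in arg_scope only takes effect if its
            # losses are added to the objective explicitly
            reg_loss = tf.losses.get_regularization_loss()
            return cls_loss + box_loss + landmark_loss + reg_loss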
    
    
        def build(self):
            pass
    
    
    class PNet(Net):
    
    
        def __init__(self, is_training=True, learning_rate=0.0001, num_epochs=300):
            self.weights = {'cls': 1.0, 'box': 0.5, 'landmark': 0.5}
            super(PNet, self).__init__(is_training, learning_rate, num_epochs, 12)
       
    
        def build(self):
            with tf.variable_scope('pnet', reuse=False):
                with tf.name_scope('input'):
                    fname_queue = tf.train.string_input_producer(['../tfrecords/pnet_data.tfrecords'], num_epochs=self.num_epochs)
                    self.X, self.Y_box, self.Y_cls, self.Y_landmark = util.read_tfrecord(fname_queue, self.im_size, 128)
                
                with slim.arg_scope([slim.conv2d], activation_fn=util.prelu, 
                        weights_initializer=slim.xavier_initializer(),
                        biases_initializer=tf.zeros_initializer(),
                        weights_regularizer=slim.l2_regularizer(0.0005),
                        padding='valid'):
                    net = slim.conv2d(self.X, 10, 3, stride=1, scope='conv_1')
                    net = slim.max_pool2d(net, [2, 2], stride=2, scope='pool_1', padding='SAME')
                    net = slim.conv2d(net, 16, 3, stride=1, scope='conv_2')
                    net = slim.conv2d(net, 32, 3, stride=1, scope='conv_3')
                    cls = slim.conv2d(net, 2, 1, stride=1, scope='cls_fc', activation_fn=None)
                    cls = tf.reshape(cls, [-1, 2])
                    box = slim.conv2d(net, 4, 1, stride=1, scope='box_fc', activation_fn=None)
                    box = tf.reshape(box, [-1, 4])
                    landmark = slim.conv2d(net, 10, 1, stride=1, scope='landmark_fc', activation_fn=None)
                    landmark = tf.reshape(landmark, [-1, 10])
                    self.cls = cls
                    self.box = box
                    self.landmark = landmark
    
            util.add_var_to_summary()
    
    
    class RNet(Net):
    
    
        def __init__(self, is_training=True, learning_rate=0.0001, num_epochs=200):
            self.weights = {'cls': 1.0, 'box': 0.5, 'landmark': 0.5}
            super(RNet, self).__init__(is_training, learning_rate, num_epochs, 24)
       
    
        def build(self):
            with tf.variable_scope('rnet', reuse=False):
                with tf.name_scope('input'):
                    fname_queue = tf.train.string_input_producer(['../tfrecords/rnet_data.tfrecords'], num_epochs=self.num_epochs)
                    self.X, self.Y_box, self.Y_cls, self.Y_landmark = util.read_tfrecord(fname_queue, self.im_size, 128)
                
                with slim.arg_scope([slim.conv2d], activation_fn=util.prelu, 
                        weights_initializer=slim.xavier_initializer(),
                        biases_initializer=tf.zeros_initializer(),
                        weights_regularizer=slim.l2_regularizer(0.0005),
                        padding='VALID'):
                    net = slim.conv2d(self.X, 28, 3, stride=1, scope='conv_1')
                    net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_1', padding='SAME')
                    net = slim.conv2d(net, 48, 3, stride=1, scope='conv_2')
                    net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_2')
                    net = slim.conv2d(net, 64, 2, stride=1, scope='conv_3')
                    net = slim.flatten(net)
                    net = slim.fully_connected(net, 128, scope='fc_1')
                    cls = slim.fully_connected(net, 2, activation_fn=None, scope='cls_fc1')
                    box = slim.fully_connected(net, 4, activation_fn=None, scope='box_fc1')
                    landmark = slim.fully_connected(net, 10, activation_fn=None, scope='landmark_fc1')
                    self.cls = cls
                    self.box = box
                    self.landmark = landmark
    
            util.add_var_to_summary()
    
    
    class ONet(Net):
    
    
        def __init__(self, is_training=True, learning_rate=0.0001, num_epochs=100):
            self.weights = {'cls': 1.0, 'box': 0.5, 'landmark': 1.0}
            super(ONet, self).__init__(is_training, learning_rate, num_epochs, 48)
       
    
        def build(self):
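            # ONet reuses the 'rnet' scope name; the node names in tmp_dict['onet']
            # in train.py depend on it, so keep them in sync if this is renamed
            with tf.variable_scope('rnet', reuse=False):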
                with tf.name_scope('input'):
                    fname_queue = tf.train.string_input_producer(['../tfrecords/onet_data.tfrecords'], num_epochs=self.num_epochs)
                    self.X, self.Y_box, self.Y_cls, self.Y_landmark = util.read_tfrecord(fname_queue, self.im_size, 64)
                
                with slim.arg_scope([slim.conv2d], activation_fn=util.prelu, 
                        weights_initializer=slim.xavier_initializer(),
                        biases_initializer=tf.zeros_initializer(),
                        weights_regularizer=slim.l2_regularizer(0.0005),
                        padding='VALID'):
                    net = slim.conv2d(self.X, 32, 3, stride=1, scope='conv_1')
                    net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_1', padding='SAME')
                    net = slim.conv2d(net, 64, 3, stride=1, scope='conv_2')
                    net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_2')
                    net = slim.conv2d(net, 64, 3, stride=1, scope='conv_3')
                    net = slim.max_pool2d(net, [2, 2], stride=2, scope='pool_3')
                    net = slim.conv2d(net, 128, [2, 2], stride=1, scope='conv_4')
    
                    net = slim.flatten(net)
                    net = slim.fully_connected(net, 256, scope='fc_1')
                    cls = slim.fully_connected(net, 2, activation_fn=None, scope='cls_fc')
                    box = slim.fully_connected(net, 4, activation_fn=None, scope='box_fc')
                    landmark = slim.fully_connected(net, 10, activation_fn=None, scope='landmark_fc')
                    self.cls = cls
                    self.box = box
                    self.landmark = landmark
            util.add_var_to_summary()
    
    
    def main(argv=None):
        pnet = PNet(True)
    
    
    if __name__ == '__main__':
        main()
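Because every PNet layer is convolutional, its heads produce a 1x1 map on a 12x12 training crop (so tf.reshape(cls, [-1, 2]) yields one prediction per sample) and a larger map on a full image at detection time. The sketch below recomputes the spatial sizes layer by layer for PNet, RNet, and ONet as defined in model.py. It assumes VALID padding for every convolution (set through arg_scope) and for every pooling layer that is not given padding='SAME', which is slim.max_pool2d's default; the function names are hypothetical.

```python
def conv_out(size, kernel, stride=1):
    """Side length after a VALID convolution or pooling layer."""
    return (size - kernel) // stride + 1


def pool_same(size, stride=2):
    """Side length after SAME pooling: ceil(size / stride)."""
    return -(-size // stride)


def pnet_out(s):
    s = conv_out(s, 3)        # conv_1, 3x3
    s = pool_same(s)          # pool_1, 2x2, stride 2, SAME
    s = conv_out(s, 3)        # conv_2, 3x3
    return conv_out(s, 3)     # conv_3, 3x3


def rnet_out(s):
    s = conv_out(s, 3)        # conv_1, 3x3
    s = pool_same(s)          # pool_1, 3x3, stride 2, SAME
    s = conv_out(s, 3)        # conv_2, 3x3
    s = conv_out(s, 3, 2)     # pool_2, 3x3, stride 2, VALID
    return conv_out(s, 2)     # conv_3, 2x2


def onet_out(s):
    s = conv_out(s, 3)        # conv_1, 3x3
    s = pool_same(s)          # pool_1, 3x3, stride 2, SAME
    s = conv_out(s, 3)        # conv_2, 3x3
    s = conv_out(s, 3, 2)     # pool_2, 3x3, stride 2, VALID
    s = conv_out(s, 3)        # conv_3, 3x3
    s = conv_out(s, 2, 2)     # pool_3, 2x2, stride 2, VALID
    return conv_out(s, 2)     # conv_4, 2x2


print(pnet_out(12), pnet_out(100), rnet_out(24), onet_out(48))
```

With these definitions, rnet_out(24) == 3 means slim.flatten in RNet feeds 3 * 3 * 64 = 576 values to fc_1, and onet_out(48) == 3 gives 3 * 3 * 128 = 1152 values for ONet. On a 100x100 image PNet produces a 45x45 map, one cell per 12x12 window with stride 2.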
    
    • util.py (utility functions)
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    
    import os
    import numpy as np
    import tensorflow as tf
    import tensorflow.contrib.slim as slim
    
    
    os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
    
    
    def prelu(X):
        alpha = tf.get_variable('alpha', shape=X.get_shape()[-1], dtype=tf.float32, initializer=tf.constant_initializer(0.25)) 
        pos = tf.nn.relu(X)
        neg = alpha * (X - abs(X)) * 0.5
        return pos + neg
    
    
    def read_tfrecord(fname_queue, im_size, batch_size=128):
        reader = tf.TFRecordReader()
        _, serialized_example = reader.read(fname_queue)
        features = tf.parse_single_example(serialized_example, features={
            'cls_raw':tf.FixedLenFeature([], tf.string),
            'bbox_raw':tf.FixedLenFeature([], tf.string),
            'landmark_raw':tf.FixedLenFeature([], tf.string),
            'im_raw':tf.FixedLenFeature([], tf.string)
        })
        im = (tf.cast(tf.decode_raw(features['im_raw'], tf.uint8), tf.float32) - 127.5) / 128.0
        im.set_shape([im_size * im_size * 3])
        im = tf.reshape(im, [im_size, im_size, 3])
    
        cls = tf.decode_raw(features['cls_raw'], tf.float32)
        cls.set_shape([2])
    
        bbox = tf.decode_raw(features['bbox_raw'], tf.float32)
        bbox.set_shape([4])
    
        landmark = tf.decode_raw(features['landmark_raw'], tf.float32)
        # tf.decode_raw returns a tensor of unknown shape, so the shape is set explicitly with .set_shape
        landmark.set_shape([10])
    
        clses, bboxes, landmarks, ims = tf.train.shuffle_batch([cls, bbox, landmark, im], 
                batch_size=batch_size, capacity=1000 + 3 * batch_size, min_after_dequeue=1000)
        return ims, bboxes, clses, landmarks
        
    
    def add_var_to_summary():
        for var in slim.get_model_variables():
            tf.summary.histogram(var.op.name, var)
    
    
    def nms(boxes, threshold):
        x1 = boxes[:, 0]
        y1 = boxes[:, 1]
        x2 = boxes[:, 2]
        y2 = boxes[:, 3]
        score = boxes[:, 4]
        score_sorted = np.argsort(score)
        
        indice = []
        areas = (x2 - x1 + 1) * (y2 - y1 + 1)
        while score_sorted.size > 0:
            i = score_sorted[-1]
            indice.append(i)
            xx1 = np.maximum(x1[i], x1[score_sorted[0:-1]])
            yy1 = np.maximum(y1[i], y1[score_sorted[0:-1]])
            xx2 = np.minimum(x2[i], x2[score_sorted[0:-1]])
            yy2 = np.minimum(y2[i], y2[score_sorted[0:-1]])
            w = np.maximum(0.0, (xx2 - xx1 + 1))
            h = np.maximum(0.0, (yy2 - yy1 + 1))
            inner = w * h
            IoU = inner / (areas[i] + areas[score_sorted[0:-1]] - inner)
            score_sorted = score_sorted[np.where(IoU <= threshold)]
        return np.asarray(indice)
    
    
    def main(argv=None):
        pass
    
    
    if __name__ == '__main__':
        main()
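The nms function in util.py keeps the highest-scoring box, removes every remaining box whose IoU with it exceeds threshold, and repeats until no boxes are left; prelu computes relu(x) + alpha * (x - |x|) / 2, which is x for positive inputs and alpha * x otherwise. The numpy sketch below restates both outside TensorFlow so they can be checked on a toy input. prelu_np is a hypothetical name, alpha = 0.25 mirrors the initializer in util.prelu, and the three boxes are made-up values.

```python
import numpy as np


def prelu_np(x, alpha=0.25):
    # relu(x) + alpha * (x - |x|) / 2: x where x > 0, alpha * x elsewhere
    return np.maximum(x, 0) + alpha * (x - np.abs(x)) * 0.5


def nms(boxes, threshold):
    # boxes: N x 5 array of (x1, y1, x2, y2, score)
    x1, y1, x2, y2, score = (boxes[:, k] for k in range(5))
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    order = np.argsort(score)  # ascending, so the best remaining box is last
    keep = []
    while order.size > 0:
        i = order[-1]
        keep.append(i)
        rest = order[:-1]
        w = np.maximum(0.0, np.minimum(x2[i], x2[rest]) - np.maximum(x1[i], x1[rest]) + 1)
        h = np.maximum(0.0, np.minimum(y2[i], y2[rest]) - np.maximum(y1[i], y1[rest]) + 1)
        inter = w * h
        overlap = inter / (areas[i] + areas[rest] - inter)
        order = rest[overlap <= threshold]  # drop boxes that overlap box i too much
    return np.asarray(keep)


if __name__ == '__main__':
    # The second box overlaps the first with IoU 81 / 119, about 0.68
    boxes = np.array([[0, 0, 9, 9, 0.9],
                      [1, 1, 10, 10, 0.8],
                      [50, 50, 59, 59, 0.7]])
    print(nms(boxes, 0.5))  # box 1 is suppressed; boxes 0 and 2 are kept
```

Raising the threshold to 0.7 keeps all three boxes, because 0.68 no longer counts as too much overlap.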
    
    • train.py
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    
    import os
    import sys
    import time
    sys.path.append((os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
    import numpy as np
    import tensorflow as tf
    import tensorflow.contrib.slim as slim
    from tensorflow.python.framework import graph_util
    from core import model
    from core.model import PNet, RNet, ONet
    from lib import util
    
    
    config = tf.ConfigProto()
    config.allow_soft_placement = True
    config.gpu_options.allow_growth = True
    
    
    tmp_dict = {'pnet': ['pnet/cls_fc/BiasAdd', 'pnet/box_fc/BiasAdd', 'pnet/landmark_fc/BiasAdd'],
            'rnet': ['rnet/cls_fc1/BiasAdd', 'rnet/box_fc1/BiasAdd', 'rnet/landmark_fc1/BiasAdd'],
            'onet': ['rnet/cls_fc/BiasAdd', 'rnet/box_fc/BiasAdd', 'rnet/landmark_fc/BiasAdd']}
    
    
    def train(net, weight_dir):
        # Output nodes of the network being trained: 'pnet' for PNet, 'rnet' for RNet, 'onet' for ONet
        output_nodes = tmp_dict[type(net).__name__.lower()]
        saver = tf.train.Saver()
        with tf.Session(config=config) as sess:
            tf.summary.scalar('Loss', net.loss)
            merged = tf.summary.merge_all()
            writer = tf.summary.FileWriter('../logger', sess.graph)
    
            init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
    
            sess.run(init_op)
            coord = tf.train.Coordinator()
            threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
            ckpt = tf.train.get_checkpoint_state(weight_dir)
            if ckpt and ckpt.model_checkpoint_path:
                saver.restore(sess, ckpt.model_checkpoint_path)
                time.sleep(1)
                output_graph_def = graph_util.convert_variables_to_constants(sess, sess.graph_def, output_nodes)
                with tf.gfile.GFile(os.path.join(weight_dir, 'model.pb'), 'wb') as fd:
                    fd.write(output_graph_def.SerializeToString())
                print('Restore from ' + ckpt.model_checkpoint_path)
    
            global_step = 0
            try:
                while not coord.should_stop():
                    _, loss, global_step, result = sess.run([net.optimizer, net.loss, net.global_step, merged])
                    if global_step % 100 == 0:
                        writer.add_summary(result, global_step)
                        print('Step %s loss: %s' % (global_step, loss))
                    if global_step % 1000 == 0:
                        saver.save(sess, os.path.join(weight_dir, 'model.ckpt'), global_step=global_step)
                        print('Save model')
                    if global_step % 2000 == 0:
                        output_graph_def = graph_util.convert_variables_to_constants(sess, sess.graph_def, output_nodes)
                        with tf.gfile.GFile(os.path.join(weight_dir, 'model.pb'), 'wb') as fd:
                            fd.write(output_graph_def.SerializeToString())
    
    
            except tf.errors.OutOfRangeError as e:
                print('Epochs %s, step %s' % (net.num_epochs, global_step))
            finally:
                coord.request_stop()
            coord.join(threads)
    
    
    def main(argv=None):
        train(PNet(is_training=True, learning_rate=0.0001, num_epochs=300), '../weights/pnet')
        # train(RNet(is_training=True, learning_rate=0.0001, num_epochs=200), '../weights/rnet')
        # train(ONet(is_training=True, learning_rate=0.0001, num_epochs=100), '../weights/onet')
        # print('Not train')
    
    
    if __name__ == '__main__':
        main()
    

    Dataset-building code

    • builddata.py
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    
    import os
    import sys
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    import cv2
    import numpy as np
    import tensorflow as tf
    from lib import processbar
    from lib.processbar import ProcessBar
    
    
    data_dir = '../dataset'
    save_dir = '../tfrecords'
    
    
    def bytes_feature(value):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
    
    
    def write_data(fname, lines, im_size):
        writer = tf.python_io.TFRecordWriter(fname)
        bar = ProcessBar('Reading ' + fname.split('/')[-1], len(lines))
        examples = []
        for line in lines:
            parts = line.split()
            im_path = os.path.join(data_dir, parts[0].replace('\\', '/'))
            im = cv2.imread(im_path)
            height, width, _ = im.shape
            im = im.astype('uint8')
            # The dataset gives the bbox as (x1, x2, y1, y2); we store it as (x1, y1, x2, y2)
            bbox = np.array([float(parts[1]), float(parts[3]), float(parts[2]), float(parts[4])], dtype='float32')
            bbox_norm = np.array([bbox[0] / width, bbox[1] / height, bbox[2] / width, bbox[3] / height], dtype='float32')
            landmark = np.array([float(parts[5]), float(parts[6]),
                                float(parts[7]), float(parts[8]),
                                float(parts[9]), float(parts[10]),
                                float(parts[11]), float(parts[12]),
                                float(parts[13]), float(parts[14])], dtype='float32')
    
            landmark_norm = np.array([landmark[0] / width, landmark[1] / height,
                                landmark[2] / width, landmark[3] / height,
                                landmark[4] / width, landmark[5] / height,
                                landmark[6] / width, landmark[7] / height,
                                landmark[8] / width, landmark[9] / height], dtype='float32')
            # Positive sample: the face box itself, resized to im_size x im_size
            im_crop = cv2.resize(im[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2]), :], (im_size, im_size))
            bbox_pos = np.array([0.0, 0.0, 1.0, 1.0], dtype='float32')
            # Landmarks relative to the face box: (land - box_min) / box_size on each axis
            landmark_offset_pos = landmark_norm.copy()
            landmark_offset_pos[[0, 2, 4, 6, 8]] = \
                (landmark_offset_pos[[0, 2, 4, 6, 8]] - bbox_norm[0]) / (bbox_norm[2] - bbox_norm[0])
            landmark_offset_pos[[1, 3, 5, 7, 9]] = \
                (landmark_offset_pos[[1, 3, 5, 7, 9]] - bbox_norm[1]) / (bbox_norm[3] - bbox_norm[1])
            example_pos = tf.train.Example(features=tf.train.Features(feature={
                'cls_raw':bytes_feature(np.array([0, 1], dtype='float32').tostring()),
                'bbox_raw':bytes_feature(bbox_pos.tostring()),
                'landmark_raw':bytes_feature(landmark_offset_pos.tostring()),
                'im_raw':bytes_feature(im_crop.tostring()),
            }))
            examples.append(example_pos)
            
            # Negative samples: the top-left and bottom-right corners of the image,
            # which do not overlap the face box. Their bbox target is the dummy
            # [0, 0, 1, 1] and all 10 landmark values are 0.
            bbox_neg = np.array([0.0, 0.0, 1.0, 1.0], dtype='float32')
            landmark_offset_neg = np.zeros(10, dtype='float32')
            corners = [(0, 0, max(0, int(bbox[0])), max(0, int(bbox[1]))),  # top-left
                       (int(bbox[2]), int(bbox[3]), width, height)]          # bottom-right
            for x1, y1, x2, y2 in corners:
                corner = im[y1:y2, x1:x2, :]
                if corner.size == 0:
                    # The face touches this image border, so the corner is empty
                    continue
                im_crop = cv2.resize(corner, (im_size, im_size))
                example_neg = tf.train.Example(features=tf.train.Features(feature={
                    'cls_raw':bytes_feature(np.array([1, 0], dtype='float32').tostring()),
                    'bbox_raw':bytes_feature(bbox_neg.tostring()),
                    'landmark_raw':bytes_feature(landmark_offset_neg.tostring()),
                    'im_raw':bytes_feature(im_crop.tostring())
                }))
                examples.append(example_neg)
            bar.process()
    
        bar.done()
        bar = ProcessBar('Writing ' + fname.split('/')[-1], len(examples))
        for e in examples:
            writer.write(e.SerializeToString())
            bar.process()
        bar.done()
        writer.close()
    """
    
    def write_data(tfFileName, trainImagesDataDesc, im_size):
        writer = tf.python_io.TFRecordWriter(tfFileName)    
        examples = []
        bar = ProcessBar(tfFileName, len(trainImagesDataDesc))
        for line in trainImagesDataDesc:
            bar.process()
            descs = line.split()
            filePath = descs[0]
            filePath = filePath.replace('\\', '/')
            image_file_path = os.path.join(data_dir, filePath)
            im = cv2.imread(image_file_path)
            height, width, _ = im.shape
            im = im.astype('uint8')
            # bb = [float(descs[3]) / h,float(descs[1]) / w, float(descs[4]) / h, float(descs[2]) / w]
            bb = np.array([float(descs[1]),float(descs[3]), float(descs[2]), float(descs[4])], dtype='float32')
            bb_norm = np.array([bb[0] / width, bb[1] / height, bb[2] / width, bb[3] / height], dtype='float32')
            landmark = np.array([float(descs[5]),
                        float(descs[6]),
                        float(descs[7]),
                        float(descs[8]),
                        float(descs[9]),
                        float(descs[10]),
                        float(descs[11]),
                        float(descs[12]),
                        float(descs[13]),
                        float(descs[14])], dtype='float32')
     
            landmark_norm = np.array([float(descs[5])/ width,
                        float(descs[6])/ height,
                        float(descs[7])/ width,
                        float(descs[8])/ height,
                        float(descs[9])/ width,
                        float(descs[10])/ height,
                        float(descs[11])/ width,
                        float(descs[12])/ height,
                        float(descs[13])/ width,
                        float(descs[14])/ height], dtype='float32')
    
            bb_pos = bb_norm.copy()
            bb_pos[0] = 0.0
            bb_pos[1] = 0.0
            bb_pos[2] = 1.0
            bb_pos[3] = 1.0    
    
            im_100 = im[int(bb[1]):int(bb[3]), int(bb[0]):int(bb[2]),:]
            im_100 = cv2.resize(im_100,(im_size,im_size))
            landmark_pos = landmark_norm.copy()
            landmark_pos[[0,2,4,6,8]] = (landmark_pos[[0,2,4,6,8]] - bb_norm[0])/ (bb_norm[2] -bb_norm[0])
            landmark_pos[[1,3,5,7,9]] = (landmark_pos[[1,3,5,7,9]] - bb_norm[1]) / (bb_norm[3] -bb_norm[1])
            example = tf.train.Example(features = tf.train.Features(feature={
                'cls_raw':bytes_feature(np.array([0, 1], dtype='float32').tostring()), # face or not
                'bb_raw':bytes_feature(bb_pos.tostring()), # box
                'landmark_raw':bytes_feature(landmark_pos.tostring()), # landmarks
                'image_raw':bytes_feature(im_100.tostring()) # image data
            }))
            examples.append(example)
            # Then make 2 negative samples:
            # the top-left and bottom-right corners, which contain no face, so the landmarks and bbox are all 0
            classification = np.array([1,0], dtype='float32')
            cls_raw = classification.tostring()
    
            bb_neg = bb_norm.copy()
            landmark_neg = landmark_norm.copy()
    
            bb_neg[0] = 0.0
            bb_neg[1] = 0.0
            bb_neg[2] = 0.0
            bb_neg[3] = 0.0
            landmark_neg[0] = 0.0
            landmark_neg[1] = 0.0
            landmark_neg[2] = 0.0
            landmark_neg[3] = 0.0
            landmark_neg[4] = 0.0
            landmark_neg[5] = 0.0
            landmark_neg[6] = 0.0
            landmark_neg[7] = 0.0
            # Top-left corner
            x1_ = 0
            y1_ = 0
            x2_ = int(bb[0])
            y2_ = int(bb[1])
    
            im_crop = im[y1_:y2_, x1_:x2_, :]
            im_ = cv2.resize(im_crop,(im_size,im_size))
            example = tf.train.Example(features = tf.train.Features(feature={
                'cls_raw':bytes_feature(np.array([1, 0], dtype='float32').tostring()),
                'bb_raw':bytes_feature(bb_neg.tostring()),
                'landmark_raw':bytes_feature(landmark_neg.tostring()),
                'image_raw':bytes_feature(im_.tostring())
            }))
            examples.append(example)
            # Bottom-right corner
            x1_ = int(bb[2])
            y1_ = int(bb[3])
            x2_ = width
            y2_ = height
            im_crop = im[y1_:y2_, x1_:x2_]
            im_ = cv2.resize(im_crop,(im_size,im_size))
            example = tf.train.Example(features = tf.train.Features(feature={
                'cls_raw':bytes_feature(np.array([1, 0], dtype='float32').tostring()),
                'bb_raw':bytes_feature(bb_neg.tostring()),
                'landmark_raw':bytes_feature(landmark_neg.tostring()),
                'image_raw':bytes_feature(im_.tostring())
            }))
            examples.append(example)
        bar.done() 
        bar = ProcessBar(tfFileName, len(examples))
        for example in examples:
            writer.write(example.SerializeToString())
            bar.process()
        bar.done()
        writer.close()   
    """
    def main(argv=None):
        with open(os.path.join(data_dir, 'trainImageList.txt'), 'r') as fd:
            lines = fd.readlines()
        write_data('../tfrecords/pnet_data.tfrecords', lines, 12)
        write_data('../tfrecords/rnet_data.tfrecords', lines, 24)
        write_data('../tfrecords/onet_data.tfrecords', lines, 48)
        print('Finish writing data')
        
    
    if __name__ == '__main__':
        main()
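write_data divides the landmarks by the image size and then normalizes them against the box, which is also divided by the image size, so the width and height cancel: each landmark ends up as (landX - x1) / (x2 - x1) and (landY - y1) / (y2 - y1). The sketch below parses one line in the layout write_data assumes (path, then x1 x2 y1 y2, then five x y landmark pairs) and computes those offsets without OpenCV or TensorFlow. parse_line and landmark_offsets are hypothetical helper names, and the sample line is invented.

```python
def parse_line(line):
    """Split one list line into (path, (x1, y1, x2, y2), [10 landmark values])."""
    parts = line.split()
    path = parts[0].replace('\\', '/')            # Windows separators -> '/'
    x1, x2, y1, y2 = [float(v) for v in parts[1:5]]  # the file stores x1 x2 y1 y2
    landmarks = [float(v) for v in parts[5:15]]
    return path, (x1, y1, x2, y2), landmarks


def landmark_offsets(bbox, landmarks):
    """Landmarks relative to the face box, as computed for the positive crop."""
    x1, y1, x2, y2 = bbox
    out = []
    for k in range(0, 10, 2):
        out.append((landmarks[k] - x1) / (x2 - x1))
        out.append((landmarks[k + 1] - y1) / (y2 - y1))
    return out


path, bbox, lm = parse_line('images\\face_0001.jpg 10 110 20 120 40 50 80 50 60 70 45 95 75 95')
print(path, bbox, landmark_offsets(bbox, lm))
```

For this made-up line the face box is (10, 20, 110, 120) and the first landmark (40, 50) maps to (0.3, 0.3).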
    

    Detection code
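detect() in mtcnn.py first builds an image pyramid, shrinking the image by factor = 0.709 per level until its shorter side would fall below 12, and gen_box then maps every PNet output cell back to the original image using stride = 2 and cellsize = 12. A plain-Python restatement of those two steps is sketched below. pyramid_scales and cell_to_box are hypothetical helper names; the starting scale m = 12 / min_size is an assumption that reduces to the m = 1 used in detect() when min_size is 12.

```python
def pyramid_scales(height, width, min_size=12, factor=0.709, net_size=12):
    """Scales at which the image is resized before it is fed to PNet."""
    m = float(net_size) / min_size  # 1.0 for min_size == 12, matching m = 1 in detect()
    min_length = min(height, width) * m
    scales = []
    k = 0
    while min_length >= net_size:
        scales.append(m * factor ** k)
        min_length *= factor
        k += 1
    return scales


def cell_to_box(row, col, scale, stride=2, cell_size=12):
    """Map PNet output cell (row, col) at one pyramid scale back to original-image pixels."""
    # Same formulas as q1/q2 in gen_box; int() truncates like np.fix for positive values
    x1 = int((stride * col + 1) / scale)
    y1 = int((stride * row + 1) / scale)
    x2 = int((stride * col + cell_size) / scale)
    y2 = int((stride * row + cell_size) / scale)
    return x1, y1, x2, y2


print(len(pyramid_scales(100, 200)), cell_to_box(0, 0, 0.5))
```

A 100x200 image gets 7 pyramid levels, and a cell found at scale 0.5 covers a 24x24 region of the original image. A larger min_size starts the pyramid at a smaller scale and yields fewer levels.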

    • mtcnn.py
    
    import os
    import sys
    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    import cv2
    import numpy as np
    import tensorflow as tf
    from lib import util
    
    
    def bbreg(boundingbox, reg):
        bb = boundingbox.copy()    
        w = boundingbox[:, 2] - boundingbox[:, 0] + 1
        h = boundingbox[:, 3] - boundingbox[:, 1] + 1
    
        bb[:,0] = boundingbox[:, 0] + reg[:, 0] * w
        bb[:,1] = boundingbox[:, 1] + reg[:, 1] * h
        bb[:,2] = boundingbox[:, 2] + reg[:, 2] * w
        bb[:,3] = boundingbox[:, 3] + reg[:, 3] * h
        return bb
    
    
    def gen_box(imap, reg, scale, t):
        stride = 2
        cellsize = 12
        imap = np.transpose(imap)
        dx1 = np.transpose(reg[:, :, 0])
        dy1 = np.transpose(reg[:, :, 1])
        dx2 = np.transpose(reg[:, :, 2])
        dy2 = np.transpose(reg[:, :, 3])
        y, x = np.where(imap >= t)
        if y.shape[0] == 1:
            dx1 = np.flipud(dx1)
            dy1 = np.flipud(dy1)
            dx2 = np.flipud(dx2)
            dy2 = np.flipud(dy2)
        score = imap[(y, x)]
        reg = np.transpose(np.vstack([dx1[(y, x)], dy1[(y, x)],
                                      dx2[(y, x)], dy2[(y, x)]]))
        if reg.size == 0:
            reg = np.empty((0, 3))
        bb = np.transpose(np.vstack([y, x]))
        q1 = np.fix((stride * bb + 1) / scale)
        q2 = np.fix((stride * bb + cellsize) / scale)
        boundingbox = np.hstack([q1, q2, np.expand_dims(score, 1), reg])
        return boundingbox, reg
    
    
    def detect():
        im = cv2.imread('./test.jpg')
        im = im.astype('uint8')
        # im.shape is (rows, cols, channels), i.e. (height, width, channels)
        h, w, ch = im.shape
    
        with tf.Session() as sess:
            output_graph_def = tf.GraphDef()
            with open('../weights/pnet/model.pb', 'rb') as fd:
                output_graph_def.ParseFromString(fd.read())
                tf.import_graph_def(output_graph_def, name='')
                pyramid = []
                min_length = min(w, h)
                minsize = 20
                # m = 12 / minsize would skip faces smaller than minsize pixels;
                # m = 1 keeps the full-resolution level, so faces down to 12 px are searched
                m = 1
                scales = []
                factor_counter = 0
                factor = 0.709
                ml = min_length
                while ml >= 12:
                    scalefactor = m * np.power(factor, factor_counter)
                    scales.append(scalefactor)
                    # cv2.resize expects the target size as (width, height)
                    impyramid = cv2.resize(im, (int(w * scalefactor), int(h * scalefactor)))
                    pyramid.append((impyramid.copy() - 127.5) / 128.0)
                    factor_counter += 1
                    scalefactor = m * np.power(factor, factor_counter)
                    ml = min_length * scalefactor
                boxes_all = np.empty((0,9))
                print("------------pnet-----------------")
                for j in range(len(scales)):
                    
                    scale = scales[j]
                    py = pyramid[j]
                    feed_value = {'pnet/input/shuffle_batch:3':[py]}
                    _cls, _bb,_landmark = sess.run(['pnet/cls_fc/BiasAdd:0', 'pnet/box_fc/BiasAdd:0','pnet/landmark_fc/BiasAdd:0'], feed_value)
                    softmax_ = np.exp(_cls[0,:,:,1]) / (np.exp(_cls[0,:,:,0]) + np.exp(_cls[0,:,:,1]))
                    # gen_box maps the boxes back to their positions in the original input image
                    boxes, _ = gen_box(softmax_,#_cls[0, :, :, 1],
                                           _bb[0, :, :, :],
                                           scale,
                                           0.8)
                    if(len(boxes) > 0):
                        pick = util.nms(boxes.copy(), 0.5)
                        boxes_all = np.append( boxes_all ,boxes[pick,:], axis = 0)
    
                print("boxes picked after pnet: " + str(boxes_all.shape))
                
                numbox = boxes_all.shape[0]
                if numbox > 0:
                    pick = util.nms(boxes_all.copy(), 0.7)
                    boxes_all = boxes_all[pick, :]
                    regw = boxes_all[:, 2] - boxes_all[:, 0]
                    regh = boxes_all[:, 3] - boxes_all[:, 1]
                    # Refine the boxes with the predicted bbox regression offsets
                    qq1 = boxes_all[:, 0] + boxes_all[:, 5] * regw
                    qq2 = boxes_all[:, 1] + boxes_all[:, 6] * regh
                    qq3 = boxes_all[:, 2] + boxes_all[:, 7] * regw
                    qq4 = boxes_all[:, 3] + boxes_all[:, 8] * regh
                    boxes_all = np.transpose(np.vstack([qq1, qq2, qq3, qq4,
                                                        boxes_all[:, 4]]))
                    im_bbr = im.copy()
                    for box in boxes_all:
                        cv2.rectangle(im_bbr,
                                    (int(box[0]), int(box[1])),
                                    (int(box[2]), int(box[3])), 
                                    (0,0,255),1)
    
                    print("------------saving p image-----------------")
                    cv2.imwrite( "./test_bbr_p.jpg",im_bbr)      
        
        
        tf.reset_default_graph()
        
       
        with tf.Graph().as_default() as graph:   
            with open('../weights/rnet/model.pb', 'rb') as fd:
                output_graph_def = tf.GraphDef()
                output_graph_def.ParseFromString(fd.read())
                tf.import_graph_def(output_graph_def, name='') 
        
                with tf.Session() as sess:
                    batch_size = 128
                    rnet_images = np.zeros([batch_size, 24, 24, 3])
                    h_img, w_img = im.shape[:2]
                    v = 0
                    for box in boxes_all:
                        if v >= batch_size:
                            break
                        # Clip to the image so the crop is not empty (cv2.resize fails on an empty slice)
                        x1 = max(int(box[0]), 0)
                        y1 = max(int(box[1]), 0)
                        x2 = min(int(box[2]), w_img)
                        y2 = min(int(box[3]), h_img)
                        rnet_images[v, :, :, :] = cv2.resize(im[y1:y2, x1:x2, :], (24, 24))
                        v += 1
                    rnet_images = (rnet_images - 127.5) / 128.0
                    print("------------rnet-----------------")
                    feed_value = {'rnet/input/shuffle_batch:3':rnet_images}
                    cls, bb, landmark = sess.run(['rnet/cls_fc1/BiasAdd:0', 'rnet/box_fc1/BiasAdd:0','rnet/landmark_fc1/BiasAdd:0'], feed_value)
                    # Only the first v rows hold real crops; the remaining rows are zero padding
                    prob = np.exp(cls[:v,1]) / (np.exp(cls[:v,0]) + np.exp(cls[:v,1]))
                    idx = np.where(prob > 0.8)
                    total_boxes = boxes_all[idx,:][0]
                    ipass_box = np.zeros([len(idx[0]),5])
                    for j in range(len(idx[0])):
                        ip = idx[0][j]
                        ipass_box[j][0:4] = bb[ip][0:4]
                        ipass_box[j][4] = prob[ip]
                    
    
                    pick = util.nms(total_boxes.copy(), 0.7)
                    # total_boxes holds the source bb coordinates that were fed to RNet;
                    # adjust them once more using RNet's bb regression values
                    total_boxes = bbreg(total_boxes[pick,:].copy(), ipass_box[pick,:])
    
                    im_bbr = im.copy()
                    
                    for box in total_boxes:
                        im_bbr = cv2.rectangle(im_bbr,
                                    (int(box[0]),int(box[1])),
                                    (int(box[2]),int(box[3])), 
                                    (0,255,0),1)
                    print("------------saving rnet image-----------------")
                    cv2.imwrite( "./test_bbr_r" +".jpg",im_bbr)        
    
    
        tf.reset_default_graph()
        with tf.Graph().as_default() as graph:   
            with open('../weights/onet/model.pb', "rb") as f:
                output_graph_def = tf.GraphDef()
                output_graph_def.ParseFromString(f.read())
                _ = tf.import_graph_def(output_graph_def, name="") 
        
                with tf.Session() as sess:
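                    batch = 64
                    onet_images = np.zeros((batch,48,48,3))
                    h_img, w_img = im.shape[:2]
                    v = 0
                    for box in total_boxes:
                        if v >= batch:
                            break
                        # Clip to the image so the crop is not empty
                        x1 = max(int(box[0]), 0)
                        y1 = max(int(box[1]), 0)
                        x2 = min(int(box[2]), w_img)
                        y2 = min(int(box[3]), h_img)
                        onet_images[v, :, :, :] = cv2.resize(im[y1:y2, x1:x2, :], (48, 48))
                        v += 1
                    onet_images = (onet_images - 127.5) / 128.0
                    # NOTE: these node names keep the 'rnet/' prefix from the original post; they must
                    # match the node names actually stored in ../weights/onet/model.pb
                    feed_value = {'rnet/input/shuffle_batch:3':onet_images}
                    print("------------onet-----------------")
                    cls, bb, landmark = sess.run(['rnet/cls_fc/BiasAdd:0', 'rnet/box_fc/BiasAdd:0','rnet/landmark_fc/BiasAdd:0'], feed_value)
                    # Only the first v rows hold real crops; the remaining rows are zero padding
                    cls, bb, landmark = cls[:v], bb[:v], landmark[:v]
                    prob = np.exp(cls[:,1]) / (np.exp(cls[:,0]) + np.exp(cls[:,1]))
                    idx = np.where(prob > 0.95)[0]
                    total_boxes = total_boxes[idx,:]
                    landmark = landmark[idx,:]
                    # Use ONet's own regression offsets and scores (not RNet's ipass_box)
                    opass_box = np.hstack([bb[idx,0:4], prob[idx,None]])

                    pick = util.nms(total_boxes.copy(), 0.7)
                    total_boxes = bbreg(total_boxes[pick,:].copy(), opass_box[pick,:])
                    # Keep the landmarks aligned with the boxes that survived NMS
                    landmark = landmark[pick,:]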
    
                    v = 0
                    im_bbr_o = im.copy()
                    for box in total_boxes:
                        im_bbr_o = cv2.rectangle(im_bbr_o,
                                        (int(box[0]),int(box[1])),
                                        (int(box[2]),int(box[3])), 
                                        (0,0,255),1)
                        land = landmark[v]
                        land[[0,2,4,6,8]] = box[0] + land[[0,2,4,6,8]] * (box[2] - box[0])
                        land[[1,3,5,7,9]] = box[1] + land[[1,3,5,7,9]] * (box[3] - box[1])
                        # print(land)
                        for i in range(5):
                            im_bbr_o = cv2.rectangle(im_bbr_o,
                                            (int(land[0 + i * 2]),int(land[1 + i * 2])),
                                            (int(land[0 + i * 2] + 2),int(land[1 + i * 2] + 2)), 
                                            (0,0,0),1)
    
    
                        v += 1
                    cv2.imwrite( "./test_bbr_o"+".jpg",im_bbr_o)   
    
    
    if __name__ == '__main__':
        detect()
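The PNet loop above calls gen_box and util.nms, which are defined elsewhere in the post and not shown in this part. As a rough illustration only, here is a minimal NumPy sketch of what those two steps usually do. The assumptions are ours, not the author's: PNet's overall stride is 2 with a 12x12 window, the regression map stores (dx1, dy1, dx2, dy2) per cell, and each box row is laid out as [x1, y1, x2, y2, score, dx1, dy1, dx2, dy2], which matches how boxes_all is indexed in the code. The names gen_box_sketch and nms_sketch are hypothetical.

```python
import numpy as np


def gen_box_sketch(prob_map, reg, scale, threshold, stride=2, cell_size=12):
    """Hypothetical stand-in for gen_box: map PNet cells back to original-image boxes.

    Assumes a stride of 2 and a 12x12 window; reg[i, j] holds (dx1, dy1, dx2, dy2).
    Returns rows [x1, y1, x2, y2, score, dx1, dy1, dx2, dy2].
    """
    ys, xs = np.where(prob_map > threshold)
    if ys.size == 0:
        return np.zeros((0, 9))
    scores = prob_map[ys, xs]
    offsets = reg[ys, xs, :]  # shape (k, 4)
    # Cell position in the scaled image, divided by scale to undo the pyramid resize
    x1 = stride * xs / scale
    y1 = stride * ys / scale
    x2 = (stride * xs + cell_size) / scale
    y2 = (stride * ys + cell_size) / scale
    return np.hstack([np.stack([x1, y1, x2, y2, scores], axis=1), offsets])


def nms_sketch(boxes, iou_threshold):
    """Hypothetical stand-in for util.nms: greedy NMS, returns indices of kept rows."""
    if len(boxes) == 0:
        return np.array([], dtype=int)
    x1, y1, x2, y2, scores = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3], boxes[:, 4]
    areas = (x2 - x1) * (y2 - y1)
    order = scores.argsort()[::-1]  # highest score first
    keep = []
    while order.size > 0:
        i = order[0]
        keep.append(i)
        rest = order[1:]
        # Intersection of the current best box with every remaining box
        xx1 = np.maximum(x1[i], x1[rest])
        yy1 = np.maximum(y1[i], y1[rest])
        xx2 = np.minimum(x2[i], x2[rest])
        yy2 = np.minimum(y2[i], y2[rest])
        inter = np.maximum(0.0, xx2 - xx1) * np.maximum(0.0, yy2 - yy1)
        iou = inter / np.maximum(areas[i] + areas[rest] - inter, 1e-12)
        # Discard boxes that overlap the kept box by more than the threshold
        order = rest[iou <= iou_threshold]
    return np.array(keep, dtype=int)
```

In the code above, NMS runs twice for PNet: once per pyramid scale with an IoU threshold of 0.5, and once over all scales with 0.7, so duplicates from the same level and from different levels are both removed.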
    
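After each stage the code turns normalized network outputs back into pixel coordinates: the qq1..qq4 lines for PNet, bbreg for RNet and ONet, and the land[[0,2,4,6,8]] lines for the five landmarks. bbreg is not shown in this section, so the following is only a hedged sketch of the same arithmetic. It assumes box offsets were encoded during training as (ground-truth corner minus crop corner) / crop size, so adding offset * width moves a corner toward the face, and that landmarks are stored interleaved as x0, y0, ..., x4, y4 and normalized as (landX - x1) / w, (landY - y1) / h. refine_boxes and decode_landmarks are hypothetical names; whether the author's bbreg also overwrites the score column is not known, so this sketch leaves any extra columns untouched.

```python
import numpy as np


def refine_boxes(boxes, offsets):
    """Hypothetical sketch of bbreg: apply normalized corner offsets.

    boxes:   (k, >=4) rows starting with x1, y1, x2, y2 (extra columns are copied unchanged)
    offsets: (k, >=4) rows starting with dx1, dy1, dx2, dy2, normalized by box width/height
    """
    refined = boxes.copy()
    w = boxes[:, 2] - boxes[:, 0]
    h = boxes[:, 3] - boxes[:, 1]
    refined[:, 0] = boxes[:, 0] + offsets[:, 0] * w
    refined[:, 1] = boxes[:, 1] + offsets[:, 1] * h
    refined[:, 2] = boxes[:, 2] + offsets[:, 2] * w
    refined[:, 3] = boxes[:, 3] + offsets[:, 3] * h
    return refined


def decode_landmarks(box, land):
    """Invert (landX - x1) / w and (landY - y1) / h for points stored as x0, y0, ..., x4, y4."""
    land = np.asarray(land, dtype=float).copy()
    w = box[2] - box[0]
    h = box[3] - box[1]
    land[0::2] = box[0] + land[0::2] * w  # even indices are x coordinates
    land[1::2] = box[1] + land[1::2] * h  # odd indices are y coordinates
    return land
```

For example, a 20x40 box at (10, 20) with offsets (0.1, -0.05, -0.1, 0.05) becomes (12, 18, 28, 62): each corner moves by its offset times the box width or height.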
  • Original article: https://www.cnblogs.com/megachen/p/10748291.html