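MTCNN (Multi-task CNN)
MTCNN difficulties
- The images provided by datasets such as WIDER FACE are not training samples that MTCNN can consume directly. They have to be converted into an MTCNN-compatible dataset with a few scripts, which can be found on GitHub. To make data loading more efficient, the converted data is stored in TFRecord format.
Building the pos, neg, and part data needed by PNet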
-
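Data sources: WIDER FACE and Cascade
- Bounding box data comes from WIDER FACE. It cannot supply landmarks, because WIDER FACE has no landmark annotations
- Landmark data comes from Cascade. Each Cascade image contains a single face, annotated with the face bounding box and the landmark coordinates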
-
Data generation steps
-
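Variables
- w: width of the ground truth box
- h: height of the ground truth box
- width: width of the image
- height: height of the image
- nx: x coordinate of the generated bounding box
- ny: y coordinate of the generated bounding box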
-
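Generating neg data
- For every WIDER FACE sample, generate a random square box with side (size = npr.randint(12, min(width, height) // 2)). The lower bound is 12 because PNet takes 12x12 input, so every crop must be at least 12 pixels wide. The x and y of the box are chosen so that the box never extends past the image border, which means the later resize is a pure rescale.
- Compute the IoU between the box and every ground truth in the image. The crop counts as neg only if the largest of these IoU values is below 0.3.
- The commonly used script generates 50 neg samples per WIDER FACE image: a while loop runs until the neg count reaches 50, repeats the steps above, and simply continues whenever IoU > 0.3. Only neg samples are collected in this loop; other sample types are ignored.
- A fixed number of neg samples per image is not ideal, so while iterating over the ground truths of the image a few more candidates (about 5 per gt) are generated by shifting a box by an offset relative to that gt. The size is still drawn from the formula above. Unlike the while loop, these candidates are not retried: if the IoU is below 0.3 the crop is added to neg, otherwise it is skipped.
- Save the neg data, including img and label=0.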
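The random negative sampling described above can be sketched with the standard library alone. This is a minimal illustration, not the script the notes refer to: the helper names `iou` and `sample_negatives` and the `max_tries` safety cap are assumptions added here, boxes are `(x1, y1, x2, y2)` tuples, and the image is assumed to be at least 12 pixels on each side.

```python
import random


def iou(box, gt):
    """IoU of two (x1, y1, x2, y2) boxes, using the +1 pixel convention."""
    iw = max(0, min(box[2], gt[2]) - max(box[0], gt[0]) + 1)
    ih = max(0, min(box[3], gt[3]) - max(box[1], gt[1]) + 1)
    inter = iw * ih
    area_box = (box[2] - box[0] + 1) * (box[3] - box[1] + 1)
    area_gt = (gt[2] - gt[0] + 1) * (gt[3] - gt[1] + 1)
    return inter / float(area_box + area_gt - inter)


def sample_negatives(width, height, gts, num_neg=50, max_tries=5000):
    """Collect up to num_neg square crops whose IoU with every gt is below 0.3.

    max_tries is a hypothetical cap so the loop ends even when an image is
    almost entirely covered by faces.
    """
    negs = []
    tries = 0
    while len(negs) < num_neg and tries < max_tries:
        tries += 1
        # Side length between 12 and half of the shorter image side.
        size = random.randint(12, max(12, min(width, height) // 2))
        # Top-left corner chosen so the crop stays inside the image.
        nx = random.randint(0, width - size)
        ny = random.randint(0, height - size)
        crop = (nx, ny, nx + size - 1, ny + size - 1)
        if all(iou(crop, gt) < 0.3 for gt in gts):
            negs.append(crop)
    return negs
```

Each returned crop would then be cut from the image, resized to 12x12, and stored with label 0.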
-
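Generating pos and part data
- pos and part samples are generated inside the same per-image for loop as the neg samples, so they are produced after the neg samples of that image.
- An image can contain several ground truths, so we loop over them. For each gt from WIDER FACE we again generate a box, but with a different size rule: (size = npr.randint(int(min(w, h) * 0.8), int(np.ceil(max(w, h) * 1.25)))). This yields boxes somewhat smaller and somewhat larger than the ground truth.
- The goal is for the box to overlap the ground truth. Compare the box with this gt only: if IoU > 0.65 the crop is pos, otherwise if IoU > 0.4 it is part, otherwise it is ignored. neg samples were already handled earlier and are not produced here.
- Save the pos and part data, including img, label=1, and the bbox offset. The offset is normalized by the crop size: (x1-nx1)/size, (y1-ny1)/size, where (x1, y1) belongs to the ground truth and (nx1, ny1) to the generated box, so that adding offset * size to the box recovers the ground truth (this is what bbreg does at detection time).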
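A small sketch of the pos/part decision and the normalized offset, under stated assumptions: the function name `label_and_offset` is made up for illustration, boxes are `(x1, y1, x2, y2)` tuples with the crop built as `(nx1, ny1, nx1 + size, ny1 + size)`, and the offset is taken as (gt − crop) / size, the sign that the `bbreg` function in mtcnn.py undoes by adding `reg * w` to the box.

```python
def iou(box, gt):
    """IoU of two (x1, y1, x2, y2) boxes, using the +1 pixel convention."""
    iw = max(0, min(box[2], gt[2]) - max(box[0], gt[0]) + 1)
    ih = max(0, min(box[3], gt[3]) - max(box[1], gt[1]) + 1)
    inter = iw * ih
    area_box = (box[2] - box[0] + 1) * (box[3] - box[1] + 1)
    area_gt = (gt[2] - gt[0] + 1) * (gt[3] - gt[1] + 1)
    return inter / float(area_box + area_gt - inter)


def label_and_offset(crop, gt):
    """Return ('pos' | 'part', offset) or None when the overlap is too small."""
    size = float(crop[2] - crop[0])
    overlap = iou(crop, gt)
    if overlap > 0.65:
        label = 'pos'
    elif overlap > 0.4:
        label = 'part'
    else:
        return None  # neither pos nor part; neg samples are generated elsewhere
    # Offset of each gt coordinate from the matching crop coordinate, in crop units.
    offset = tuple((g - c) / size for g, c in zip(gt, crop))
    return label, offset
```

For example, a 100-pixel crop at (10, 10) against a ground truth shifted by 10 pixels in both directions is a pos sample with an offset of about 0.1 in every coordinate.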
-
Generating landmark data
- Use the Cascade dataset, which provides the landmarks. The landmarks must be normalized too, relative to the gt box: (landX-gtX)/w, (landY-gtY)/h
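The landmark normalization can be written out as a short helper pair. This is an illustrative sketch only: `normalize_landmarks` and `denormalize_landmarks` are hypothetical names, landmarks are `(x, y)` pixel pairs, and the gt box is `(x1, y1, x2, y2)` with w and h taken as the plain coordinate differences.

```python
def normalize_landmarks(landmarks, gt_box):
    """Map pixel landmarks to coordinates relative to the gt box."""
    gx, gy = gt_box[0], gt_box[1]
    w = float(gt_box[2] - gt_box[0])
    h = float(gt_box[3] - gt_box[1])
    return [((x - gx) / w, (y - gy) / h) for x, y in landmarks]


def denormalize_landmarks(normalized, box):
    """Inverse mapping, as used when drawing predicted landmarks on a box."""
    w = float(box[2] - box[0])
    h = float(box[3] - box[1])
    return [(box[0] + nx * w, box[1] + ny * h) for nx, ny in normalized]
```

A landmark at the centre of the gt box maps to (0.5, 0.5), and the two functions are inverses of each other for the same box.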
-
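Mixing neg, pos, part, and landmark
- The PNet input contains neg, pos, part, and landmark samples together
- When mixing, keep neg:pos:part:landmark at 3:1:1:1. The neg set can be very large, so fix a count base: if len(neg) > base*3, take only base*3 items from neg (otherwise there are too many), and take base items from each of the other sets. Draw from each of neg, pos, part, and landmark with npr.choice so the selection is shuffled. Then write all selected pos, neg, part, and landmark entries to a .txt label file that is used when reading the data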
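A rough sketch of the 3:1:1:1 mixing step follows. It swaps `npr.choice` for the standard library's `random.sample` so the example has no dependencies, and the function name `mix_samples` is an assumption made for illustration; each input is simply a list of label lines.

```python
import random


def mix_samples(neg, pos, part, landmark, base):
    """Pick at most 3*base neg items and base items of each other kind, shuffled."""
    def take(items, count):
        # Sample without replacement; a set smaller than count is used in full.
        return random.sample(items, min(count, len(items)))

    mixed = (take(neg, 3 * base) + take(pos, base)
             + take(part, base) + take(landmark, base))
    random.shuffle(mixed)
    return mixed
```

The returned list could then be written line by line to the .txt label file.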
-
Training code
- model.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import numpy as np
import tensorflow as tf
import tensorflow.contrib.slim as slim
from lib import util


class Net(object):
    def __init__(self, is_training, learning_rate, num_epochs, im_size):
        self.num_epochs = num_epochs
        self.learning_rate = learning_rate
        self.im_size = im_size
        self.X = None
        self.Y_cls = None
        self.Y_box = None
        self.Y_landmark = None
        self.cls = None
        self.box = None
        self.landmark = None
        self.build()
        if is_training:
            self.loss = self.loss_layer()
            self.global_step = tf.Variable(1, name='global_step', trainable=False)
            self.optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(self.loss, global_step=self.global_step)

    def loss_layer(self):
        # Weighted sum of the three task losses; self.weights is set by each subclass
        cls_loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=self.Y_cls, logits=self.cls)) * self.weights['cls']
        box_loss = tf.reduce_mean(tf.square(self.box - self.Y_box)) * self.weights['box']
        landmark_loss = tf.reduce_mean(tf.square(self.landmark - self.Y_landmark)) * self.weights['landmark']
        return cls_loss + box_loss + landmark_loss

    def build(self):
        pass
class PNet(Net):
    def __init__(self, is_training=True, learning_rate=0.0001, num_epochs=300):
        self.weights = {'cls': 1.0, 'box': 0.5, 'landmark': 0.5}
        super(PNet, self).__init__(is_training, learning_rate, num_epochs, 12)

    def build(self):
        with tf.variable_scope('pnet', reuse=False):
            with tf.name_scope('input'):
                fname_queue = tf.train.string_input_producer(['../tfrecords/pnet_data.tfrecords'], num_epochs=self.num_epochs)
                self.X, self.Y_box, self.Y_cls, self.Y_landmark = util.read_tfrecord(fname_queue, self.im_size, 128)
            with slim.arg_scope([slim.conv2d], activation_fn=util.prelu,
                                weights_initializer=slim.xavier_initializer(),
                                biases_initializer=tf.zeros_initializer(),
                                weights_regularizer=slim.l2_regularizer(0.0005),
                                padding='VALID'):
                net = slim.conv2d(self.X, 10, 3, stride=1, scope='conv_1')
                net = slim.max_pool2d(net, [2, 2], stride=2, scope='pool_1', padding='SAME')
                net = slim.conv2d(net, 16, 3, stride=1, scope='conv_2')
                net = slim.conv2d(net, 32, 3, stride=1, scope='conv_3')
                # PNet is fully convolutional: the three heads are 1x1 convolutions
                cls = slim.conv2d(net, 2, 1, stride=1, scope='cls_fc', activation_fn=None)
                cls = tf.reshape(cls, [-1, 2])
                box = slim.conv2d(net, 4, 1, stride=1, scope='box_fc', activation_fn=None)
                box = tf.reshape(box, [-1, 4])
                landmark = slim.conv2d(net, 10, 1, stride=1, scope='landmark_fc', activation_fn=None)
                landmark = tf.reshape(landmark, [-1, 10])
                self.cls = cls
                self.box = box
                self.landmark = landmark
        util.add_var_to_summary()
class RNet(Net):
    def __init__(self, is_training=True, learning_rate=0.0001, num_epochs=200):
        self.weights = {'cls': 1.0, 'box': 0.5, 'landmark': 0.5}
        super(RNet, self).__init__(is_training, learning_rate, num_epochs, 24)

    def build(self):
        with tf.variable_scope('rnet', reuse=False):
            with tf.name_scope('input'):
                fname_queue = tf.train.string_input_producer(['../tfrecords/rnet_data.tfrecords'], num_epochs=self.num_epochs)
                self.X, self.Y_box, self.Y_cls, self.Y_landmark = util.read_tfrecord(fname_queue, self.im_size, 128)
            with slim.arg_scope([slim.conv2d], activation_fn=util.prelu,
                                weights_initializer=slim.xavier_initializer(),
                                biases_initializer=tf.zeros_initializer(),
                                weights_regularizer=slim.l2_regularizer(0.0005),
                                padding='VALID'):
                net = slim.conv2d(self.X, 28, 3, stride=1, scope='conv_1')
                net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_1', padding='SAME')
                net = slim.conv2d(net, 48, 3, stride=1, scope='conv_2')
                net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_2')
                net = slim.conv2d(net, 64, 2, stride=1, scope='conv_3')
                net = slim.flatten(net)
                net = slim.fully_connected(net, 128, scope='fc_1')
                cls = slim.fully_connected(net, 2, activation_fn=None, scope='cls_fc1')
                box = slim.fully_connected(net, 4, activation_fn=None, scope='box_fc1')
                landmark = slim.fully_connected(net, 10, activation_fn=None, scope='landmark_fc1')
                self.cls = cls
                self.box = box
                self.landmark = landmark
        util.add_var_to_summary()
class ONet(Net):
    def __init__(self, is_training=True, learning_rate=0.0001, num_epochs=100):
        self.weights = {'cls': 1.0, 'box': 0.5, 'landmark': 1.0}
        super(ONet, self).__init__(is_training, learning_rate, num_epochs, 48)

    def build(self):
        # Note: the scope is named 'rnet' here as well. train.py and mtcnn.py look up
        # the ONet tensors under 'rnet/...', so renaming it means updating them too.
        with tf.variable_scope('rnet', reuse=False):
            with tf.name_scope('input'):
                fname_queue = tf.train.string_input_producer(['../tfrecords/onet_data.tfrecords'], num_epochs=self.num_epochs)
                self.X, self.Y_box, self.Y_cls, self.Y_landmark = util.read_tfrecord(fname_queue, self.im_size, 64)
            with slim.arg_scope([slim.conv2d], activation_fn=util.prelu,
                                weights_initializer=slim.xavier_initializer(),
                                biases_initializer=tf.zeros_initializer(),
                                weights_regularizer=slim.l2_regularizer(0.0005),
                                padding='VALID'):
                net = slim.conv2d(self.X, 32, 3, stride=1, scope='conv_1')
                net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_1', padding='SAME')
                net = slim.conv2d(net, 64, 3, stride=1, scope='conv_2')
                net = slim.max_pool2d(net, [3, 3], stride=2, scope='pool_2')
                net = slim.conv2d(net, 64, 3, stride=1, scope='conv_3')
                net = slim.max_pool2d(net, [2, 2], stride=2, scope='pool_3')
                net = slim.conv2d(net, 128, [2, 2], stride=1, scope='conv_4')
                net = slim.flatten(net)
                net = slim.fully_connected(net, 256, scope='fc_1')
                cls = slim.fully_connected(net, 2, activation_fn=None, scope='cls_fc')
                box = slim.fully_connected(net, 4, activation_fn=None, scope='box_fc')
                landmark = slim.fully_connected(net, 10, activation_fn=None, scope='landmark_fc')
                self.cls = cls
                self.box = box
                self.landmark = landmark
        util.add_var_to_summary()


def main(argv=None):
    pnet = PNet(True)


if __name__ == '__main__':
    main()
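To see why the three networks take 12, 24, and 48 pixel inputs, it can help to trace the spatial size through the convolution and pooling layers listed in model.py. The sketch below is plain arithmetic, not TensorFlow code; the helper names are made up here, and it assumes that `slim.max_pool2d` uses 'VALID' padding wherever model.py does not pass `padding='SAME'`.

```python
import math


def out_size(size, kernel, stride, padding):
    """Spatial output size of a conv or pool layer for a square input."""
    if padding == 'SAME':
        return int(math.ceil(size / float(stride)))
    return (size - kernel) // stride + 1  # 'VALID'


def trace(size, layers):
    """Return the list of spatial sizes after each (kernel, stride, padding) layer."""
    sizes = [size]
    for kernel, stride, padding in layers:
        size = out_size(size, kernel, stride, padding)
        sizes.append(size)
    return sizes


# (kernel, stride, padding) per layer, in the order used in model.py
PNET = [(3, 1, 'VALID'), (2, 2, 'SAME'), (3, 1, 'VALID'), (3, 1, 'VALID')]
RNET = [(3, 1, 'VALID'), (3, 2, 'SAME'), (3, 1, 'VALID'), (3, 2, 'VALID'),
        (2, 1, 'VALID')]
ONET = [(3, 1, 'VALID'), (3, 2, 'SAME'), (3, 1, 'VALID'), (3, 2, 'VALID'),
        (3, 1, 'VALID'), (2, 2, 'VALID'), (2, 1, 'VALID')]

print(trace(12, PNET))  # sizes from the 12x12 input down to the 1x1 PNet output
print(trace(24, RNET))  # RNet ends at 3x3 before flatten
print(trace(48, ONET))  # ONet ends at 3x3 before flatten
```

Under these assumptions a 12x12 input collapses to a single PNet output cell, which is why PNet can be slid over a larger image as a fully convolutional detector, while RNet and ONet flatten a 3x3 map into their fully connected layers.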
- util.py (utilities)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import numpy as np
import tensorflow as tf
import tensorflow.contrib.slim as slim
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'


def prelu(X):
    # Parametric ReLU: identity for X > 0, learned per-channel slope alpha for X < 0
    alpha = tf.get_variable('alpha', shape=X.get_shape()[-1], dtype=tf.float32, initializer=tf.constant_initializer(0.25))
    pos = tf.nn.relu(X)
    neg = alpha * (X - abs(X)) * 0.5
    return pos + neg


def read_tfrecord(fname_queue, im_size, batch_size=128):
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(fname_queue)
    features = tf.parse_single_example(serialized_example, features={
        'cls_raw': tf.FixedLenFeature([], tf.string),
        'bbox_raw': tf.FixedLenFeature([], tf.string),
        'landmark_raw': tf.FixedLenFeature([], tf.string),
        'im_raw': tf.FixedLenFeature([], tf.string)
    })
    im = (tf.cast(tf.decode_raw(features['im_raw'], tf.uint8), tf.float32) - 127.5) / 128.0
    im.set_shape([im_size * im_size * 3])
    im = tf.reshape(im, [im_size, im_size, 3])
    cls = tf.decode_raw(features['cls_raw'], tf.float32)
    cls.set_shape([2])
    bbox = tf.decode_raw(features['bbox_raw'], tf.float32)
    bbox.set_shape([4])
    landmark = tf.decode_raw(features['landmark_raw'], tf.float32)
    # The tensor returned by tf.decode_raw has an unknown shape, so set it explicitly with .set_shape
    landmark.set_shape([10])
    clses, bboxes, landmarks, ims = tf.train.shuffle_batch([cls, bbox, landmark, im],
        batch_size=batch_size, capacity=1000 + 3 * batch_size, min_after_dequeue=1000)
    return ims, bboxes, clses, landmarks


def add_var_to_summary():
    for var in slim.get_model_variables():
        tf.summary.histogram(var.op.name, var)


def nms(boxes, threshold):
    # boxes: array of rows (x1, y1, x2, y2, score, ...); returns the indices that are kept
    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]
    score = boxes[:, 4]
    score_sorted = np.argsort(score)
    indice = []
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    while score_sorted.size > 0:
        # Keep the highest-scoring remaining box, then drop everything that overlaps it too much
        i = score_sorted[-1]
        indice.append(i)
        xx1 = np.maximum(x1[i], x1[score_sorted[0:-1]])
        yy1 = np.maximum(y1[i], y1[score_sorted[0:-1]])
        xx2 = np.minimum(x2[i], x2[score_sorted[0:-1]])
        yy2 = np.minimum(y2[i], y2[score_sorted[0:-1]])
        w = np.maximum(0.0, (xx2 - xx1 + 1))
        h = np.maximum(0.0, (yy2 - yy1 + 1))
        inner = w * h
        IoU = inner / (areas[i] + areas[score_sorted[0:-1]] - inner)
        score_sorted = score_sorted[np.where(IoU <= threshold)]
    return np.asarray(indice)


def main(argv=None):
    pass


if __name__ == '__main__':
    main()
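The greedy procedure inside `nms` is easier to follow on a tiny hand-made input. The version below is a dependency-free restatement for illustration, not a replacement for the NumPy function; the name `nms_py` is hypothetical, and boxes are `(x1, y1, x2, y2, score)` tuples with the same +1 pixel convention.

```python
def iou(a, b):
    """IoU of two boxes whose first four entries are x1, y1, x2, y2."""
    iw = max(0.0, min(a[2], b[2]) - max(a[0], b[0]) + 1)
    ih = max(0.0, min(a[3], b[3]) - max(a[1], b[1]) + 1)
    inter = iw * ih
    area_a = (a[2] - a[0] + 1) * (a[3] - a[1] + 1)
    area_b = (b[2] - b[0] + 1) * (b[3] - b[1] + 1)
    return inter / float(area_a + area_b - inter)


def nms_py(boxes, threshold):
    """Return indices of kept boxes, highest score first."""
    # Ascending by score, mirroring np.argsort in util.nms.
    order = sorted(range(len(boxes)), key=lambda i: boxes[i][4])
    keep = []
    while order:
        best = order.pop()  # highest remaining score
        keep.append(best)
        # Only boxes that overlap the kept box by at most threshold survive.
        order = [j for j in order if iou(boxes[best], boxes[j]) <= threshold]
    return keep


boxes = [(0, 0, 10, 10, 0.9), (1, 1, 11, 11, 0.8), (50, 50, 60, 60, 0.7)]
print(nms_py(boxes, 0.5))
```

With a threshold of 0.5 the second box is suppressed by the first, because their IoU is about 0.70, while the distant third box is kept.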
- train.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import time
sys.path.append((os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
import numpy as np
import tensorflow as tf
import tensorflow.contrib.slim as slim
from tensorflow.python.framework import graph_util
from core import model
from core.model import PNet, RNet, ONet
from lib import util

config = tf.ConfigProto()
config.allow_soft_placement = True
config.gpu_options.allow_growth = True
# Output node names to freeze for each network (ONet is built under the 'rnet' scope in model.py)
tmp_dict = {'pnet': ['pnet/cls_fc/BiasAdd', 'pnet/box_fc/BiasAdd', 'pnet/landmark_fc/BiasAdd'],
            'rnet': ['rnet/cls_fc1/BiasAdd', 'rnet/box_fc1/BiasAdd', 'rnet/landmark_fc1/BiasAdd'],
            'onet': ['rnet/cls_fc/BiasAdd', 'rnet/box_fc/BiasAdd', 'rnet/landmark_fc/BiasAdd']}


def train(net, weight_dir, net_name):
    # net_name ('pnet', 'rnet' or 'onet') selects the output nodes of the network being
    # trained; freezing with another network's node names fails because they are not in the graph
    saver = tf.train.Saver()
    with tf.Session(config=config) as sess:
        tf.summary.scalar('Loss', net.loss)
        merged = tf.summary.merge_all()
        writer = tf.summary.FileWriter('../logger', sess.graph)
        init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
        sess.run(init_op)
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        ckpt = tf.train.get_checkpoint_state(weight_dir)
        if ckpt and ckpt.model_checkpoint_path:
            saver.restore(sess, ckpt.model_checkpoint_path)
            time.sleep(1)
            output_graph_def = graph_util.convert_variables_to_constants(sess, sess.graph_def, tmp_dict[net_name])
            with tf.gfile.GFile(os.path.join(weight_dir, 'model.pb'), 'wb') as fd:
                fd.write(output_graph_def.SerializeToString())
            print('Restore from ' + os.path.join(weight_dir, 'model.ckpt'))
        global_step = 0
        try:
            while not coord.should_stop():
                _, loss, global_step, result = sess.run([net.optimizer, net.loss, net.global_step, merged])
                if global_step % 100 == 0:
                    writer.add_summary(result, global_step)
                    print('Step %s loss: %s' % (global_step, loss))
                if global_step % 1000 == 0:
                    saver.save(sess, os.path.join(weight_dir, 'model.ckpt'), global_step=global_step)
                    print('Save model')
                if global_step % 2000 == 0:
                    output_graph_def = graph_util.convert_variables_to_constants(sess, sess.graph_def, tmp_dict[net_name])
                    with tf.gfile.GFile(os.path.join(weight_dir, 'model.pb'), 'wb') as fd:
                        fd.write(output_graph_def.SerializeToString())
        except tf.errors.OutOfRangeError as e:
            # Raised by the input queue once num_epochs passes over the data are done
            print('Epochs %s, step %s' % (net.num_epochs, global_step))
        finally:
            coord.request_stop()
        coord.join(threads)


def main(argv=None):
    train(PNet(is_training=True, learning_rate=0.0001, num_epochs=300), '../weights/pnet', 'pnet')
    # train(RNet(is_training=True, learning_rate=0.0001, num_epochs=200), '../weights/rnet', 'rnet')
    # train(ONet(is_training=True, learning_rate=0.0001, num_epochs=100), '../weights/onet', 'onet')
    # print('Not train')


if __name__ == '__main__':
    main()
Dataset construction code
- builddata.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import cv2
import numpy as np
import tensorflow as tf
from lib import processbar
from lib.processbar import ProcessBar

data_dir = '../dataset'
save_dir = '../tfrecords'


def bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def write_data(fname, lines, im_size):
    writer = tf.python_io.TFRecordWriter(fname)
    bar = ProcessBar('Reading ' + fname.split('/')[-1], len(lines))
    examples = []
    for line in lines:
        parts = line.split()
        # The list file uses Windows-style path separators
        im_path = os.path.join(data_dir, parts[0].replace('\\', '/'))
        im = cv2.imread(im_path)
        height, width, _ = im.shape
        im = im.astype('uint8')
        # The dataset gives the bbox as (x1, x2, y1, y2); we store it as (x1, y1, x2, y2)
        bbox = np.array([float(parts[1]), float(parts[3]), float(parts[2]), float(parts[4])], dtype='float32')
        bbox_norm = np.array([bbox[0] / width, bbox[1] / height, bbox[2] / width, bbox[3] / height], dtype='float32')
        landmark = np.array([float(parts[5]), float(parts[6]),
                             float(parts[7]), float(parts[8]),
                             float(parts[9]), float(parts[10]),
                             float(parts[11]), float(parts[12]),
                             float(parts[13]), float(parts[14])], dtype='float32')
        landmark_norm = np.array([landmark[0] / width, landmark[1] / height,
                                  landmark[2] / width, landmark[3] / height,
                                  landmark[4] / width, landmark[5] / height,
                                  landmark[6] / width, landmark[7] / height,
                                  landmark[8] / width, landmark[9] / height], dtype='float32')
        # Positive sample: the face crop itself
        im_crop = cv2.resize(im[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2]), :], (im_size, im_size))
        bbox_pos = np.array([0.0, 0.0, 1.0, 1.0], dtype='float32')
        # Landmarks relative to the bbox: x values by bbox width, y values by bbox height
        landmark_offset_pos = landmark_norm.copy()
        landmark_offset_pos[[0, 2, 4, 6, 8]] = (landmark_offset_pos[[0, 2, 4, 6, 8]] - bbox_norm[0]) / (bbox_norm[2] - bbox_norm[0])
        landmark_offset_pos[[1, 3, 5, 7, 9]] = (landmark_offset_pos[[1, 3, 5, 7, 9]] - bbox_norm[1]) / (bbox_norm[3] - bbox_norm[1])
        example_pos = tf.train.Example(features=tf.train.Features(feature={
            'cls_raw': bytes_feature(np.array([0, 1], dtype='float32').tostring()),
            'bbox_raw': bytes_feature(bbox_pos.tostring()),
            'landmark_raw': bytes_feature(landmark_offset_pos.tostring()),
            'im_raw': bytes_feature(im_crop.tostring()),
        }))
        examples.append(example_pos)
        # Negative samples: the top-left and bottom-right corners of the image
        # Top-left corner
        border = [0.0, 0.0, bbox[0], bbox[1]]
        im_crop = cv2.resize(im[int(border[1]):int(border[3]), int(border[0]):int(border[2]), :], (im_size, im_size))
        bbox_neg = np.array([0.0, 0.0, 1.0, 1.0], dtype='float32')
        # All ten landmark values are zero for a negative sample
        # (the original zeroed only the first eight and left the last point untouched)
        landmark_offset_neg = np.zeros(10, dtype='float32')
        example_neg = tf.train.Example(features=tf.train.Features(feature={
            'cls_raw': bytes_feature(np.array([1, 0], dtype='float32').tostring()),
            'bbox_raw': bytes_feature(bbox_neg.tostring()),
            'landmark_raw': bytes_feature(landmark_offset_neg.tostring()),
            'im_raw': bytes_feature(im_crop.tostring())
        }))
        examples.append(example_neg)
        # Bottom-right corner
        border = [bbox[2], bbox[3], width, height]
        im_crop = cv2.resize(im[int(border[1]):int(border[3]), int(border[0]):int(border[2]), :], (im_size, im_size))
        example_neg = tf.train.Example(features=tf.train.Features(feature={
            'cls_raw': bytes_feature(np.array([1, 0], dtype='float32').tostring()),
            'bbox_raw': bytes_feature(bbox_neg.tostring()),
            'landmark_raw': bytes_feature(landmark_offset_neg.tostring()),
            'im_raw': bytes_feature(im_crop.tostring())
        }))
        examples.append(example_neg)
        bar.process()
    bar.done()
    bar = ProcessBar('Writing ' + fname.split('/')[-1], len(examples))
    for e in examples:
        writer.write(e.SerializeToString())
        bar.process()
    bar.done()
    writer.close()
"""
def write_data(tfFileName, trainImagesDataDesc, im_size):
writer = tf.python_io.TFRecordWriter(tfFileName)
examples = []
bar = ProcessBar(tfFileName, len(trainImagesDataDesc))
for line in trainImagesDataDesc:
bar.process()
descs = line.split()
filePath = descs[0]
filePath = filePath.replace('\', '/')
image_file_path = os.path.join(data_dir, filePath)
im = cv2.imread(image_file_path)
height, width, _ = im.shape
im = im.astype('uint8')
# bb = [float(descs[3]) / h,float(descs[1]) / w, float(descs[4]) / h, float(descs[2]) / w]
bb = np.array([float(descs[1]),float(descs[3]), float(descs[2]), float(descs[4])], dtype='float32')
bb_norm = np.array([bb[0] / width, bb[1] / height, bb[2] / width, bb[3] / height], dtype='float32')
landmark = np.array([float(descs[5]),
float(descs[6]),
float(descs[7]),
float(descs[8]),
float(descs[9]),
float(descs[10]),
float(descs[11]),
float(descs[12]),
float(descs[13]),
float(descs[14])], dtype='float32')
landmark_norm = np.array([float(descs[5])/ width,
float(descs[6])/ height,
float(descs[7])/ width,
float(descs[8])/ height,
float(descs[9])/ width,
float(descs[10])/ height,
float(descs[11])/ width,
float(descs[12])/ height,
float(descs[13])/ width,
float(descs[14])/ height], dtype='float32')
bb_pos = bb_norm.copy()
bb_pos[0] = 0.0
bb_pos[1] = 0.0
bb_pos[2] = 1.0
bb_pos[3] = 1.0
im_100 = im[int(bb[1]):int(bb[3]), int(bb[0]):int(bb[2]),:]
im_100 = cv2.resize(im_100,(im_size,im_size))
landmark_pos = landmark_norm.copy()
landmark_pos[[0,2,4,6,8]] = (landmark_pos[[0,2,4,6,8]] - bb_norm[0])/ (bb_norm[2] -bb_norm[0])
landmark_pos[[1,3,5,7,9]] = (landmark_pos[[1,3,5,7,9]] - bb_norm[1]) / (bb_norm[3] -bb_norm[1])
example = tf.train.Example(features = tf.train.Features(feature={
'cls_raw':bytes_feature(np.array([0, 1], dtype='float32').tostring()), # face or not
'bb_raw':bytes_feature(bb_pos.tostring()), # box
'landmark_raw':bytes_feature(landmark_pos.tostring()), # landmarks
'image_raw':bytes_feature(im_100.tostring()) # image data
}))
examples.append(example)
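# Then build 2 negative samples
# They are the top-left and bottom-right corners, which contain no face, so the landmarks are all 0 and the bbox is 0 as well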
classification = np.array([1,0], dtype='float32')
cls_raw = classification.tostring()
bb_neg = bb_norm.copy()
landmark_neg = landmark_norm.copy()
bb_neg[0] = 0.0
bb_neg[1] = 0.0
bb_neg[2] = 0.0
bb_neg[3] = 0.0
landmark_neg[0] = 0.0
landmark_neg[1] = 0.0
landmark_neg[2] = 0.0
landmark_neg[3] = 0.0
landmark_neg[4] = 0.0
landmark_neg[5] = 0.0
landmark_neg[6] = 0.0
landmark_neg[7] = 0.0
# Top-left corner
x1_ = 0
y1_ = 0
x2_ = int(bb[0])
y2_ = int(bb[1])
im_crop = im[y1_:y2_, x1_:x2_, :]
im_ = cv2.resize(im_crop,(im_size,im_size))
example = tf.train.Example(features = tf.train.Features(feature={
'cls_raw':bytes_feature(np.array([1, 0], dtype='float32').tostring()),
'bb_raw':bytes_feature(bb_neg.tostring()),
'landmark_raw':bytes_feature(landmark_neg.tostring()),
'image_raw':bytes_feature(im_.tostring())
}))
examples.append(example)
# Bottom-right corner
x1_ = int(bb[2])
y1_ = int(bb[3])
x2_ = width
y2_ = height
im_crop = im[y1_:y2_, x1_:x2_]
im_ = cv2.resize(im_crop,(im_size,im_size))
example = tf.train.Example(features = tf.train.Features(feature={
'cls_raw':bytes_feature(np.array([1, 0], dtype='float32').tostring()),
'bb_raw':bytes_feature(bb_neg.tostring()),
'landmark_raw':bytes_feature(landmark_neg.tostring()),
'image_raw':bytes_feature(im_.tostring())
}))
examples.append(example)
bar.done()
bar = ProcessBar(tfFileName, len(examples))
for example in examples:
writer.write(example.SerializeToString())
bar.process()
bar.done()
writer.close()
"""
def main(argv=None):
    with open(os.path.join(data_dir, 'trainImageList.txt'), 'r') as fd:
        lines = fd.readlines()
    write_data('../tfrecords/pnet_data.tfrecords', lines, 12)
    write_data('../tfrecords/rnet_data.tfrecords', lines, 24)
    write_data('../tfrecords/onet_data.tfrecords', lines, 48)
    print('Finish writing data')


if __name__ == '__main__':
    main()
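The annotation parsing inside `write_data` can be isolated into a small function, which makes the (x1, x2, y1, y2) to (x1, y1, x2, y2) reordering easier to check. This is a sketch under assumptions: `parse_annotation` is a hypothetical helper, and the sample line is invented for illustration rather than copied from trainImageList.txt.

```python
def parse_annotation(line):
    """Split one list-file line into (path, bbox, landmarks).

    The line holds: image path, bbox as x1 x2 y1 y2, then five x y landmark pairs.
    """
    parts = line.split()
    path = parts[0].replace('\\', '/')  # Windows separators to POSIX
    x1, x2, y1, y2 = (float(v) for v in parts[1:5])
    coords = [float(v) for v in parts[5:15]]
    landmarks = list(zip(coords[0::2], coords[1::2]))
    return path, (x1, y1, x2, y2), landmarks


# Made-up example line, for illustration only
sample = 'faces\\img_0001.jpg 84 161 92 169 106 107 146 112 125 142 105 157 139 161'
path, bbox, landmarks = parse_annotation(sample)
print(path, bbox, landmarks[0])
```

Returning the landmarks as pairs rather than a flat list keeps the x and y values from being mixed up during normalization.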
Detection code
- mtcnn.py
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import cv2
import numpy as np
import tensorflow as tf
from lib import util


def bbreg(boundingbox, reg):
    # Refine each box by adding the regression output scaled by the box size
    bb = boundingbox.copy()
    w = boundingbox[:, 2] - boundingbox[:, 0] + 1
    h = boundingbox[:, 3] - boundingbox[:, 1] + 1
    bb[:, 0] = boundingbox[:, 0] + reg[:, 0] * w
    bb[:, 1] = boundingbox[:, 1] + reg[:, 1] * h
    bb[:, 2] = boundingbox[:, 0] + reg[:, 2] * w
    bb[:, 3] = boundingbox[:, 1] + reg[:, 3] * h
    return bb


def gen_box(imap, reg, scale, t):
    # Turn PNet score-map cells above threshold t into boxes in original image coordinates
    stride = 2
    cellsize = 12
    imap = np.transpose(imap)
    dx1 = np.transpose(reg[:, :, 0])
    dy1 = np.transpose(reg[:, :, 1])
    dx2 = np.transpose(reg[:, :, 2])
    dy2 = np.transpose(reg[:, :, 3])
    y, x = np.where(imap >= t)
    if y.shape[0] == 1:
        dx1 = np.flipud(dx1)
        dy1 = np.flipud(dy1)
        dx2 = np.flipud(dx2)
        dy2 = np.flipud(dy2)
    score = imap[(y, x)]
    reg = np.transpose(np.vstack([dx1[(y, x)], dy1[(y, x)],
                                  dx2[(y, x)], dy2[(y, x)]]))
    if reg.size == 0:
        reg = np.empty((0, 3))
    bb = np.transpose(np.vstack([y, x]))
    q1 = np.fix((stride * bb + 1) / scale)
    q2 = np.fix((stride * bb + cellsize) / scale)
    boundingbox = np.hstack([q1, q2, np.expand_dims(score, 1), reg])
    return boundingbox, reg
def detect():
    im = cv2.imread('./test.jpg')
    im = im.astype('uint8')
    # im.shape is (rows, cols, channels), so w holds the image height and h the width here
    w, h, ch = im.shape
    with tf.Session() as sess:
        output_graph_def = tf.GraphDef()
        with open('../weights/pnet/model.pb', 'rb') as fd:
            output_graph_def.ParseFromString(fd.read())
            tf.import_graph_def(output_graph_def, name='')
        pyramid = []
        ww, hh, ch = im.shape
        minlength = min(ww, hh)
        impyramid = im.copy()
        minsize = 20
        m = 12 / 20
        m = 1
        scales = []
        factor_counter = 0
        factor = 0.709
        ml = minlength
        # Build the image pyramid: shrink by factor until the shorter side drops below 12
        while ml >= 12:
            scalefactor = m * np.power(factor, factor_counter)
            scales.append(scalefactor)
            ww = w * scalefactor
            hh = h * scalefactor
            # cv2.resize takes (width, height)
            impyramid = cv2.resize(im, (int(hh), int(ww)))
            pyramid.append((impyramid.copy() - 127.5) / 128.0)
            factor_counter += 1
            scalefactor = m * np.power(factor, factor_counter)
            ml = minlength * scalefactor
        boxes_all = np.empty((0, 9))
        print("------------pnet-----------------")
        for j in range(len(scales)):
            scale = scales[j]
            py = pyramid[j]
            feed_value = {'pnet/input/shuffle_batch:3': [py]}
            _cls, _bb, _landmark = sess.run(['pnet/cls_fc/BiasAdd:0', 'pnet/box_fc/BiasAdd:0', 'pnet/landmark_fc/BiasAdd:0'], feed_value)
            softmax_ = np.exp(_cls[0, :, :, 1]) / (np.exp(_cls[0, :, :, 0]) + np.exp(_cls[0, :, :, 1]))
            # gen_box maps the boxes back to their positions in the original input image
            boxes, _ = gen_box(softmax_,  # _cls[0, :, :, 1],
                               _bb[0, :, :, :],
                               scale,
                               0.8)
            if len(boxes) > 0:
                pick = util.nms(boxes.copy(), 0.5)
                boxes_all = np.append(boxes_all, boxes[pick, :], axis=0)
    print("boxes picked after pnet: " + str(boxes_all.shape))
    numbox = boxes_all.shape[0]
    if numbox > 0:
        pick = util.nms(boxes_all.copy(), 0.7)
        boxes_all = boxes_all[pick, :]
        regw = boxes_all[:, 2] - boxes_all[:, 0]
        regh = boxes_all[:, 3] - boxes_all[:, 1]
        # Fine-tune the boxes with their regression values
        qq1 = boxes_all[:, 0] + boxes_all[:, 5] * regw
        qq2 = boxes_all[:, 1] + boxes_all[:, 6] * regh
        qq3 = boxes_all[:, 0] + boxes_all[:, 7] * regw
        qq4 = boxes_all[:, 1] + boxes_all[:, 8] * regh
        boxes_all = np.transpose(np.vstack([qq1, qq2, qq3, qq4,
                                            boxes_all[:, 4]]))
    im_bbr = im.copy()
    r = 0
    for box in boxes_all:
        r += 1
        cv2.rectangle(im_bbr,
                      (int(box[0]), int(box[1])),
                      (int(box[2]), int(box[3])),
                      (0, 0, 255), 1)
    print("------------saving p image-----------------")
    cv2.imwrite("./test_bbr_p.jpg", im_bbr)
    tf.reset_default_graph()
    with tf.Graph().as_default() as graph:
        with open('../weights/rnet/model.pb', 'rb') as fd:
            output_graph_def = tf.GraphDef()
            output_graph_def.ParseFromString(fd.read())
            tf.import_graph_def(output_graph_def, name='')
        with tf.Session() as sess:
            batch_size = 128
            rnet_images = np.zeros([batch_size, 24, 24, 3])
            v = 0
            for box in boxes_all:
                if v >= batch_size:
                    break
                x1 = int(box[0])
                y1 = int(box[1])
                x2 = int(box[2])
                y2 = int(box[3])
                rnet_images[v, :, :, :] = cv2.resize(im[y1:y2, x1:x2, :], (24, 24))
                v += 1
            rnet_images = (rnet_images - 127.5) / 128.0
            print("------------rnet-----------------")
            feed_value = {'rnet/input/shuffle_batch:3': rnet_images}
            cls, bb, landmark = sess.run(['rnet/cls_fc1/BiasAdd:0', 'rnet/box_fc1/BiasAdd:0', 'rnet/landmark_fc1/BiasAdd:0'], feed_value)
            prob = np.exp(cls[:, 1]) / (np.exp(cls[:, 0]) + np.exp(cls[:, 1]))
            # Only the first v rows hold real crops; the rest of the batch is zero padding
            prob = prob[:v]
            idx = np.where(prob > 0.8)
            total_boxes = boxes_all[idx, :][0]
            ipass_box = np.zeros([len(idx[0]), 5])
            for j in range(len(idx[0])):
                ip = idx[0][j]
                ipass_box[j][0:4] = bb[ip][0:4]
                ipass_box[j][4] = prob[ip]
            pick = util.nms(total_boxes.copy(), 0.7)
            # total_boxes holds the input bb coordinates;
            # adjust them once more with the bb regression output
            total_boxes = bbreg(total_boxes[pick, :].copy(), ipass_box[pick, :])
            im_bbr = im.copy()
            r = 0
            for box in total_boxes:
                # print("box")
                # print(box)
                im_bbr = cv2.rectangle(im_bbr,
                                       (int(box[0]), int(box[1])),
                                       (int(box[2]), int(box[3])),
                                       (0, 255, 0), 1)
                r += 1
            print("------------saving rnet image-----------------")
            cv2.imwrite("./test_bbr_r" + ".jpg", im_bbr)
    tf.reset_default_graph()
    with tf.Graph().as_default() as graph:
        with open('../weights/onet/model.pb', "rb") as f:
            output_graph_def = tf.GraphDef()
            output_graph_def.ParseFromString(f.read())
            _ = tf.import_graph_def(output_graph_def, name="")
        with tf.Session() as sess:
            batch = 64
            onet_images = np.zeros((batch, 48, 48, 3))
            v = 0
            for box in total_boxes:
                if v >= batch:
                    break
                x1 = int(box[0])
                y1 = int(box[1])
                x2 = int(box[2])
                y2 = int(box[3])
                onet_images[v, :, :, :] = cv2.resize(im[y1:y2, x1:x2, :], (48, 48))
                v += 1
            onet_images = (onet_images - 127.5) / 128.0
            # ONet was built under the 'rnet' scope in model.py, hence the tensor names
            feed_value = {'rnet/input/shuffle_batch:3': onet_images}
            print("------------onet-----------------")
            cls, bb, landmark = sess.run(['rnet/cls_fc/BiasAdd:0', 'rnet/box_fc/BiasAdd:0', 'rnet/landmark_fc/BiasAdd:0'], feed_value)
            prob = np.exp(cls[:, 1]) / (np.exp(cls[:, 0]) + np.exp(cls[:, 1]))
            # Only the first v rows hold real crops; the rest of the batch is zero padding
            prob = prob[:v]
            idx = np.where(prob > 0.95)
            total_boxes = total_boxes[idx, :][0]
            bb = bb[idx, :][0]
            landmark = landmark[idx, :][0]
            pick = util.nms(total_boxes.copy(), 0.7)
            # Use ONet's own regression output (the original passed RNet's ipass_box here)
            # and keep the landmarks aligned with the boxes that survive NMS
            total_boxes = bbreg(total_boxes[pick, :].copy(), bb[pick, :])
            landmark = landmark[pick, :]
            v = 0
            im_bbr_o = im.copy()
            for box in total_boxes:
                im_bbr_o = cv2.rectangle(im_bbr_o,
                                         (int(box[0]), int(box[1])),
                                         (int(box[2]), int(box[3])),
                                         (0, 0, 255), 1)
                # Map the normalized landmarks back to image coordinates
                land = landmark[v]
                land[[0, 2, 4, 6, 8]] = box[0] + land[[0, 2, 4, 6, 8]] * (box[2] - box[0])
                land[[1, 3, 5, 7, 9]] = box[1] + land[[1, 3, 5, 7, 9]] * (box[3] - box[1])
                # print(land)
                for i in range(5):
                    im_bbr_o = cv2.rectangle(im_bbr_o,
                                             (int(land[0 + i * 2]), int(land[1 + i * 2])),
                                             (int(land[0 + i * 2] + 2), int(land[1 + i * 2] + 2)),
                                             (0, 0, 0), 1)
                v += 1
            cv2.imwrite("./test_bbr_o" + ".jpg", im_bbr_o)


if __name__ == '__main__':
    detect()
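The image pyramid loop at the start of `detect` decides how many times PNet is run. The sketch below reproduces only the scale computation, without OpenCV or TensorFlow; `pyramid_scales` is a name chosen here for illustration, and it assumes m = 1 as in the code above.

```python
def pyramid_scales(width, height, min_side=12, factor=0.709, m=1.0):
    """Scales at which the shorter image side is still at least min_side pixels."""
    scales = []
    min_length = min(width, height)
    k = 0
    # Keep shrinking by factor until the shorter side would fall below min_side.
    while min_length * m * factor ** k >= min_side:
        scales.append(m * factor ** k)
        k += 1
    return scales


scales = pyramid_scales(100, 200)
print(len(scales), scales[:3])
```

For a 100x200 image this gives seven pyramid levels, since 100 * 0.709^6 is about 12.7 while the next level would be about 9.0. An image whose shorter side is already below 12 pixels produces no levels, so PNet is never run on it.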