  • Face Detection Based on the MTCNN Algorithm

    Paper: "Joint Face Detection and Alignment using Multi-task Cascaded Convolutional Networks"

    Paper URL: https://arxiv.org/abs/1604.02878v1

    1. Overall Framework

    MTCNN performs face detection and facial landmark localization with a cascade of convolutional neural networks. The overall framework is as follows:

    Figure 1. Pipeline

    Figure 1 shows the overall MTCNN framework (the detection pipeline, i.e., the flow at test time).

    Given an image, it is first resized to several different sizes to build an image pyramid. These differently sized images are the inputs to the three stages below.

    Stage 1: A fully convolutional network (P-Net) first produces candidate windows together with their bounding box regression vectors. The estimated regression vectors are used to calibrate the candidates, and non-maximum suppression (NMS) then merges highly overlapped candidates.

    Stage 2: This stage uses a refinement network (Refine Network, R-Net). All candidates from stage 1 are fed into R-Net, which rejects a large number of false candidates, again applying bounding box regression and NMS.

    Stage 3: This stage uses an output network (O-Net) and is similar to stage 2, but the goal here is to describe the face in more detail. In particular, the network outputs the positions of the five facial landmarks.
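
    To make the pyramid step concrete, the following is a minimal sketch of how the scale levels can be generated (the 0.709 scale factor and the 12-pixel lower bound are common MTCNN conventions; build_image_pyramid and min_face_size are illustrative names, not something defined in the paper):

    import cv2

    def build_image_pyramid(img, min_face_size=20, factor=0.709):
        """Return resized copies of img at scales that let P-Net's 12x12
        window cover faces from min_face_size up to the image size."""
        h, w = img.shape[:2]
        # The initial scale maps a min_face_size face onto P-Net's 12x12 input.
        scale = 12.0 / min_face_size
        min_side = min(h, w) * scale
        pyramid = []
        while min_side >= 12:
            resized = cv2.resize(img, (int(w * scale), int(h * scale)))
            pyramid.append((scale, resized))
            scale *= factor      # shrink by a constant factor at each level
            min_side *= factor
        return pyramid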

    2. CNN Architecture

    Many papers have designed CNNs for face detection, but their performance may be limited by the following factors:

    1) Some filters lack diversity in their weights, which limits their ability to produce discriminative descriptions.

    2) Compared with multi-class object detection and classification tasks, face detection is a challenging binary classification task, so it may need fewer filters, but these filters must be more discriminative for faces. To this end, the number of filters is reduced and the 5×5 filters are replaced with 3×3 filters to cut computation, while the depth is increased to obtain better performance. With these improvements, better performance is achieved with less runtime. The CNN architectures are shown in Figure 2.

    Figure 2. The CNN architectures (MP = max pooling, Conv = convolution; the step sizes of convolution and pooling are 1 and 2, respectively)

    3. Training

    The CNN detectors are trained on three tasks: face/non-face classification, bounding box regression, and facial landmark localization.

    1) Face classification

    The learning objective is formulated as a binary classification problem. For each sample $x_{i}$, the cross-entropy loss is used:

    $L_{i}^{det}=-\left(y_{i}^{det}\log(p_{i})+(1-y_{i}^{det})\log(1-p_{i})\right)$    (1)

    where $p_{i}$ is the probability, produced by the network, that sample $x_{i}$ is a face, and $y_{i}^{det}\in \{0,1\}$ denotes the ground-truth label.
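
    As a quick numeric check of Eq. (1), here is a minimal NumPy sketch (illustrative only, not part of the project code):

    import numpy as np

    def cross_entropy(p, y, eps=1e-10):
        """Per-sample binary cross-entropy, Eq. (1)."""
        return -(y * np.log(p + eps) + (1 - y) * np.log(1 - p + eps))

    p = np.array([0.9, 0.2, 0.7])   # predicted face probabilities
    y = np.array([1.0, 0.0, 1.0])   # ground-truth labels
    print(cross_entropy(p, y))      # confident correct predictions give small losses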

    2) Bounding box regression

    For each candidate window, we predict its offset to the nearest ground-truth box (the left-top coordinates, height, and width). The learning objective is formulated as a regression problem, and for each sample $x_{i}$ the Euclidean loss is used:

    $L_{i}^{box}=\left \| \hat{y}_{i}^{box}-y_{i}^{box} \right \|_{2}^{2}$    (2)

    where $\hat{y}_{i}^{box}$ is the regression target obtained from the network (i.e., the network output) and $y_{i}^{box}$ is the ground-truth coordinate. There are four coordinates (the left-top x and y, height, and width), so $y_{i}^{box}\in \mathbb{R}^{4}$.
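
    A quick numeric example of the regression target, using the normalized corner-offset convention that the data-generation script below also uses (the paper itself describes the offsets via the left-top corner, height, and width):

    # Crop window (nx1, ny1, nx2, ny2) and the nearest ground-truth box (x1, y1, x2, y2):
    nx1, ny1, nx2, ny2 = 50, 60, 100, 110   # a 50x50 crop
    x1, y1, x2, y2 = 55, 58, 105, 108       # ground-truth face box

    size = nx2 - nx1                        # crop side length: 50
    target = ((x1-nx1)/size, (y1-ny1)/size,
              (x2-nx2)/size, (y2-ny2)/size)
    print(target)                           # (0.1, -0.04, 0.1, -0.04)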

    3) Facial landmark localization

    Similar to the bounding box regression task, facial landmark localization is formulated as a regression problem, minimizing the Euclidean loss:

    $L_{i}^{landmark}=\left \| \hat{y}_{i}^{landmark}-y_{i}^{landmark} \right \|_{2}^{2}$   (3)

    where $\hat{y}_{i}^{landmark}$ is the landmark coordinate obtained from the network output and $y_{i}^{landmark}$ is the ground truth. There are five facial landmarks (left eye, right eye, nose, left mouth corner, and right mouth corner), so $y_{i}^{landmark}\in \mathbb{R}^{10}$.

    4) Multi-source training

    Since each CNN performs different tasks, different types of training images are used in the training process (faces, non-faces, and partially aligned faces), so for a given sample some of the loss functions in Eqs. (1)-(3) are not used. For example, for a background region only $L_{i}^{det}$ is computed and the other two losses are set to 0; this is implemented with a sample-type indicator. The overall learning target can be formulated as:

    $\min \sum_{i=1}^{N}\sum_{j\in \{det,box,landmark\}}\alpha_{j}\beta_{i}^{j}L_{i}^{j}$    (4)

    where $N$ is the number of training samples and $\alpha_{j}$ denotes the importance of each task (in P-Net and R-Net, $\alpha_{det}=1,\alpha_{box}=0.5,\alpha_{landmark}=0.5$; in O-Net, $\alpha_{det}=1,\alpha_{box}=0.5,\alpha_{landmark}=1$ so as to obtain more accurate landmark localization). $\beta_{i}^{j}\in \{0,1\}$ is the sample-type indicator. The CNNs are trained with stochastic gradient descent (SGD).
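
    The following minimal NumPy sketch (with illustrative names) shows how the sample-type indicators zero out the losses that do not apply to a sample; the project code later implements the same idea with the label values 1/0/-1/-2:

    import numpy as np

    # Per-sample losses for one mini-batch of 4 samples (pos, neg, part, landmark):
    L_det      = np.array([0.2, 1.1, 0.4, 0.9])
    L_box      = np.array([0.05, 0.0, 0.07, 0.0])
    L_landmark = np.array([0.0, 0.0, 0.0, 0.3])

    # Indicator beta per task: which sample types contribute to which loss.
    beta_det      = np.array([1, 1, 0, 0])  # pos/neg -> classification
    beta_box      = np.array([1, 0, 1, 0])  # pos/part -> box regression
    beta_landmark = np.array([0, 0, 0, 1])  # landmark samples only

    alpha = {'det': 1.0, 'box': 0.5, 'landmark': 0.5}  # P-Net / R-Net weights
    total = (alpha['det'] * beta_det * L_det
             + alpha['box'] * beta_box * L_box
             + alpha['landmark'] * beta_landmark * L_landmark).sum()
    print(total)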

    5) Online hard sample mining

    Unlike traditional hard sample mining, which is performed after the original classifier has been trained, online hard sample mining is carried out during training so that it adapts to the training process.

    Specifically, in each mini-batch the losses computed in the forward pass are sorted, and only the top 70% are kept as hard samples. In the backward pass, only the gradients of these hard samples are computed; that is, the easy samples, which contribute little to strengthening the detector during training, are ignored.
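
    A minimal sketch of this selection in NumPy (the 0.7 ratio matches the paper; the function name is illustrative):

    import numpy as np

    def ohem_mean_loss(losses, keep_ratio=0.7):
        """Average only the largest keep_ratio fraction of per-sample losses."""
        keep_num = int(len(losses) * keep_ratio)   # truncation, as in the project code below
        hard = np.sort(losses)[::-1][:keep_num]    # the hardest (largest-loss) samples
        return hard.mean()

    batch_losses = np.array([0.05, 2.3, 0.9, 0.01, 1.4, 0.6])
    print(ohem_mean_loss(batch_losses))  # mean of the 4 largest losses only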

    6) Training data

    Since face detection and face alignment are performed jointly, four different kinds of data annotations are used in the training process:

    6.1 Negatives: regions whose IoU with every ground-truth box in the image is below 0.3;

    6.2 Positives: regions whose IoU with a ground-truth box is above 0.65;

    6.3 Part faces: regions whose IoU lies between 0.4 and 0.65;

    6.4 Landmark faces: images labeled with the positions of the five facial landmarks.

    Negatives and positives are used for the face classification task (deciding face vs. non-face); positives and part faces are used for bounding box regression; landmark faces are used for facial landmark localization. All of these sample types are selected by IoU thresholds; an IoU sketch is given after the list below. The training data for each network are prepared as follows:

    ① P-Net: positives, negatives, and part faces are randomly cropped from the WIDER FACE dataset; landmark faces are cropped from the CelebA data. All crops are resized to 12×12;

    ② R-Net: the proposals output by the first stage of the pipeline are used as R-Net's input, resized to 24×24;

    ③ O-Net: the input consists of the face boxes filtered and refined by the second stage, likewise cropped from the original images and resized to 48×48, then fed into O-Net in batches.

    Each later stage thus refines the results produced by the stage before it.
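
    Since every sample type above is selected by an IoU threshold, here is a minimal sketch of the kind of IOU helper that the data-generation scripts below import from utils (the repository's actual implementation may differ in details):

    import numpy as np

    def IOU(box, boxes):
        """IoU between one box and an array of boxes, all as [x1, y1, x2, y2]."""
        box_area = (box[2]-box[0]+1) * (box[3]-box[1]+1)
        areas = (boxes[:, 2]-boxes[:, 0]+1) * (boxes[:, 3]-boxes[:, 1]+1)
        # Coordinates of the intersection rectangles.
        xx1 = np.maximum(box[0], boxes[:, 0])
        yy1 = np.maximum(box[1], boxes[:, 1])
        xx2 = np.minimum(box[2], boxes[:, 2])
        yy2 = np.minimum(box[3], boxes[:, 3])
        w = np.maximum(0, xx2-xx1+1)
        h = np.maximum(0, yy2-yy1+1)
        inter = w * h
        return inter / (box_area + areas - inter)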

    4. Testing Stage

    Following the overall architecture of Section 1, the input image is first turned into an image pyramid of multiple scales, which is fed to P-Net. Because P-Net is a fully convolutional network, each point on its output feature map corresponds to a 12×12 region of the input image. P-Net is very small, so it proposes candidate regions quickly but with limited accuracy. The face boxes detected at the different scales are merged with NMS; the surviving candidates are cropped from the image, resized to 24×24, and passed to R-Net, which refines the boxes precisely, usually leaving only a few. These are finally resized to 48×48 and passed to O-Net, which decides whether each candidate is a face. O-Net is comparatively slow, but because the first two networks have already narrowed the candidates down to high-probability boxes, only a few crops reach it. O-Net then outputs the precise boxes together with the landmark positions; facial landmark localization appears only in this third stage, while the first two stages only classify and do not output landmark results.
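
    A minimal sketch of that feature-map-to-window correspondence (the stride of 2 comes from P-Net's single pooling layer; generate_bbox is an illustrative name, and the project's detection code may differ in details):

    import numpy as np

    def generate_bbox(cls_map, scale, threshold=0.6, stride=2, cellsize=12):
        """Map P-Net output cells with face probability above threshold back
        to 12x12 windows in the original (unscaled) image."""
        t_index = np.where(cls_map > threshold)     # (rows, cols) of face cells
        if t_index[0].size == 0:
            return np.array([])
        # Output cell (row, col) covers the input window starting at (col*stride, row*stride);
        # dividing by scale maps the window back to the original image.
        x1 = np.round(stride * t_index[1] / scale)
        y1 = np.round(stride * t_index[0] / scale)
        x2 = np.round((stride * t_index[1] + cellsize) / scale)
        y2 = np.round((stride * t_index[0] + cellsize) / scale)
        return np.vstack([x1, y1, x2, y2, cls_map[t_index]]).T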

    Reference: https://blog.csdn.net/wfei101/article/details/79935037

    5. Project Practice

    Reference project address: GitHub

    The model implementation below follows the reference project, with some adjustments.

    Dataset download:

    The datasets used here are WIDER FACE and CelebA.

    The code walkthrough is as follows:

    Reference: https://www.ctolib.com/LeslieZhoa-tensorflow-MTCNN.html

    The main pieces of code are explained below:

    Generating the P-Net data:

    gen_12net_data.py

    # coding: utf-8
    
    """
    Crop pos, neg, and part images and resize them to 12x12 as P-Net's input
    """
    import os
    import cv2
    import numpy as np
    npr = np.random
    from tqdm import tqdm
    from utils import IOU
    
    # txt mapping each image to its face labels
    anno_file = '../data/wider_face_train.txt'
    # image directory
    im_dir = '../data/WIDER_train/images'
    # output directories for the cropped pos, part, and neg images
    pos_save_dir = '../data/12/positive'
    part_save_dir = '../data/12/part'
    neg_save_dir = '../data/12/negative'
    # P-Net data directory
    save_dir = '../data/12'
    
    if not os.path.exists(save_dir):
        os.mkdir(save_dir)
    if not os.path.exists(pos_save_dir):
        os.mkdir(pos_save_dir)
    if not os.path.exists(part_save_dir):
        os.mkdir(part_save_dir)
    if not os.path.exists(neg_save_dir):
        os.mkdir(neg_save_dir)
        
    f1 = open(os.path.join(save_dir, 'pos_12.txt'), 'w')
    f2 = open(os.path.join(save_dir, 'neg_12.txt'), 'w')
    f3 = open(os.path.join(save_dir, 'part_12.txt'), 'w')
    
    with open(anno_file, 'r') as f:
        annotations = f.readlines()
    num = len(annotations)
    print('Total number of images: %d' % num)
    # counters for the generated pos, neg, and part samples
    p_idx = 0
    n_idx = 0
    d_idx = 0
    # counter for the number of images read
    idx = 0
    for annotation in tqdm(annotations):  # progress bar
        annotation = annotation.strip().split(' ')
        im_path = annotation[0]
        box = list(map(float, annotation[1:]))
        boxes = np.array(box, dtype=np.float32).reshape(-1, 4)  # reshape into 4 columns; each row is one box
        
        img = cv2.imread(os.path.join(im_dir, im_path+'.jpg'))
        idx += 1
        height, width, channel = img.shape
        
        neg_num = 0
        # first sample a fixed number of neg crops per image
        while neg_num < 50:
            # random crop size
            size = npr.randint(12, min(width, height)/2)
            # random top-left corner
            nx = npr.randint(0, width-size)
            ny = npr.randint(0, height-size)
            # crop box
            crop_box = np.array([nx, ny, nx+size, ny+size])
            # compute the IoU
            Iou = IOU(crop_box, boxes)
            # crop the image and resize it to 12x12
            cropped_im = img[ny:ny+size, nx:nx+size, :]  # images read by cv2.imread are indexed y first
            resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR)  # P-Net's training input size is 12x12

            # an IoU below 0.3 marks a neg image
            if np.max(Iou) < 0.3:
                save_file = os.path.join(neg_save_dir, '%s.jpg' %n_idx)  # absolute path of the neg image
                f2.write(neg_save_dir+'/%s.jpg'%n_idx+' 0\n')  # neg_12.txt records the absolute path of each neg image
                cv2.imwrite(save_file, resized_im)  # save the crop
                n_idx += 1
                neg_num += 1
        
        for box in boxes:  # generate crops around each box
            # top-left and bottom-right coordinates
            x1, y1, x2, y2 = box
            w = x2 - x1 + 1
            h = y2 - y1 + 1
            # discard boxes that are too small or that lie outside the image
            if max(w, h) < 20 or x1 < 0 or y1 < 0:
                continue
            for i in range(5):  # take 5 crops near each box as candidate negative samples
                size = npr.randint(12, min(width, height)/2)

                # random offsets from x1, y1, keeping x1+delta_x>0 and y1+delta_y>0
                delta_x = npr.randint(max(-size, -x1), w)
                delta_y = npr.randint(max(-size, -y1), h)
                # top-left corner after the offset
                # these are negative crops, so the corner and offsets can be (and are best) chosen freely
                nx1 = int(max(0, x1+delta_x))
                ny1 = int(max(0, y1+delta_y))
                # discard crops that exceed the image size
                if nx1 + size > width or ny1 + size > height:
                    continue
                crop_box = np.array([nx1, ny1, nx1+size, ny1+size])
                Iou = IOU(crop_box, boxes)
                cropped_im = img[ny1:ny1+size, nx1:nx1+size, :]
                resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR)

                if np.max(Iou) < 0.3:
                    save_file = os.path.join(neg_save_dir, '%s.jpg' %n_idx)
                    f2.write(neg_save_dir+'/%s.jpg'%n_idx+' 0\n')
                    cv2.imwrite(save_file, resized_im)
                    n_idx += 1
            for i in range(20):  # take 20 crops near each box as candidate positive or part samples
                # these crops should land near the box, so size must stay close to w, h;
                # otherwise the chance of getting a positive or part crop is low
                size = npr.randint(int(min(w, h)*0.8), np.ceil(1.25*max(w, h)))

                # discard boxes that are too small
                # note: w, h are the box size; width, height are the size of the whole training image
                if w < 5:
                    continue
                # crop near the box; the offsets are kept fairly small
                delta_x = npr.randint(-w*0.2, w*0.2)
                delta_y = npr.randint(-h*0.2, h*0.2)
                # top-left corner of the crop: take the center x1+w/2, add delta_x, then subtract size/2
                nx1 = int(max(x1+w/2+delta_x-size/2, 0))
                ny1 = int(max(y1+h/2+delta_y-size/2, 0))
                nx2 = nx1 + size
                ny2 = ny1 + size

                # discard crops that exceed the image
                if nx2 > width or ny2 > height:
                    continue
                crop_box = np.array([nx1, ny1, nx2, ny2])
                # offsets of the face box relative to the crop, normalized by the crop size
                # training uses these normalized offsets, and inference also outputs normalized
                # offsets, so the network learns the normalized offsets directly
                offset_x1 = (x1-nx1)/float(size)
                offset_y1 = (y1-ny1)/float(size)
                offset_x2 = (x2-nx2)/float(size)
                offset_y2 = (y2-ny2)/float(size)

                cropped_im = img[ny1:ny2, nx1:nx2, :]
                resized_im = cv2.resize(cropped_im, (12, 12), interpolation=cv2.INTER_LINEAR)
                # add a dimension to the box so it can be passed to IOU
                box_ = box.reshape(1, -1)  # compute the IoU between this single box and the crop
                iou = IOU(crop_box, box_)
                if iou >= 0.65:
                    save_file = os.path.join(pos_save_dir, '%s.jpg'%p_idx)
                    f1.write(pos_save_dir+'/%s.jpg'%p_idx+' 1 %.2f %.2f %.2f %.2f\n'%(offset_x1,
                             offset_y1, offset_x2, offset_y2))
                    cv2.imwrite(save_file, resized_im)
                    p_idx += 1
                elif iou >= 0.4:
                    save_file = os.path.join(part_save_dir, '%s.jpg'%d_idx)
                    f3.write(part_save_dir+'/%s.jpg'%d_idx+' -1 %.2f %.2f %.2f %.2f\n'%(offset_x1,
                             offset_y1, offset_x2, offset_y2))
                    cv2.imwrite(save_file, resized_im)
                    d_idx += 1
    
    print('%s images processed, pos: %s  part: %s  neg: %s' %(idx, p_idx, d_idx, n_idx))
    f1.close()
    f2.close()
    f3.close()

    Generating the landmark data:

    gen_landmark_aug.py

    # coding: utf-8
    
    import os
    import random
    import sys
    import cv2
    import numpy as np
    npr = np.random
    import argparse
    from tqdm import tqdm
    from utils import IOU
    from BBox_utils import getDataFromTxt, BBox
    data_dir = '../data'
    
    
    def main(args):
        """
        用于处理带有landmark的数据
        """
        size = args.input_size
        # 是否对图像变换
        argument = True
        if size == 12:
            net = 'PNet'
        elif size == 24:
            net = 'RNet'
        elif size == 48:
            net = 'ONet'
        image_id = 0
        # 数据输出路径
        OUTPUT = os.path.join(data_dir, str(size))
        if not os.path.exists(OUTPUT):
            os.mkdir(OUTPUT)
        # 图片处理后输出路径
        dstdir = os.path.join(OUTPUT, 'train_%s_landmark_aug' %(net))
        if not os.path.exists(dstdir):
            os.mkdir(dstdir)
        # label记录txt
        ftxt = os.path.join(data_dir, 'trainImageList.txt')  # trainImageList.txt记录了CelebA数据的路径以及关键点信息.
        # 记录label的txt
        f = open(os.path.join(OUTPUT, 'landmark_%d_aug.txt' %(size)), 'w')
        # 获取图像路径,box,关键点
        data = getDataFromTxt(ftxt, data_dir)
        idx = 0
        for (imgPath, box, landmarkGt) in tqdm(data):
            # 存储人脸图片和关键点
            F_imgs = []
            F_landmarks = []
            img = cv2.imread(imgPath)
            
            img_h, img_w, img_c = img.shape
            gt_box = np.array([box.left, box.top, box.right, box.bottom])
            # 人脸图片
            f_face = img[box.top:box.bottom+1, box.left:box.right+1]
            # resize成网络输入大小
            f_face = cv2.resize(f_face, (size, size))
            
            landmark = np.zeros((5, 2))
            for index, one in enumerate(landmarkGt):
                # 关键点相对于左上坐标偏移量并归一化
                rv = ((one[0]-gt_box[0])/(gt_box[2]-gt_box[0]), (one[1]-gt_box[1])/(gt_box[3]-gt_box[1]))
                landmark[index] = rv
            F_imgs.append(f_face)
            F_landmarks.append(landmark.reshape(10))
            landmark = np.zeros((5, 2))
            if argument:
                # 对图像变换
                idx = idx+1
                x1, y1, x2, y2 = gt_box
                gt_w = x2 - x1 + 1
                gt_h = y2 - y1 + 1
                # 除去过小的人脸图像
                if max(gt_w, gt_h) < 40 or x1 < 0 or y1 < 0:
                    continue
                for i in range(10):
                    # 随机裁剪图像大小
                    # 每张图片截取10个, x下面计算方法类似于在positive和part的截图过程.
                    box_size = npr.randint(int(min(gt_w, gt_h)*0.8), np.ceil(1.25*max(gt_w, gt_h)))
                    # 随机左上坐标偏移量
                    delta_x = npr.randint(-gt_w*0.2, gt_w*0.2)
                    delta_y = npr.randint(-gt_h*0.2, gt_h*0.2)
                    # 计算左上坐标
                    nx1 = int(max(x1+gt_w/2-box_size/2+delta_x, 0))
                    ny1 = int(max(y1+gt_h/2-box_size/2+delta_y, 0))
                    nx2 = nx1 + box_size
                    ny2 = ny1 + box_size
                    # 除去超过边界的
                    if nx2 > img_w or ny2 > img_h:
                        continue
                    # 裁剪边框, 图片
                    crop_box = np.array([nx1, ny1, nx2, ny2])
                    cropped_im = img[ny1:ny2+1, nx1:nx2+1, :]
                    resized_im = cv2.resize(cropped_im, (size, size))
                    iou = IOU(crop_box, np.expand_dims(gt_box, 0))  # 扩展数组形状. -> 1 * 1 * 4
                    # 只保留pos图像
                    if iou > 0.65:
                        F_imgs.append(resized_im)
                        # 关键点相对偏移
                        for index, one in enumerate(landmarkGt):
                            rv = ((one[0]-nx1)/box_size, (one[1]-ny1)/box_size)
                            landmark[index] = rv
                        F_landmarks.append(landmark.reshape(10))
                        landmark = np.zeros((5, 2))
                        landmark_ = F_landmarks[-1].reshape(-1, 2)
                        box = BBox([nx1, ny1, nx2, ny2])
                        # 镜像
                        if random.choice([0, 1]) > 0:
                            face_flipped, landmark_flipped = flip(resized_im, landmark_)
                            face_flipped = cv2.resize(face_flipped, (size, size))
                            F_imgs.append(face_flipped)
                            F_landmarks.append(landmark_flipped.reshape(10))
                        # 逆时针翻转
                        if random.choice([0, 1]) > 0:
                            face_rotated_by_alpha, landmark_rorated = rotate(img, box, box.reprojectLandmark(landmark_), 5)
                            # 关键点偏移
                            landmark_rorated = box.projectLandmark(landmark_rorated)
                            face_rotated_by_alpha = cv2.resize(face_rotated_by_alpha, (size, size))
                            F_imgs.append(face_rotated_by_alpha)
                            F_landmarks.append(landmark_rorated.reshape(10))
                            
                            # 左右翻转
                            face_flipped, landmark_flipped = flip(face_rotated_by_alpha, landmark_rorated)
                            face_flipped = cv2.resize(face_flipped, (size, size))
                            F_imgs.append(face_flipped)
                            F_landmarks.append(landmark_flipped.reshape(10))
                        # 顺时针翻转
                        if random.choice([0, 1]) > 0:
                            face_rotated_by_alpha, landmark_rorated = rotate(img, box, box.reprojectLandmark(landmark_), -5)
                            # 关键点偏移
                            landmark_rorated = box.projectLandmark(landmark_rorated)
                            face_rotated_by_alpha = cv2.resize(face_rotated_by_alpha, (size, size))
                            F_imgs.append(face_rotated_by_alpha)
                            F_landmarks.append(landmark_rorated.reshape(10))
                            
                            # 左右翻转
                            face_flipped, landmark_flipped = flip(face_rotated_by_alpha, landmark_rorated)
                            face_flipped = cv2.resize(face_flipped, (size, size))
                            F_imgs.append(face_flipped)
                            F_landmarks.append(landmark_flipped.reshape(10))
    
            F_imgs, F_landmarks = np.asarray(F_imgs), np.asarray(F_landmarks)
            for i in range(len(F_imgs)):
                # 剔除数据偏移量在[0,1]之间
                if np.sum(np.where(F_landmarks[i] <= 0, 1, 0)) > 0:
                    continue
                if np.sum(np.where(F_landmarks[i] >= 1, 1, 0)) > 0:
                    continue
                cv2.imwrite(os.path.join(dstdir, '%d.jpg' %(image_id)), F_imgs[i])
                landmarks = list(map(str, list(F_landmarks[i])))
                f.write(os.path.join(dstdir, '%d.jpg' %(image_id))+' -2 '+' '.join(landmarks)+'
    ')
                image_id += 1
        f.close()
        return F_imgs, F_landmarks
    
    
    def flip(face, landmark):
        # mirror the face horizontally
        face_flipped_by_x = cv2.flip(face, 1)
        landmark_ = np.asarray([(1-x, y) for (x, y) in landmark])
        landmark_[[0, 1]] = landmark_[[1, 0]]
        landmark_[[3, 4]] = landmark_[[4, 3]]
        return (face_flipped_by_x, landmark_)
    
    
    def rotate(img, box, landmark, alpha):
        # rotate the image about the box center by alpha degrees
        center = ((box.left+box.right)/2, (box.top+box.bottom)/2)
        rot_mat = cv2.getRotationMatrix2D(center, alpha, 1)
        img_rotated_by_alpha = cv2.warpAffine(img, rot_mat, (img.shape[1], img.shape[0]))
        landmark_ = np.asarray([(rot_mat[0][0]*x+rot_mat[0][1]*y+rot_mat[0][2],
                                rot_mat[1][0]*x+rot_mat[1][1]*y+rot_mat[1][2]) for (x, y) in landmark])
        face = img_rotated_by_alpha[box.top:box.bottom+1, box.left:box.right+1]
        return (face, landmark_)
    
    
    def parse_arguments(argv):
    
        parser = argparse.ArgumentParser()
        parser.add_argument('input_size', type=int,
                            help='The input size for specific net')
        
        return parser.parse_args(argv)
    
    
    if __name__ == '__main__':
        main(parse_arguments(sys.argv[1:]))

    Merging the data for P-Net training:

    gen_imglist_pnet.py

    # coding: utf-8
    
    import numpy as np
    npr = np.random
    import os
    data_dir = '../data/'
    
    
    """
    Mix the pos, part, neg, and landmark data together
    """
    
    size = 12
    with open(os.path.join(data_dir, '12/pos_12.txt'), 'r') as f:
        pos = f.readlines()
    with open(os.path.join(data_dir, '12/neg_12.txt'), 'r') as f:
        neg = f.readlines()
    with open(os.path.join(data_dir, '12/part_12.txt'), 'r') as f:
        part = f.readlines()
    with open(os.path.join(data_dir, '12/landmark_12_aug.txt'), 'r') as f:
        landmark = f.readlines()
    dir_path = os.path.join(data_dir, '12')
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    with open(os.path.join(dir_path, 'train_pnet_landmark.txt'), 'w') as f:
        nums = [len(neg), len(pos), len(part)]
        base_num = 250000
        print('number of neg: {}  pos: {}  part: {}  base: {}'.format(len(neg), len(pos), len(part), base_num))
        if len(neg) > base_num*3:
            neg_keep = npr.choice(len(neg), size=base_num*3, replace=True)
        else:
            neg_keep = npr.choice(len(neg), size=len(neg), replace=True)
        sum_p = len(neg_keep)//3  # pos : part : neg = 1 : 1 : 3
        pos_keep = npr.choice(len(pos), sum_p, replace=True)
        part_keep = npr.choice(len(part), sum_p, replace=True)
        print('kept neg: {}  pos: {}  part: {}'.format(len(neg_keep), len(pos_keep), len(part_keep)))
        for i in pos_keep:
            f.write(pos[i])
        for i in neg_keep:
            f.write(neg[i])
        for i in part_keep:
            f.write(part[i])
        for item in landmark:
            f.write(item) 

    Converting the training data into TFRecords files:

    gen_tfrecords.py

    # coding: utf-8
    
    import os
    import random
    import sys
    import time
    import tensorflow as tf
    import cv2
    from tqdm import tqdm
    import argparse
    
    
    def main(args):
        """
        生成tfrecords文件
        """
        size = args.input_size
        # 数据存放地址
        dataset_dir = '../data/'
        # tfrecord存放地址
        output_dir = os.path.join(dataset_dir, str(size)+'/tfrecord')
        if not os.path.exists(output_dir):
            os.mkdir(output_dir)
        # pnet只生成一个混合的tfrecords, rnet和onet要分别生成4个
        if size == 12:
            net = 'PNet'
            tf_filenames = [os.path.join(output_dir, 'train_%s_landmark.tfrecord' % net)]
            items = ['12/train_pnet_landmark.txt']
        elif size == 24:
            net = 'RNet'
            tf_filename1 = os.path.join(output_dir, 'pos_landmark.tfrecord')
            item1 = '%d/pos_%d.txt' % (size, size)
            tf_filename2 = os.path.join(output_dir, 'part_landmark.tfrecord')
            item2 = '%d/part_%d.txt' % (size, size)
            tf_filename3 = os.path.join(output_dir, 'neg_landmark.tfrecord')
            item3 = '%d/neg_%d.txt' % (size, size)
            tf_filename4 = os.path.join(output_dir, 'landmark_landmark.tfrecord')
            item4 = '%d/landmark_%d_aug.txt' % (size, size)
            tf_filenames = [tf_filename1, tf_filename2, tf_filename3, tf_filename4]
            items = [item1, item2, item3, item4]
        elif size == 48:
            net = 'ONet'
            tf_filename1 = os.path.join(output_dir, 'pos_landmark.tfrecord')
            item1 = '%d/pos_%d.txt' % (size, size)
            tf_filename2 = os.path.join(output_dir, 'part_landmark.tfrecord')
            item2 = '%d/part_%d.txt' % (size, size)
            tf_filename3 = os.path.join(output_dir, 'neg_landmark.tfrecord')
            item3 = '%d/neg_%d.txt' % (size, size)
            tf_filename4 = os.path.join(output_dir, 'landmark_landmark.tfrecord')
            item4 = '%d/landmark_%d_aug.txt' % (size, size)
            tf_filenames = [tf_filename1, tf_filename2, tf_filename3, tf_filename4]
            items = [item1, item2, item3, item4]
        
        if tf.gfile.Exists(tf_filenames[0]):
            print('tfrecords files already exist; nothing to do')
            return
        # load the data
        for tf_filename, item in zip(tf_filenames, items):
            print('Reading data')
            dataset = get_dataset(dataset_dir, item)
            tf_filename = tf_filename+'_shuffle'
            random.shuffle(dataset)  # shuffle the data
            print('Converting to tfrecords')
            with tf.python_io.TFRecordWriter(tf_filename) as tfrecord_writer:
                for image_example in tqdm(dataset):
                    filename = image_example['filename']
                    try:
                        _add_to_tfrecord(filename, image_example, tfrecord_writer)
                    except:
                        print(filename)
        print('Conversion finished')
    
    
    def get_dataset(dir, item):
        """
        从txt获取数据
        参数:
          dir:存放数据目录
          item:txt目录
        返回值:
          包含label,box,关键点的data
        """
        dataset_dir = os.path.join(dir, item)
        imagelist = open(dataset_dir, 'r')
        dataset = []
        for line in tqdm(imagelist.readlines()):  # progress bar
            info = line.strip().split(' ')
            data_example = dict()
            bbox = dict()
            data_example['filename'] = info[0]
            data_example['label'] = int(info[1])
            # the neg box defaults to 0; part and pos boxes carry only the face box; landmark boxes carry only the landmarks
            bbox['xmin'] = 0
            bbox['ymin'] = 0
            bbox['xmax'] = 0
            bbox['ymax'] = 0
            bbox['xlefteye'] = 0
            bbox['ylefteye'] = 0
            bbox['xrighteye'] = 0
            bbox['yrighteye'] = 0
            bbox['xnose'] = 0
            bbox['ynose'] = 0
            bbox['xleftmouth'] = 0
            bbox['yleftmouth'] = 0
            bbox['xrightmouth'] = 0
            bbox['yrightmouth'] = 0        
            if len(info) == 6:  # a length of 6 means the line carries only a face box (4 values after the path and label)
                bbox['xmin'] = float(info[2])
                bbox['ymin'] = float(info[3])
                bbox['xmax'] = float(info[4])
                bbox['ymax'] = float(info[5])
            if len(info) == 12:  # a length of 12 means the line carries facial landmarks (10 values after the path and label)
                bbox['xlefteye'] = float(info[2])
                bbox['ylefteye'] = float(info[3])
                bbox['xrighteye'] = float(info[4])
                bbox['yrighteye'] = float(info[5])
                bbox['xnose'] = float(info[6])
                bbox['ynose'] = float(info[7])
                bbox['xleftmouth'] = float(info[8])
                bbox['yleftmouth'] = float(info[9])
                bbox['xrightmouth'] = float(info[10])
                bbox['yrightmouth'] = float(info[11])
            data_example['bbox'] = bbox
            dataset.append(data_example)
        return dataset
    
    
    def _add_to_tfrecord(filename, image_example, tfrecord_writer):
        """
        转换成tfrecord文件
        参数:
          filename:图片文件名
          image_example:数据
          tfrecord_writer:写入文件
        """
        image_data, height, width = _process_image_withoutcoder(filename)
        example = _convert_to_example_simple(image_example, image_data)
        tfrecord_writer.write(example.SerializeToString())
    
    
    def _process_image_withoutcoder(filename):
        """
        读取图片文件,返回图片大小
        """
        image = cv2.imread(filename)
        image_data = image.tostring()
        assert len(image.shape) == 3
        height = image.shape[0]
        width = image.shape[1]
        assert image.shape[2] == 3
        return image_data, height, width
    
    
    # converters for the different feature types
    def _int64_feature(value):
        if not isinstance(value, list):
            value = [value]
        return tf.train.Feature(int64_list=tf.train.Int64List(value=value))
    
    
    def _float_feature(value):
        if not isinstance(value, list):
            value = [value]
        return tf.train.Feature(float_list=tf.train.FloatList(value=value))
    
    
    def _bytes_feature(value):
        if not isinstance(value, list):
            value = [value]
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))
    
    
    def _convert_to_example_simple(image_example, image_buffer):
        """
        转换成tfrecord接受形式
        """
        class_label = image_example['label']
        bbox = image_example['bbox']
        roi = [bbox['xmin'], bbox['ymin'], bbox['xmax'], bbox['ymax']]
        landmark = [bbox['xlefteye'], bbox['ylefteye'], bbox['xrighteye'], bbox['yrighteye'], bbox['xnose'], bbox['ynose'],
                    bbox['xleftmouth'], bbox['yleftmouth'], bbox['xrightmouth'], bbox['yrightmouth']]
    
        example = tf.train.Example(features=tf.train.Features(feature={
            'image/encoded': _bytes_feature(image_buffer),
            'image/label': _int64_feature(class_label),
            'image/roi': _float_feature(roi),
            'image/landmark': _float_feature(landmark)
        }))
        return example
    
    
    def parse_arguments(argv):
    
        parser = argparse.ArgumentParser()
    
        parser.add_argument('input_size', type=int,
                            help='The input size for specific net')
        
        return parser.parse_args(argv)
    
    
    if __name__ == '__main__':
        main(parse_arguments(sys.argv[1:]))

    Training:

    train_model.py

    # coding: utf-8
    
    
    import os
    import sys
    from datetime import datetime
    import numpy as np
    import tensorflow as tf
    import config as FLAGS
    import random
    import cv2
    
    
    def train(net_factory, prefix, end_epoch, base_dir, display, base_lr):
        """
        训练模型
        """
        size = int(base_dir.split('/')[-1])  # 获取得到网络大小(因为base_dir保存的路径为:../data/12, ../data/24, ../data/48)
    
        # 论文中的alpha, 代表了任务的重要性. 和论文中保持一致.
        if size == 12:
            net = 'PNet'
            radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 0.5;
        elif size == 24:
            net = 'RNet'
            radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 0.5;
        elif size == 48:
            net = 'ONet'
            radio_cls_loss = 1.0; radio_bbox_loss = 0.5; radio_landmark_loss = 1;
            
        if net == 'PNet':
            # count the total number of training samples
            label_file = os.path.join(base_dir, 'train_pnet_landmark.txt')
            f = open(label_file, 'r')
       
            num = len(f.readlines())
            dataset_dir = os.path.join(base_dir, 'tfrecord/train_PNet_landmark.tfrecord_shuffle')
            # read the data from the tfrecord
            image_batch, label_batch, bbox_batch, landmark_batch = read_single_tfrecord(dataset_dir, FLAGS.batch_size, net)
        else:
            # count the total number of training samples
            label_file1 = os.path.join(base_dir, 'pos_%d.txt' % size)
            f1 = open(label_file1, 'r')
            label_file2 = os.path.join(base_dir, 'part_%d.txt' % size)
            f2 = open(label_file2, 'r')
            label_file3 = os.path.join(base_dir, 'neg_%d.txt' % size)
            f3 = open(label_file3, 'r')
            label_file4 = os.path.join(base_dir, 'landmark_%d_aug.txt' % size)
            f4 = open(label_file4, 'r')
       
            num = len(f1.readlines())+len(f2.readlines())+len(f3.readlines())+len(f4.readlines())
        
            pos_dir = os.path.join(base_dir, 'tfrecord/pos_landmark.tfrecord_shuffle')
            part_dir = os.path.join(base_dir, 'tfrecord/part_landmark.tfrecord_shuffle')
            neg_dir = os.path.join(base_dir, 'tfrecord/neg_landmark.tfrecord_shuffle')
            landmark_dir = os.path.join(base_dir, 'tfrecord/landmark_landmark.tfrecord_shuffle')
            dataset_dirs = [pos_dir, part_dir, neg_dir, landmark_dir]
            # proportion of each data type
            # the goal is to keep the same mix of sample types in every batch
            # training data ratio: pos : part : landmark : neg = 1 : 1 : 1 : 3
            pos_radio, part_radio, landmark_radio, neg_radio = 1.0/6, 1.0/6, 1.0/6, 3.0/6
            pos_batch_size = int(np.ceil(FLAGS.batch_size*pos_radio))
            assert pos_batch_size != 0, "invalid batch size"
            part_batch_size = int(np.ceil(FLAGS.batch_size*part_radio))
            assert part_batch_size != 0, "invalid batch size"
            neg_batch_size = int(np.ceil(FLAGS.batch_size*neg_radio))
            assert neg_batch_size != 0, "invalid batch size"
            landmark_batch_size = int(np.ceil(FLAGS.batch_size*landmark_radio))
            assert landmark_batch_size != 0, "invalid batch size"
            batch_sizes = [pos_batch_size, part_batch_size, neg_batch_size, landmark_batch_size]
            image_batch, label_batch, bbox_batch, landmark_batch = read_multi_tfrecords(dataset_dirs, batch_sizes, net)
    
        # define placeholders for training; the batches read from the tfrecords are fed in below
        input_image = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, size, size, 3], name='input_image')
        label = tf.placeholder(tf.float32, shape=[FLAGS.batch_size], name='label')
        bbox_target = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, 4], name='bbox_target')
        landmark_target = tf.placeholder(tf.float32, shape=[FLAGS.batch_size, 10], name='landmark_target')
        # image color distortion
        input_image = image_color_distort(input_image)
        cls_loss_op, bbox_loss_op, landmark_loss_op, L2_loss_op, accuracy_op = net_factory(input_image,
                                                                                           label,
                                                                                           bbox_target,
                                                                                           landmark_target,
                                                                                           training=True)
    
        # total training loss, implementing Eq. (4) from the paper
        total_loss_op = radio_cls_loss*cls_loss_op+radio_bbox_loss*bbox_loss_op+radio_landmark_loss*landmark_loss_op+L2_loss_op
        train_op, lr_op = optimize(base_lr, total_loss_op, num)
    
        # add the variables to TensorBoard for visualization
        tf.summary.scalar("cls_loss", cls_loss_op)  # cls_loss
        tf.summary.scalar("bbox_loss", bbox_loss_op)  # bbox_loss
        tf.summary.scalar("landmark_loss", landmark_loss_op)  # landmark_loss
        tf.summary.scalar("cls_accuracy", accuracy_op)  # cls_acc
        tf.summary.scalar("total_loss", total_loss_op)  # cls_loss, bbox loss, landmark loss and L2 loss add together
        summary_op = tf.summary.merge_all()
        logs_dir = "../graph/%s" % net
        if not os.path.exists(logs_dir):
            os.mkdir(logs_dir)
        # train the model
        init = tf.global_variables_initializer()
        sess = tf.Session()
    
        saver = tf.train.Saver(max_to_keep=3)
        sess.run(init)
        # write out the model graph
        writer = tf.summary.FileWriter(logs_dir, sess.graph)
        # use tf.train.Coordinator to create a thread coordinator that manages the queue threads
        coord = tf.train.Coordinator()
        # start the QueueRunners
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        i = 0
        
        MAX_STEP = int(num / FLAGS.batch_size + 1) * end_epoch
        epoch = 0
        sess.graph.finalize()
        try:
            for step in range(MAX_STEP):
                i = i + 1
                if coord.should_stop():
                    break
                image_batch_array, label_batch_array, bbox_batch_array, landmark_batch_array = sess.run([image_batch,
                                                                                                         label_batch,
                                                                                                         bbox_batch,
                                                                                                         landmark_batch])
                # randomly flip the images
                image_batch_array, landmark_batch_array = random_flip_images(image_batch_array,
                                                                             label_batch_array,
                                                                             landmark_batch_array)
    
                _, _, summary = sess.run([train_op, lr_op, summary_op],
                                         feed_dict={input_image: image_batch_array,
                                                    label: label_batch_array,
                                                    bbox_target: bbox_batch_array,
                                                    landmark_target: landmark_batch_array})
                # log the training progress
                if (step+1) % display == 0:
                    cls_loss, bbox_loss, landmark_loss, L2_loss, lr, acc = sess.run([cls_loss_op,
                                                                                     bbox_loss_op,
                                                                                     landmark_loss_op,
                                                                                     L2_loss_op,
                                                                                     lr_op,
                                                                                     accuracy_op],
                                                                                    feed_dict={input_image: image_batch_array,
                                                                                               label: label_batch_array,
                                                                                               bbox_target: bbox_batch_array,
                                                                                               landmark_target: landmark_batch_array})
    
                    total_loss = radio_cls_loss*cls_loss + radio_bbox_loss*bbox_loss + radio_landmark_loss*landmark_loss + L2_loss
                    print('epoch: %d/%d' % (epoch+1, end_epoch))
                    print("Step: %d/%d, accuracy: %3f, cls loss: %4f, bbox loss: %4f, Landmark loss :%4f, L2 loss: %4f, Total Loss: %4f, lr:%f"
                          % (step+1, MAX_STEP, acc, cls_loss, bbox_loss, landmark_loss, L2_loss, total_loss, lr))
    
                # save the model once per epoch
                if i * FLAGS.batch_size > num:
                    epoch = epoch + 1
                    i = 0
                    path_prefix = saver.save(sess, prefix, global_step=epoch)
                writer.add_summary(summary, global_step=step)
        except tf.errors.OutOfRangeError:
            print("完成!!!")
        finally:
            coord.request_stop()
            writer.close()
        coord.join(threads)
        sess.close()
    
    
    def optimize(base_lr, loss, data_num):
        """
        参数优化
        """
        lr_factor = 0.1
        global_step = tf.Variable(0, trainable=False)
        # data_num / batch_size is the number of steps per epoch; multiplying by the number of epochs gives the total number of training steps
        # a piecewise-constant learning rate is used: base_lr * lr_factor ** x for x = 0, 1, 2, 3
        boundaries = [int(epoch * data_num / FLAGS.batch_size) for epoch in FLAGS.LR_EPOCH]
        lr_values = [base_lr * (lr_factor ** x) for x in range(0, len(FLAGS.LR_EPOCH) + 1)]
        lr_op = tf.train.piecewise_constant(global_step, boundaries, lr_values)
        # use the Momentum optimizer
        optimizer = tf.train.MomentumOptimizer(lr_op, 0.9)
        train_op = optimizer.minimize(loss, global_step)
        return train_op, lr_op
    
    
    def read_single_tfrecord(tfrecord_file, batch_size, net):
        """
        读取tfrecord数据
        """
        filename_queue = tf.train.string_input_producer([tfrecord_file], shuffle=True)
        reader = tf.TFRecordReader()
        _, serialized_example = reader.read(filename_queue)
        image_features = tf.parse_single_example(serialized_example,
                                                 features={
                                                           'image/encoded': tf.FixedLenFeature([], tf.string),
                                                           'image/label': tf.FixedLenFeature([], tf.int64),
                                                           'image/roi': tf.FixedLenFeature([4], tf.float32),
                                                           'image/landmark': tf.FixedLenFeature([10], tf.float32)})
    
        if net == 'PNet':
            image_size = 12
        elif net == 'RNet':
            image_size = 24
        elif net == 'ONet':
            image_size = 48
    
        # _bytes_feature stored the raw image as a byte string; tf.decode_raw converts it back into the original image data
        image = tf.decode_raw(image_features['image/encoded'], tf.uint8)
        image = tf.reshape(image, [image_size, image_size, 3])
        # normalize the values into [-1, 1]
        image = (tf.cast(image, tf.float32)-127.5)/128  # the data above is uint8, i.e., 8-bit unsigned integers (0-255)
        
        label = tf.cast(image_features['image/label'], tf.float32)
        roi = tf.cast(image_features['image/roi'], tf.float32)
        landmark = tf.cast(image_features['image/landmark'], tf.float32)
        image, label, roi, landmark = tf.train.batch([image, label, roi, landmark],
                                                     batch_size=batch_size,
                                                     num_threads=2,
                                                     capacity=batch_size)
    
        # tf.train.batch returns one batch of data, so reshape the first dimension to batch_size
        label = tf.reshape(label, [batch_size])
        roi = tf.reshape(roi, [batch_size, 4])
        landmark = tf.reshape(landmark, [batch_size, 10])
        return image, label, roi, landmark
    
    
    def read_multi_tfrecords(tfrecord_files, batch_sizes, net):
        """
        读取多个tfrecord文件放一起
        """
        pos_dir, part_dir, neg_dir, landmark_dir = tfrecord_files
        pos_batch_size, part_batch_size, neg_batch_size, landmark_batch_size = batch_sizes
       
        pos_image, pos_label, pos_roi, pos_landmark = read_single_tfrecord(pos_dir, pos_batch_size, net)
      
        part_image, part_label, part_roi, part_landmark = read_single_tfrecord(part_dir, part_batch_size, net)
      
        neg_image, neg_label, neg_roi, neg_landmark = read_single_tfrecord(neg_dir, neg_batch_size, net)
    
        landmark_image, landmark_label, landmark_roi, landmark_landmark = read_single_tfrecord(landmark_dir, landmark_batch_size, net)
    
        images = tf.concat([pos_image, part_image, neg_image, landmark_image], 0, name="concat/image")
       
        labels = tf.concat([pos_label, part_label, neg_label, landmark_label], 0, name="concat/label")
     
        assert isinstance(labels, object)
    
        rois = tf.concat([pos_roi, part_roi, neg_roi, landmark_roi], 0, name="concat/roi")
        
        landmarks = tf.concat([pos_landmark, part_landmark, neg_landmark, landmark_landmark], 0, name="concat/landmark")
        return images, labels, rois, landmarks
    
    
    def image_color_distort(inputs):
        inputs = tf.image.random_contrast(inputs, lower=0.5, upper=1.5)
        inputs = tf.image.random_brightness(inputs, max_delta=0.2)
        inputs = tf.image.random_hue(inputs, max_delta=0.2)
        inputs = tf.image.random_saturation(inputs, lower=0.5, upper=1.5)

        return inputs
    
    
    def random_flip_images(image_batch, label_batch, landmark_batch):
        """Randomly flip the images in a batch."""
        if random.choice([0, 1]) > 0:
            num_images = image_batch.shape[0]
            fliplandmarkindexes = np.where(label_batch == -2)[0]
            flipposindexes = np.where(label_batch == 1)[0]

            flipindexes = np.concatenate((fliplandmarkindexes, flipposindexes))

            for i in flipindexes:
                cv2.flip(image_batch[i], 1, image_batch[i])

            for i in fliplandmarkindexes:
                landmark_ = landmark_batch[i].reshape((-1, 2))
                landmark_ = np.asarray([(1-x, y) for (x, y) in landmark_])
                landmark_[[0, 1]] = landmark_[[1, 0]]
                landmark_[[3, 4]] = landmark_[[4, 3]]
                landmark_batch[i] = landmark_.ravel()

        return image_batch, landmark_batch

    train.py

    # coding: utf-8
    
    from model import P_Net, R_Net, O_Net
    import argparse
    import os
    import sys
    import config as FLAGS
    from train_model import train
    net_factorys = [P_Net, R_Net, O_Net]
    
    
    def main(args):
        size = args.input_size
        base_dir = os.path.join('../data/', str(size))
        
        if size == 12:
            net = 'PNet'
            net_factory = net_factorys[0]
            end_epoch = FLAGS.end_epoch[0]
        elif size == 24:
            net = 'RNet'
            net_factory = net_factorys[1]
            end_epoch = FLAGS.end_epoch[1]
        elif size == 48:
            net = 'ONet'
            net_factory = net_factorys[2]
            end_epoch = FLAGS.end_epoch[2]
        model_path = os.path.join('../model/', net)
        if not os.path.exists(model_path):
            os.mkdir(model_path)
        prefix = os.path.join(model_path, net)
        display = FLAGS.display
        lr = FLAGS.lr
        train(net_factory, prefix, end_epoch, base_dir, display, lr)
    
    
    def parse_arguments(argv):
    
        parser = argparse.ArgumentParser()
    
        parser.add_argument('input_size', type=int,
                            help='The input size for specific net')
        
        return parser.parse_args(argv)
    
    
    if __name__ == '__main__':
        main(parse_arguments(sys.argv[1:]))

    The model file:

    model.py

    # coding: utf-8
    
    
    import tensorflow as tf
    slim = tf.contrib.slim
    import numpy as np
    # only the hardest 70% of the samples are used for parameter updates
    num_keep_radio = 0.7
    
    
    def P_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
        """
        P-Net structure
        """
        with tf.variable_scope('PNet'):
            # build the network with TensorFlow Slim
            with slim.arg_scope([slim.conv2d], activation_fn=prelu,
                                weights_initializer=slim.xavier_initializer(),
                                weights_regularizer=slim.l2_regularizer(0.0005),
                                padding='VALID'):
                net = slim.conv2d(inputs, 10, 3, scope='conv1')  # first layer: 10 output channels, kernel size 3
                net = slim.max_pool2d(net, kernel_size=[2, 2], stride=2, padding='SAME', scope='pool1')
                net = slim.conv2d(net, 16, 3, scope='conv2')
                net = slim.conv2d(net, 32, 3, scope='conv3')
                # binary classification, so 2 output channels
                conv4_1 = slim.conv2d(net, 2, 1, activation_fn=tf.nn.softmax, scope='conv4_1')  # face / non-face classification
                bbox_pred = slim.conv2d(net, 4, 1, activation_fn=None, scope='conv4_2')  # 4-value regression for the face box coordinates
                landmark_pred = slim.conv2d(net, 10, 1, activation_fn=None, scope='conv4_3')  # 10-value regression for the landmark coordinates
                
                if training:
                    # squeeze out the size-1 dims 1 and 2: [batch, 1, 1, 2] -> [batch, 2]
                    cls_prob = tf.squeeze(conv4_1, [1, 2], name='cls_prob')
                    cls_loss = cls_ohem(cls_prob, label)
                    
                    bbox_pred = tf.squeeze(bbox_pred, [1, 2], name='bbox_pred')  # [batch, 4]
                    bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                    
                    landmark_pred = tf.squeeze(landmark_pred, [1, 2], name='landmark_pred')  # [batch, 10]
                    landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                    
                    accuracy = cal_accuracy(cls_prob, label)
                    L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                    return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
                else:
                    # batch_size = 1 at test time
                    cls_pro_test = tf.squeeze(conv4_1, axis=0)
                    bbox_pred_test = tf.squeeze(bbox_pred, axis=0)
                    landmark_pred_test = tf.squeeze(landmark_pred, axis=0)
                    return cls_pro_test, bbox_pred_test, landmark_pred_test
    
    
    def R_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
        """
        R-Net structure
        """
        with tf.variable_scope('RNet'):
            with slim.arg_scope([slim.conv2d],
                                activation_fn=prelu,
                                weights_initializer=slim.xavier_initializer(),
                                weights_regularizer=slim.l2_regularizer(0.0005),
                                padding='VALID'):
                net = slim.conv2d(inputs, 28, 3, scope='conv1')
                net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, padding='SAME', scope='pool1')
                net = slim.conv2d(net, 48, 3, scope='conv2')
                net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, scope='pool2')
                net = slim.conv2d(net, 64, 2, scope='conv3')
                fc_flatten = slim.flatten(net)
                fc1 = slim.fully_connected(fc_flatten, num_outputs=128, scope='fc1')
                
                cls_prob = slim.fully_connected(fc1, num_outputs=2, activation_fn=tf.nn.softmax, scope='cls_fc')
                bbox_pred = slim.fully_connected(fc1, num_outputs=4, activation_fn=None, scope='bbox_fc')
                landmark_pred = slim.fully_connected(fc1, num_outputs=10, activation_fn=None, scope='landmark_fc')
                if training:
                    cls_loss = cls_ohem(cls_prob, label)
                    bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                    landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                    
                    accuracy = cal_accuracy(cls_prob, label)
                    L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                    return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
                else:
                    return cls_prob, bbox_pred, landmark_pred
    
    
    def O_Net(inputs, label=None, bbox_target=None, landmark_target=None, training=True):
        """
        O-Net structure
        """
        with tf.variable_scope('ONet'):
            with slim.arg_scope([slim.conv2d],
                                activation_fn=prelu,
                                weights_initializer=slim.xavier_initializer(),
                                weights_regularizer=slim.l2_regularizer(0.0005),
                                padding='VALID'):
                net = slim.conv2d(inputs, 32, 3, scope='conv1')
                net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, padding='SAME', scope='pool1')
                net = slim.conv2d(net, 64, 3, scope='conv2')
                net = slim.max_pool2d(net, kernel_size=[3, 3], stride=2, scope='pool2')
                net = slim.conv2d(net, 64, 3, scope='conv3')
                net = slim.max_pool2d(net, kernel_size=[2, 2], stride=2, padding='SAME', scope='pool3')
                net = slim.conv2d(net, 128, 2, scope='conv4')
                fc_flatten = slim.flatten(net)
                fc1 = slim.fully_connected(fc_flatten, num_outputs=256, scope='fc1')
                
                cls_prob = slim.fully_connected(fc1, num_outputs=2, activation_fn=tf.nn.softmax, scope='cls_fc')
                bbox_pred = slim.fully_connected(fc1, num_outputs=4, activation_fn=None, scope='bbox_fc')
                landmark_pred = slim.fully_connected(fc1, num_outputs=10, activation_fn=None, scope='landmark_fc')
                if training:
                    cls_loss = cls_ohem(cls_prob, label)
                    bbox_loss = bbox_ohem(bbox_pred, bbox_target, label)
                    landmark_loss = landmark_ohem(landmark_pred, landmark_target, label)
                    
                    accuracy = cal_accuracy(cls_prob, label)
                    L2_loss = tf.add_n(slim.losses.get_regularization_losses())
                    return cls_loss, bbox_loss, landmark_loss, L2_loss, accuracy
                else:
                    return cls_prob, bbox_pred, landmark_pred
    
    
    def prelu(inputs):
        """
        definition of the prelu activation
        """
        alphas = tf.get_variable('alphas', shape=inputs.get_shape()[-1], dtype=tf.float32,
                                 initializer=tf.constant_initializer(0.25))
        pos = tf.nn.relu(inputs)
        neg = alphas*(inputs-abs(inputs))*0.5
        return pos+neg
    
    
    def cls_ohem(cls_prob, label):
        """
        计算类别损失
        参数:
          cls_prob:预测类别,是否有人
          label:真实值
        返回值:
          损失
        """
        zeros = tf.zeros_like(label)
    
        # labels: neg: 0, pos: 1, part: -1
        # negatives and positives are used for the face classification task
        # here only pos keeps label 1; the neg and part labels are set to 0
        # neg: label -> 0, pos: label -> 1, part: -> 0
        label_filter_invalid = tf.where(tf.less(label, 0), zeros, label)
        num_cls_prob = tf.size(cls_prob)  # total size of the class predictions = batch * 2
        cls_prob_reshape = tf.reshape(cls_prob, [num_cls_prob, -1])  # flatten the class array to 1-D
        label_int = tf.cast(label_filter_invalid, tf.int32)  # cast the 0/1 labels to int32
        num_row = tf.to_int32(cls_prob.get_shape()[0])  # batch size
        # for each sample, index batch*2 holds the non-face probability and batch*2+1
        # the face probability; indices_ selects the entry of cls_prob_reshape that
        # corresponds to the true class, used below for the cross-entropy
        row = tf.range(num_row) * 2  # base index of each sample's class pair: (0, 2, 4, ..., (num_row-1)*2)
        # offset from the base index: label_int = 0 selects the neg entry, label_int = 1 the pos entry
        indices_ = row + label_int
        # probability assigned to the true class (indices_ points at the entry for the actual label)
        label_prob = tf.squeeze(tf.gather(cls_prob_reshape, indices_))
        loss = -tf.log(label_prob+1e-10)  # with one-hot labels the cross-entropy reduces to -log of the true-class probability
        zeros = tf.zeros_like(label_prob, dtype=tf.float32)
        ones = tf.ones_like(label_prob, dtype=tf.float32)
        # build a mask that keeps only the neg and pos losses for training
        # label < 0 (i.e., part: -1) -> 0; otherwise (neg, pos) -> 1
        # the losses above were computed for every sample
        valid_inds = tf.where(label < zeros, zeros, ones)
        num_valid = tf.reduce_sum(valid_inds)
        # keep 70% of the data
        keep_num = tf.cast(num_valid*num_keep_radio, dtype=tf.int32)
        # keep only 70% of the neg and pos losses
        # multiplying loss by valid_inds keeps only the elements where valid_inds is 1
        loss = loss * valid_inds
        loss, _ = tf.nn.top_k(loss, k=keep_num)
        return tf.reduce_mean(loss)
    
    
    def bbox_ohem(bbox_pred, bbox_target, label):
        """
        Bounding box regression loss
        """
        zeros_index = tf.zeros_like(label, dtype=tf.float32)
        ones_index = tf.ones_like(label, dtype=tf.float32)
        # keep only the pos and part samples
        valid_inds = tf.where(tf.equal(tf.abs(label), 1), ones_index, zeros_index)
        # squared-error loss
        square_error = tf.square(bbox_pred-bbox_target)
        square_error = tf.reduce_sum(square_error, axis=1)
        # number of samples kept
        num_valid = tf.reduce_sum(valid_inds)
        keep_num = tf.cast(num_valid, dtype=tf.int32)
        # keep only the losses of the pos and part samples
        square_error = square_error*valid_inds
        square_error, _ = tf.nn.top_k(square_error, k=keep_num)
        return tf.reduce_mean(square_error)
    
    
    def landmark_ohem(landmark_pred, landmark_target, label):
        """
        Facial landmark loss
        """
        ones = tf.ones_like(label, dtype=tf.float32)
        zeros = tf.zeros_like(label, dtype=tf.float32)
        # keep only the landmark samples
        valid_inds = tf.where(tf.equal(label, -2), ones, zeros)
        # squared-error loss
        square_error = tf.square(landmark_pred-landmark_target)
        square_error = tf.reduce_sum(square_error, axis=1)
        # number of samples kept
        num_valid = tf.reduce_sum(valid_inds)
        keep_num = tf.cast(num_valid, dtype=tf.int32)
        # keep only the losses of the landmark samples
        square_error = square_error*valid_inds
        square_error, _ = tf.nn.top_k(square_error, k=keep_num)
        return tf.reduce_mean(square_error)
    
    
    def cal_accuracy(cls_prob, label):
        """
        Classification accuracy
        """
        # class with the highest predicted probability: 0 = non-face, 1 = face
        pred = tf.argmax(cls_prob, axis=1)
        label_int = tf.cast(label, tf.int64)
        # keep the samples with label >= 0, i.e., the pos and neg samples
        cond = tf.where(tf.greater_equal(label_int, 0))
        picked = tf.squeeze(cond)
        # gather the label values of the pos and neg samples
        label_picked = tf.gather(label_int, picked)
        pred_picked = tf.gather(pred, picked)
        # compute the accuracy
        accuracy_op = tf.reduce_mean(tf.cast(tf.equal(label_picked, pred_picked), tf.float32))
        return accuracy_op

    Generating the input for the next network:

    gen_hard_example.py

    # coding: utf-8
    
    
    import sys
    from utils import *
    import numpy as np
    import argparse
    import os
    import pickle
    import cv2
    from tqdm import tqdm
    from loader import TestLoader
    sys.path.append('../')
    from train.model import P_Net, R_Net, O_Net
    import train.config as config
    from detection.detector import Detector
    from detection.fcn_detector import FcnDetector
    from detection.MtcnnDetector import MtcnnDetector
    
    
    def main(args):
        """
        Generate the next network's input with P-Net or R-Net
        """
        size = args.input_size
        batch_size = config.batches
        min_face_size = config.min_face
        stride = config.stride
        thresh = config.thresh
        # model paths
        model_path = ['../model/PNet/', '../model/RNet/', '../model/ONet']
        if size == 12:
            net = 'PNet'
            save_size = 24
        elif size == 24:
            net = 'RNet'
            save_size = 48
        # image data directory
        base_dir = '../data/WIDER_train/'
        # output directory for the processed crops
        data_dir = '../data/%d' % save_size
        neg_dir = os.path.join(data_dir, 'negative')
        pos_dir = os.path.join(data_dir, 'positive')
        part_dir = os.path.join(data_dir, 'part')
        for dir_path in [neg_dir, pos_dir, part_dir]:
            if not os.path.exists(dir_path):
                os.makedirs(dir_path)
        detectors = [None, None, None]
        PNet = FcnDetector(P_Net, model_path[0])
        detectors[0] = PNet
        if net == 'RNet':
            RNet = Detector(R_Net, 24, batch_size[1], model_path[1])
            detectors[1] = RNet
        basedir = '../data/'
        filename = '../data/wider_face_train_bbx_gt.txt'
        # read_annotation (defined in utils) pairs each image path with its ground-truth boxes
        data = read_annotation(base_dir, filename)
        mtcnn_detector = MtcnnDetector(detectors, min_face_size=min_face_size,
                                       stride=stride, threshold=thresh)
        save_path = data_dir
        save_file = os.path.join(save_path, 'detections.pkl')
        if not os.path.exists(save_file):
            # wrap the image list in an iterator
            print('loading data')
            test_data = TestLoader(data['images'])
            detections, _ = mtcnn_detector.detect_face(test_data)
            print('detection finished')

            with open(save_file, 'wb') as f:
                pickle.dump(detections, f, 1)
        print('generating images')
        save_hard_example(save_size, data, neg_dir, pos_dir, part_dir, save_path)
    
    
    def save_hard_example(save_size, data, neg_dir, pos_dir, part_dir, save_path):
        """
        将网络识别的box用来裁剪原图像作为下一个网络的输入
        """
        im_idx_list = data['images']
        gt_boxes_list = data['bboxes']
        num_of_images = len(im_idx_list)
    
        neg_label_file = "../data/%d/neg_%d.txt" % (save_size, save_size)
        neg_file = open(neg_label_file, 'w')
    
        pos_label_file = "../data/%d/pos_%d.txt" % (save_size, save_size)
        pos_file = open(pos_label_file, 'w')
    
        part_label_file = "../data/%d/part_%d.txt" % (save_size, save_size)
        part_file = open(part_label_file, 'w')
        # read detect result
        det_boxes = pickle.load(open(os.path.join(save_path, 'detections.pkl'), 'rb'))
        # print(len(det_boxes), num_of_images)
       
        assert len(det_boxes) == num_of_images, "detections do not match the number of images"
    
        n_idx = 0
        p_idx = 0
        d_idx = 0
        image_done = 0
        
        for im_idx, dets, gts in tqdm(zip(im_idx_list, det_boxes, gt_boxes_list)):
            gts = np.array(gts, dtype=np.float32).reshape(-1, 4)
            image_done += 1
    
            if dets.shape[0] == 0:
                continue
            img = cv2.imread(im_idx)
            # convert the boxes to squares
            dets = convert_to_square(dets)
            dets[:, 0:4] = np.round(dets[:, 0:4])
            neg_num = 0
            for box in dets:
                x_left, y_top, x_right, y_bottom, _ = box.astype(int)
                width = x_right - x_left + 1
                height = y_bottom - y_top + 1
    
                # discard boxes that are too small or fall outside the image
                if width < 20 or x_left < 0 or y_top < 0 or x_right > img.shape[1] - 1 or y_bottom > img.shape[0] - 1:
                    continue
    
                Iou = IOU(box, gts)
                cropped_im = img[y_top:y_bottom + 1, x_left:x_right + 1, :]  # crop the box region
                resized_im = cv2.resize(cropped_im, (save_size, save_size),
                                        interpolation=cv2.INTER_LINEAR)
    
                # categorize the crop; keep at most 60 negative crops per image
                if np.max(Iou) < 0.3 and neg_num < 60:
                    save_file = os.path.join(neg_dir, "%s.jpg" % n_idx)
                    neg_file.write(save_file + ' 0\n')
                    cv2.imwrite(save_file, resized_im)
                    n_idx += 1
                    neg_num += 1
                else:
                    idx = np.argmax(Iou)  # index of the ground truth with the highest IoU
                    assigned_gt = gts[idx]  # the matched ground-truth box
                    x1, y1, x2, y2 = assigned_gt

                    # offsets normalized by the crop's width and height
                    offset_x1 = (x1 - x_left) / float(width)
                    offset_y1 = (y1 - y_top) / float(height)
                    offset_x2 = (x2 - x_right) / float(width)
                    offset_y2 = (y2 - y_bottom) / float(height)
    
                    # pos and part samples
                    if np.max(Iou) >= 0.65:
                        save_file = os.path.join(pos_dir, "%s.jpg" % p_idx)
                        pos_file.write(save_file + ' 1 %.2f %.2f %.2f %.2f\n' % (
                            offset_x1, offset_y1, offset_x2, offset_y2))
                        cv2.imwrite(save_file, resized_im)
                        p_idx += 1

                    elif np.max(Iou) >= 0.4:
                        save_file = os.path.join(part_dir, "%s.jpg" % d_idx)
                        part_file.write(save_file + ' -1 %.2f %.2f %.2f %.2f\n' % (
                            offset_x1, offset_y1, offset_x2, offset_y2))
                        cv2.imwrite(save_file, resized_im)
                        d_idx += 1
        neg_file.close()
        part_file.close()
        pos_file.close()
    
    
    def parse_arguments(argv):
    
        parser = argparse.ArgumentParser()
    
        parser.add_argument('input_size', type=int,
                            help='The input size for specific net')
        
        return parser.parse_args(argv)
    
    
    if __name__ == '__main__':
        main(parse_arguments(sys.argv[1:]))
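
    As a quick check of the offset normalization in save_hard_example, here is a worked example with made-up numbers: a 100x100 detection box at (50, 60) matched to a ground-truth box of the same size at (55, 58).

    # Hypothetical detection box and its matched ground truth
    x_left, y_top, x_right, y_bottom = 50, 60, 149, 159
    width = height = 100.0
    x1, y1, x2, y2 = 55, 58, 154, 157
    print((x1 - x_left) / width)     # offset_x1 =  0.05
    print((y1 - y_top) / height)     # offset_y1 = -0.02
    print((x2 - x_right) / width)    # offset_x2 =  0.05
    print((y2 - y_bottom) / height)  # offset_y2 = -0.02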

    The training procedure is as follows:

    source activate tensorflow

    In the preprocess directory:

    python gen_12net_data.py       # generate the pos/neg/part data for PNet
    python gen_landmark_aug.py 12  # generate the landmark data for PNet
    python gen_imglist_pnet.py     # merge the annotation lists into one file
    python gen_tfrecords.py 12     # generate the tfrecords file

    In the train directory:

    python train.py 12             # train PNet (the loss can be monitored with tensorboard)

    Back in the preprocess directory:

    python gen_hard_example.py 12  # generate the three kinds of RNet data
    python gen_landmark_aug.py 24  # generate the landmark data for RNet
    python gen_tfrecords.py 24     # generate the tfrecords file

    Then cd to train and run python train.py 24 to train RNet.

    cd back to preprocess:

    python gen_hard_example.py 24  # generate the three kinds of ONet data
    python gen_landmark_aug.py 48  # generate the landmark data for ONet
    python gen_tfrecords.py 48     # generate the tfrecords file

    Then cd to train and run python train.py 48 to train ONet.

    Test script:

    test.py

    # coding: utf-8
    
    import sys
    from detection.MtcnnDetector import MtcnnDetector
    from detection.detector import Detector
    from detection.fcn_detector import FcnDetector
    from train.model import P_Net, R_Net, O_Net
    import cv2
    import os
    import numpy as np
    import train.config as config
    
    
    test_mode = config.test_mode
    thresh = config.thresh
    min_face_size = config.min_face
    stride = config.stride
    detectors = [None, None, None]
    # model locations
    model_path = ['model/PNet/', 'model/RNet/', 'model/ONet']
    batch_size = config.batches
    PNet = FcnDetector(P_Net, model_path[0])
    detectors[0] = PNet
    
    
    if test_mode in ["RNet", "ONet"]:
        RNet = Detector(R_Net, 24, batch_size[1], model_path[1])
        detectors[1] = RNet
    
    
    if test_mode == "ONet":
        ONet = Detector(O_Net, 48, batch_size[2], model_path[2])
        detectors[2] = ONet
    
    mtcnn_detector = MtcnnDetector(detectors=detectors, min_face_size=min_face_size,
                                   stride=stride, threshold=thresh)
    out_path = config.out_path
    if config.input_mode == '1':
        # run on still images from a directory
        path = config.test_dir
        # print(path)
        for item in os.listdir(path):
            img_path = os.path.join(path, item)
            img = cv2.imread(img_path)
            boxes_c, landmarks = mtcnn_detector.detect(img)
            for i in range(boxes_c.shape[0]):
                bbox = boxes_c[i, :4]
                score = boxes_c[i, 4]
                corpbbox = [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])]
                # draw the face box
                cv2.rectangle(img, (corpbbox[0], corpbbox[1]),
                              (corpbbox[2], corpbbox[3]), (255, 0, 0), 1)
                # draw the face confidence score
                cv2.putText(img, '{:.2f}'.format(score), 
                            (corpbbox[0], corpbbox[1] - 2),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
            # draw the landmarks
            for i in range(landmarks.shape[0]):
                for j in range(len(landmarks[i])//2):
                    cv2.circle(img, (int(landmarks[i][2*j]), int(landmarks[i][2*j+1])), 2, (0, 0, 255))
            cv2.imshow('im', img)
            k = cv2.waitKey(0) & 0xFF
            if k == 27:  # press Esc to save the result image
                cv2.imwrite(out_path + item, img)
        cv2.destroyAllWindows()
    
    if config.input_mode == '2':
        cap = cv2.VideoCapture(0)
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        out = cv2.VideoWriter(out_path+'out.mp4', fourcc, 10, (640, 480))
        while True:
            t1 = cv2.getTickCount()
            ret, frame = cap.read()
            if ret:
                boxes_c, landmarks = mtcnn_detector.detect(frame)
                t2 = cv2.getTickCount()
                t = (t2-t1)/cv2.getTickFrequency()
                fps = 1.0/t
                for i in range(boxes_c.shape[0]):
                    bbox = boxes_c[i, :4]
                    score = boxes_c[i, 4]
                    corpbbox = [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])]

                    # draw the face box
                    cv2.rectangle(frame, (corpbbox[0], corpbbox[1]),
                                  (corpbbox[2], corpbbox[3]), (255, 0, 0), 1)
                    # draw the confidence score
                    cv2.putText(frame, '{:.2f}'.format(score),
                                (corpbbox[0], corpbbox[1] - 2),
                                cv2.FONT_HERSHEY_SIMPLEX,
                                0.5, (0, 0, 255), 2)
                # draw the frame time and fps
                cv2.putText(frame, '{:.4f}'.format(t) + " " + '{:.3f}'.format(fps), (10, 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 2)
                # draw the landmarks
                for i in range(landmarks.shape[0]):
                    for j in range(len(landmarks[i])//2):
                        cv2.circle(frame, (int(landmarks[i][2*j]), int(landmarks[i][2*j+1])), 2, (0, 0, 255))
                out.write(frame)
                cv2.imshow("result", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
            else:
                break
        cap.release()
        out.release()
        cv2.destroyAllWindows()

    Supporting modules used above:

    detector.py

    # coding: utf-8
    
    
    import tensorflow as tf
    import numpy as np
    
    
    class Detector:
        """
        识别多组图片
        """
        def __init__(self, net_factory, data_size, batch_size, model_path):
            graph = tf.Graph()
            with graph.as_default():
                self.image_op = tf.placeholder(tf.float32, [None, data_size, data_size, 3])
                self.cls_prob, self.bbox_pred, self.landmark_pred = net_factory(self.image_op, training=False)
                self.sess = tf.Session()
                # restore the trained model
                saver = tf.train.Saver()
                model_file = tf.train.latest_checkpoint(model_path)
                saver.restore(self.sess, model_file)
            self.data_size = data_size
            self.batch_size = batch_size
    
        def predict(self, databatch):
            scores = []
            batch_size = self.batch_size
            minibatch = []
            cur = 0
            # total number of samples
            n = databatch.shape[0]
            # split the data into fixed-size minibatches
            while cur < n:
                minibatch.append(databatch[cur:min(cur+batch_size, n), :, :, :])
                cur += batch_size
            cls_prob_list = []
            bbox_pred_list = []
            landmark_pred_list = []
            for idx, data in enumerate(minibatch):
                m = data.shape[0]
                real_size = self.batch_size
                # if the last minibatch is short, pad it by repeating samples
                if m < batch_size:
                    keep_inds = np.arange(m)
                    gap = self.batch_size-m
                    while gap >= len(keep_inds):
                        gap -= len(keep_inds)
                        keep_inds = np.concatenate((keep_inds, keep_inds))
                    if gap != 0:
                        keep_inds = np.concatenate((keep_inds, keep_inds[:gap]))
                    data = data[keep_inds]
                    real_size = m
                cls_prob, bbox_pred, landmark_pred = self.sess.run([self.cls_prob, self.bbox_pred, self.landmark_pred],
                                                                   feed_dict={self.image_op: data})
                
                cls_prob_list.append(cls_prob[:real_size])
                bbox_pred_list.append(bbox_pred[:real_size])
                landmark_pred_list.append(landmark_pred[:real_size])
            
            return np.concatenate(cls_prob_list, axis=0), np.concatenate(bbox_pred_list, axis=0), np.concatenate(landmark_pred_list, axis=0)
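
    The only subtle part of Detector.predict is the last, short minibatch: its indices are repeated until the batch is full, and only the first real_size outputs are kept afterwards. A standalone illustration with made-up sizes:

    import numpy as np

    m, batch_size = 3, 4                      # 3 samples left, batch size 4
    keep_inds = np.arange(m)                  # [0 1 2]
    gap = batch_size - m                      # 1
    while gap >= len(keep_inds):              # only loops when gap >= m
        gap -= len(keep_inds)
        keep_inds = np.concatenate((keep_inds, keep_inds))
    if gap != 0:
        keep_inds = np.concatenate((keep_inds, keep_inds[:gap]))
    print(keep_inds)                          # [0 1 2 0]: feed 4 samples, keep the first 3 outputs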

    fcn_detector.py

    # coding: utf-8
    
    import tensorflow as tf
    import sys
    sys.path.append('../')
    import train.config as config
    
    
    class FcnDetector:
        """
        识别单张图片
        """
        def __init__(self, net_factory, model_path):
            graph = tf.Graph()
            with graph.as_default():
                self.image_op = tf.placeholder(tf.float32, name='input_image')
                self.width_op = tf.placeholder(tf.int32, name='image_width')
                self.height_op = tf.placeholder(tf.int32, name='image_height')
                image_reshape = tf.reshape(self.image_op, [1, self.height_op, self.width_op, 3])
                # network outputs
                self.cls_prob, self.bbox_pred, _ = net_factory(image_reshape, training=False)
                self.sess = tf.Session()
                # restore the trained model
                saver = tf.train.Saver()
                model_file = tf.train.latest_checkpoint(model_path)
                saver.restore(self.sess, model_file)
    
        def predict(self, databatch):
            height, width, _ = databatch.shape
            cls_prob, bbox_pred = self.sess.run([self.cls_prob, self.bbox_pred],
                                                feed_dict={self.image_op: databatch,
                                                           self.width_op: width,
                                                           self.height_op: height})
            
            return cls_prob, bbox_pred
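
    Since PNet is fully convolutional, FcnDetector can take an image of any size. A minimal usage sketch (the model path and image file are placeholders; the pixel normalization matches processed_image in MtcnnDetector.py below):

    import cv2
    from train.model import P_Net

    pnet = FcnDetector(P_Net, '../model/PNet/')       # placeholder path
    img = (cv2.imread('face.jpg') - 127.5) / 128      # same normalization as training
    cls_prob, bbox_pred = pnet.predict(img)
    # cls_prob is an n*m*2 face/non-face map and bbox_pred an n*m*4 offset map,
    # as described in the detect_pnet comments below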

    MtcnnDetector.py

    # coding: utf-8
    
    
    import cv2
    import numpy as np
    import sys
    
    sys.path.append('../')
    from preprocess.utils import *
    from tqdm import tqdm
    
    
    def py_nms(dets, thresh):
        """
        剔除太相似的box
        """
        x1 = dets[:, 0]
        y1 = dets[:, 1]
        x2 = dets[:, 2]
        y2 = dets[:, 3]
        scores = dets[:, 4]
    
        areas = (x2 - x1 + 1) * (y2 - y1 + 1)
        # sort the scores in descending order
        order = scores.argsort()[::-1]
    
        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)
            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])
    
            w = np.maximum(0.0, xx2 - xx1 + 1)
            h = np.maximum(0.0, yy2 - yy1 + 1)
            inter = w * h
            
            ovr = inter / (areas[i] + areas[order[1:]] - inter+1e-10)
           
            # Keep boxes whose overlap is below the threshold. order[0] was taken
            # out for comparison, so inds + 1 maps back to indices in order.
            inds = np.where(ovr <= thresh)[0]
            order = order[inds + 1]
    
        return keep
    
    
    class MtcnnDetector:
        """
        来生成人脸的图像
        """
        def __init__(self, detectors,
                     min_face_size=20,
                     stride=2,
                     threshold=[0.6, 0.7, 0.7],
                     scale_factor=0.79  # shrink factor of the image pyramid
                     ):
            self.pnet_detector = detectors[0]
            self.rnet_detector = detectors[1]
            self.onet_detector = detectors[2]
            self.min_face_size = min_face_size
            self.stride = stride
            self.thresh = threshold
            self.scale_factor = scale_factor
    
        def detect_face(self, test_data):
            all_boxes = []
            landmarks = []
            batch_idx = 0
            num_of_img = test_data.size
            empty_array = np.array([])
            for databatch in tqdm(test_data):
                batch_idx += 1
                im = databatch
                if self.pnet_detector:
                    boxes, boxes_c, landmark = self.detect_pnet(im)
                    if boxes_c is None:
                        all_boxes.append(empty_array)
                        landmarks.append(empty_array)
                        continue
                if self.rnet_detector:
                    boxes, boxes_c, landmark = self.detect_rnet(im, boxes_c)
                    
                    if boxes_c is None:
                        all_boxes.append(empty_array)
                        landmarks.append(empty_array)
                        continue
    
                if self.onet_detector:
                    
                    boxes, boxes_c, landmark = self.detect_onet(im, boxes_c)
                   
                    if boxes_c is None:
                        all_boxes.append(empty_array)
                        landmarks.append(empty_array)
                        continue
    
                all_boxes.append(boxes_c)
                landmark = [1]  # placeholder; landmarks are not used in this batch path
                landmarks.append(landmark)
            return all_boxes, landmarks
    
        def detect_pnet(self, im):
            """
            通过PNet筛选box和landmark
            参数:
              im:输入图像[h,2,3]
            """
            h, w, c = im.shape
            net_size = 12
            # ratio between the net input size and the minimum face size
            current_scale = float(net_size) / self.min_face_size
            im_resized = self.processed_image(im, current_scale)
            current_height, current_width, _ = im_resized.shape
            all_boxes = list()
            # image pyramid: keep shrinking the image
            while min(current_height, current_width) > net_size:
                # class map and box offsets
                # This is the test path: the input is a whole image (not necessarily 12x12),
                # so the output cls_cls_map is a feature map of shape (n, m, 2)
                # and reg has shape (n, m, 4)
                cls_cls_map, reg = self.pnet_detector.predict(im_resized)
                boxes = self.generate_bbox(cls_cls_map[:, :, 1], reg, current_scale, self.thresh[0])
                current_scale *= self.scale_factor  # shrink further for the next pyramid level
                im_resized = self.processed_image(im, current_scale)
                current_height, current_width, _ = im_resized.shape
                
                if boxes.size == 0:
                    continue
                # non-maximum suppression keeps boxes with low overlap
                keep = py_nms(boxes[:, :5], 0.5)
                boxes = boxes[keep]
                all_boxes.append(boxes)
            if len(all_boxes) == 0:
                return None, None, None
            all_boxes = np.vstack(all_boxes)
    
            # run NMS again on the boxes merged from all pyramid levels
            keep = py_nms(all_boxes[:, 0:5], 0.7)
            all_boxes = all_boxes[keep]
            boxes = all_boxes[:, :5]
    
            # box width and height
            bbw = all_boxes[:, 2] - all_boxes[:, 0] + 1
            bbh = all_boxes[:, 3] - all_boxes[:, 1] + 1
    
            # Box coordinates and scores in the original image. The training offsets
            # are normalized by the box size, so dx and dy here are normalized too.
            boxes_c = np.vstack([all_boxes[:, 0] + all_boxes[:, 5] * bbw,  # all_boxes[:, 5]--> dx1
                                 all_boxes[:, 1] + all_boxes[:, 6] * bbh,  # all_boxes[:, 6]--> dy1
                                 all_boxes[:, 2] + all_boxes[:, 7] * bbw,  # all_boxes[:, 7]--> dx2
                                 all_boxes[:, 3] + all_boxes[:, 8] * bbh,  # all_boxes[:, 8]--> dy2
                                 all_boxes[:, 4]])
            boxes_c = boxes_c.T
            return boxes, boxes_c, None
    
        def detect_rnet(self, im, dets):
            """
            通过rent选择box
            参数:
              im:输入图像
              dets: PNet选择的box,是相对原图的绝对坐标
            返回值:
              box绝对坐标
            """
            h, w, c = im.shape
            # expand each PNet box to the enclosing square to avoid losing information
            dets = convert_to_square(dets)
            dets[:, 0:4] = np.round(dets[:, 0:4])
            # adjust boxes that extend beyond the image
            [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h)
            delete_size = np.ones_like(tmpw)*20
            ones = np.ones_like(tmpw)
            zeros = np.zeros_like(tmpw)
            num_boxes = np.sum(np.where((np.minimum(tmpw, tmph) >= delete_size), ones, zeros))
            cropped_ims = np.zeros((num_boxes, 24, 24, 3), dtype=np.float32)
            for i in range(num_boxes):
                # crop each PNet box from the original image, padding with zeros out of bounds
                if tmph[i] < 20 or tmpw[i] < 20:
                    continue
                tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8)
                tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :]
                cropped_ims[i, :, :, :] = (cv2.resize(tmp, (24, 24)) - 127.5) / 128
            cls_scores, reg, _ = self.rnet_detector.predict(cropped_ims)
            cls_scores = cls_scores[:, 1]
            keep_inds = np.where(cls_scores > self.thresh[1])[0]
            if len(keep_inds) > 0:
                boxes = dets[keep_inds]
                boxes[:, 4] = cls_scores[keep_inds]
                reg = reg[keep_inds]
            else:
                return None, None, None
    
            keep = py_nms(boxes, 0.6)
            boxes = boxes[keep]
            # calibrate the boxes with the regression output to get RNet boxes in absolute coordinates
            boxes_c = self.calibrate_box(boxes, reg[keep])
            return boxes, boxes_c, None
        
        def detect_onet(self, im, dets):
            """
            将ONet的选框继续筛选基本和RNet差不多但多返回了landmark
            """
            h, w, c = im.shape
            dets = convert_to_square(dets)
            dets[:, 0:4] = np.round(dets[:, 0:4])
            [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph] = self.pad(dets, w, h)
            num_boxes = dets.shape[0]
            cropped_ims = np.zeros((num_boxes, 48, 48, 3), dtype=np.float32)
            for i in range(num_boxes):
                tmp = np.zeros((tmph[i], tmpw[i], 3), dtype=np.uint8)
                tmp[dy[i]:edy[i] + 1, dx[i]:edx[i] + 1, :] = im[y[i]:ey[i] + 1, x[i]:ex[i] + 1, :]
                cropped_ims[i, :, :, :] = (cv2.resize(tmp, (48, 48)) - 127.5) / 128
    
            cls_scores, reg, landmark = self.onet_detector.predict(cropped_ims)
            
            cls_scores = cls_scores[:, 1]
            keep_inds = np.where(cls_scores > self.thresh[2])[0]
            if len(keep_inds) > 0:
                boxes = dets[keep_inds]
                boxes[:, 4] = cls_scores[keep_inds]
                reg = reg[keep_inds]
                landmark = landmark[keep_inds]
            else:
                return None, None, None
    
            w = boxes[:, 2] - boxes[:, 0] + 1
            h = boxes[:, 3] - boxes[:, 1] + 1
            landmark[:, 0::2] = (np.tile(w, (5, 1)) * landmark[:, 0::2].T + np.tile(boxes[:, 0], (5, 1)) - 1).T
            landmark[:, 1::2] = (np.tile(h, (5, 1)) * landmark[:, 1::2].T + np.tile(boxes[:, 1], (5, 1)) - 1).T
            boxes_c = self.calibrate_box(boxes, reg)
    
            boxes = boxes[py_nms(boxes, 0.6)]
            keep = py_nms(boxes_c, 0.6)
            boxes_c = boxes_c[keep]
            landmark = landmark[keep]
            return boxes, boxes_c, landmark
    
        def processed_image(self, img, scale):
            """
            预处理数据,转化图像尺度并对像素归一到[-1, 1]
            """
            height, width, channels = img.shape
            new_height = int(height * scale)  
            new_width = int(width * scale)  
            new_dim = (new_width, new_height)
            img_resized = cv2.resize(img, new_dim, interpolation=cv2.INTER_LINEAR) 
            img_resized = (img_resized - 127.5) / 128
            return img_resized
    
        def generate_bbox(self, cls_map, reg, scale, threshold):
            """
             得到对应原图的box坐标,分类分数,box偏移量
             cls_map: n * m(输入是cls_cls_map[:, :, 1], 第一维, 人脸框的概率.)
             reg: n * m * 4
            """
    
            # PNet downsamples the input roughly by a factor of 2
            stride = 2
    
            cellsize = 12
    
            # keep the high-confidence cells; these are the predicted face locations (2-D indices)
            t_index = np.where(cls_map > threshold)
    
            # no face found (t_index[1].size would work just as well);
            # np.where on a 2-D array returns two arrays: row indices and column indices
            if t_index[0].size == 0:
                return np.array([])
            # offsets
            dx1, dy1, dx2, dy2 = [reg[t_index[0], t_index[1], i] for i in range(4)]
    
            reg = np.array([dx1, dy1, dx2, dy2])
            score = cls_map[t_index[0], t_index[1]]
            # Box coordinates in the original image, classification score, and offsets.
            # The mapping back to the original image works as follows: cellsize=12 because
            # a 12x12 input shrinks to 1x1; stride=2 because exactly one conv layer has
            # stride 2; scale tells which pyramid level this feature map came from.
            boundingbox = np.vstack([np.round((stride * t_index[1]) / scale),
                                     np.round((stride * t_index[0]) / scale),
                                     np.round((stride * t_index[1] + cellsize) / scale),
                                     np.round((stride * t_index[0] + cellsize) / scale),
                                     score,
                                     reg])
            # shape[n,9]
            return boundingbox.T
    
        def pad(self, bboxes, w, h):
            """
            将超出图像的box进行处理
            参数:
              bboxes: 人脸框
              w, h: 图像长宽
            返回值:
              dy, dx : 为调整后的box的左上角坐标相对于原box左上角的坐标
              edy, edx : n为调整后的box右下角相对原box左上角的相对坐标
              y, x : 调整后的box在原图上左上角的坐标
              ey, ex : 调整后的box在原图上右下角的坐标
              tmph, tmpw: 原始box的长宽
            """
            # box width and height
            tmpw, tmph = bboxes[:, 2] - bboxes[:, 0] + 1, bboxes[:, 3] - bboxes[:, 1] + 1
            num_box = bboxes.shape[0]
    
            dx, dy = np.zeros((num_box, )), np.zeros((num_box, ))
            edx, edy = tmpw.copy() - 1, tmph.copy() - 1
            # top-left and bottom-right coordinates of the boxes
            x, y, ex, ey = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3]
            # For boxes past the bottom-right border, clip ex, ey to the image's w, h;
            # edx, edy are the clipped bottom-right corners relative to the original box
            tmp_index = np.where(ex > w - 1)
            # w -1 + tmpw -1 - edx= ex -> edx = w + tmpw - ex - 2
            edx[tmp_index] = tmpw[tmp_index] + w - 2 - ex[tmp_index]
            ex[tmp_index] = w - 1
    
            tmp_index = np.where(ey > h - 1)
            # h -1 + tmph -1 - edy = ey -> edy = h + tmph - ey - 2
            edy[tmp_index] = tmph[tmp_index] + h - 2 - ey[tmp_index]
            ey[tmp_index] = h - 1
            # For boxes past the top-left corner, clip x, y to 0;
            # dx, dy are the clipped top-left corners relative to the original box
            tmp_index = np.where(x < 0)
            dx[tmp_index] = 0 - x[tmp_index]
            x[tmp_index] = 0
    
            tmp_index = np.where(y < 0)
            dy[tmp_index] = 0 - y[tmp_index]
            y[tmp_index] = 0
    
            return_list = [dy, edy, dx, edx, y, ey, x, ex, tmpw, tmph]
            return_list = [item.astype(np.int32) for item in return_list]
    
            return return_list
    
        def calibrate_box(self, bbox, reg):
            """
            校准box
            参数:
              bbox: PNet生成的box
              reg: RNet生成的box偏移值
            返回值:
              调整后的box是针对原图的绝对坐标
            """
            bbox_c = bbox.copy()
            w = bbox[:, 2] - bbox[:, 0] + 1
            w = np.expand_dims(w, 1)
            h = bbox[:, 3] - bbox[:, 1] + 1
            h = np.expand_dims(h, 1)
            reg_m = np.hstack([w, h, w, h])
            aug = reg_m * reg
            bbox_c[:, 0:4] = bbox_c[:, 0:4] + aug
            return bbox_c
    
        def detect(self, img):
            """
            用于测试当个图像的
            """
            boxes = None
    
            # PNet
            if self.pnet_detector:
                boxes, boxes_c, _ = self.detect_pnet(img)
                if boxes_c is None:
                    return np.array([]), np.array([])
    
            # RNet
            if self.rnet_detector:
                boxes, boxes_c, _ = self.detect_rnet(img, boxes_c)
                if boxes_c is None:
                    return np.array([]), np.array([])
    
            # ONet
            if self.onet_detector:
                boxes, boxes_c, landmark = self.detect_onet(img, boxes_c)
                if boxes_c is None:
                    return np.array([]), np.array([])
    
            return boxes_c, landmark
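
    A quick sanity check of py_nms with made-up boxes: two heavily overlapping detections and one far away. With a threshold of 0.5, the lower-scoring of the overlapping pair is suppressed.

    import numpy as np

    dets = np.array([[10., 10., 50., 50., 0.9],
                     [12., 12., 52., 52., 0.8],      # IoU with the first box is about 0.83
                     [100., 100., 140., 140., 0.7]])
    print(py_nms(dets, 0.5))  # [0, 2]: the second box is suppressed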

    Testing and verification:

    python test.py

    Results: (the resulting detection images are omitted here)

    The image data comes from the internet and is for learning use only; if there is any infringement, please contact me to remove it. Thank you!

    Reference: https://www.ctolib.com/LeslieZhoa-tensorflow-MTCNN.html
