前言

本文章我們來學習一下使用PaddlePaddle實現人臉對比和人臉識別,使用的訓練數據集是CASIA-WebFace。

數據集介紹

我們使用的是CASIA-WebFace數據集,該人臉數據集是目前最大的公開人臉數據集。該人臉數據集一共有包含10,575個人,494,414張圖像,包含彩色圖和灰圖。各大人臉數據集情況如下表。

Dataset Subjects Images Availability
LFW [1] 5,749 13,233 Public
WDRef [2] 2,995 99,773 Public (feature only)
CelebFaces [3] 10,177 202,599 Private
SFC [4] 4,030 4,400,000 Private
CACD [5] 2,000 163,446 Public (partial annotated)
CASIA-WebFace 10,575 494,414 Public

訓練模型

爲了方便讀取數據集,我們要生成一個圖像列表,用於訓練時讀取數據,這個列表的作用具體可以閱讀筆者之前的文章《我的PaddlePaddle學習之路》筆記四——自定義圖像數據集的識別,執行下面代碼生成人臉數據集的圖像列表。下載CASIA-WebFace數據集並解壓,執行代碼時傳入解壓後的根目錄,執行之後會在/home/test生成一個圖像列表文件夾。

# 生成圖像列表程序
import os
import json

class CreateDataList:
    def __init__(self):
        pass

    def createTrainDataList(self, data_root_path):
        # # 把生產的數據列表都放在自己的總類別文件夾中
        data_list_path = ''
        # 所有類別的信息
        class_detail = []
        # 獲取所有類別
        class_dirs = os.listdir(data_root_path)
        # 類別標籤
        class_label = 0
        # 獲取總類別的名稱
        father_paths = data_root_path.split('/')
        while True:
            if father_paths[father_paths.__len__() - 1] == '':
                del father_paths[father_paths.__len__() - 1]
            else:
                break
        father_path = father_paths[father_paths.__len__() - 1]

        all_class_images = 0
        # 讀取每個類別
        for class_dir in class_dirs:
            # 每個類別的信息
            class_detail_list = {}
            test_sum = 0
            trainer_sum = 0
            # 把生產的數據列表都放在自己的總類別文件夾中
            data_list_path = "/home/test/%s/" % father_path
            # 統計每個類別有多少張圖片
            class_sum = 0
            # 獲取類別路徑
            path = data_root_path + "/" + class_dir
            # 獲取所有圖片
            img_paths = os.listdir(path)
            for img_path in img_paths:
                # 每張圖片的路徑
                name_path = path + '/' + img_path
                # 如果不存在這個文件夾,就創建
                isexist = os.path.exists(data_list_path)
                if not isexist:
                    os.makedirs(data_list_path)
                # 每10張圖片取一個做測試數據
                trainer_sum += 1
                with open(data_list_path + "trainer.list", 'a') as f:
                    f.write(name_path + "\t%d" % class_label + "\n")

                class_sum += 1
                all_class_images += 1
            class_label += 1
            # 說明的json文件的class_detail數據
            class_detail_list['class_name'] = class_dir
            class_detail_list['class_label'] = class_label
            class_detail_list['class_test_images'] = test_sum
            class_detail_list['class_trainer_images'] = trainer_sum
            class_detail.append(class_detail_list)
        # 獲取類別數量
        all_class_sum = class_dirs.__len__()
        # 說明的json文件信息
        readjson = {}
        readjson['all_class_name'] = father_path
        readjson['all_class_sum'] = all_class_sum
        readjson['all_class_images'] = all_class_images
        readjson['class_detail'] = class_detail
        jsons = json.dumps(readjson, sort_keys=True, indent=4, separators=(',', ': '))
        with open(data_list_path + "readme.json",'w') as f:
            f.write(jsons)

if __name__ == '__main__':
    createDataList = CreateDataList()
    createDataList.createTrainDataList('/home/test/WebFace/')

編寫讀取圖像的reader,這個reader對圖像做的預處理的進行中心裁剪,因爲人臉都是居中的,進行居中裁剪可以去掉其他的背景的影響。

# 把圖像和label讀取成reader
# coding=utf-8
import cv2
import numpy as np
import paddle.v2 as paddle
import random
from multiprocessing import cpu_count

class MyReader:
    def __init__(self, imageSize, type_size, center_crop_size = 128):
        self.imageSize = imageSize
        self.type_size = type_size
        self.center_crop_size = center_crop_size
        self.default_image_size = 250

    def train_mapper(self, sample):
        img, label = sample
        sparse_label = [0 for i in range(self.type_size)]
        sparse_label[label - 1] = 1
        # 裁剪中心圖片
        def crop_img(img, center_crop_size):
            img = cv2.imread(img, 0)
            if center_crop_size < self.default_image_size:
                side = (self.default_image_size - center_crop_size) / 2
                img = img[side: self.default_image_size - side - 1, side: self.default_image_size - side - 1]
            return img

        img = crop_img(img, self.center_crop_size)
        img = cv2.resize(img, (self.imageSize, self.imageSize))
        return img.flatten().astype('float32'), label, sparse_label
    # 獲取訓練的reader
    def train_reader(self, train_list, buffered_size=1024):
        def reader():
            with open(train_list, 'r') as f:
                lines = [line.strip() for line in f]
                # 打亂數據
                random.shuffle(lines)
                for line in lines:
                    line = line.strip().split('\t')
                    img_path = line[0]
                    img_label = line[1]

                    yield img_path, int(img_label)

        return paddle.reader.xmap_readers(self.train_mapper, reader, cpu_count(), buffered_size)

編寫卷積神經網絡,這個是根據resnet修改的網絡。使用了6個卷積塊,最後的返回值是最後一個池化層和最後一個全連接層,輸出最後一層池化層是爲了在預測的是獲取圖像的人臉特徵,做人臉對比。

import numpy as np
import paddle.v2 as paddle


def conv_bn_layer(input,
                  ch_out,
                  filter_size,
                  stride,
                  padding,
                  active_type=paddle.activation.Relu(),
                  ch_in=None):
    tmp = paddle.layer.img_conv(
        input=input,
        filter_size=filter_size,
        num_channels=ch_in,
        num_filters=ch_out,
        stride=stride,
        padding=padding,
        act=paddle.activation.Linear(),
        bias_attr=False)
    return paddle.layer.batch_norm(input=tmp, act=active_type, moving_average_fraction=0.999)


def shortcut(ipt, ch_in, ch_out, stride):
    if ch_in != ch_out:
        return conv_bn_layer(ipt, ch_out, 1, stride, 0, paddle.activation.Linear())
    else:
        return ipt

def basicblock(ipt, ch_in, ch_out, stride):
    tmp = conv_bn_layer(ipt, ch_out, 3, stride, 1)
    tmp = conv_bn_layer(tmp, ch_out, 3, 1, 1, paddle.activation.Linear())
    short = shortcut(ipt, ch_in, ch_out, stride)
    return paddle.layer.addto(input=[tmp, short], act=paddle.activation.Relu())


def layer_warp(block_func, ipt, ch_in, ch_out, count, stride):
    tmp = block_func(ipt, ch_in, ch_out, stride)
    for i in range(1, count):
        tmp = block_func(tmp, ch_out, ch_out, 1)
    return tmp


def resnet(ipt, class_dim):
    n = 1
    feature_maps = 512
    ipt_bn = ipt - 128.0
    # 獲取卷積層輸出
    conv1 = conv_bn_layer(ipt_bn, ch_in=1, ch_out=8, filter_size=3, stride=1, padding=1)
    # 多個殘差塊組合
    res0 = layer_warp(basicblock, conv1, 8, 16, n, 1)
    res1 = layer_warp(basicblock, res0, 16, 32, n, 1)
    res2 = layer_warp(basicblock, res1, 32, 64, n, 2)
    res3 = layer_warp(basicblock, res2, 64, 128, n, 2)
    res4 = layer_warp(basicblock, res3, 128, 256, n, 2)
    res5 = layer_warp(basicblock, res4, 256, feature_maps, n, 2)
    # 最後使用池化層來降維
    pool = paddle.layer.img_pool(input=res5, name='pool', pool_size=8, stride=1, pool_type=paddle.pooling.Avg())

    fc = paddle.layer.fc(input=pool, size=class_dim, act=paddle.activation.Softmax())
    return pool, fc

開始訓練模型

# 訓練代碼
import os
import sys
import paddle.v2 as paddle
from paddle.v2.plot import Ploter

step = 0

class PaddleUtil:

    # **********************獲取參數***************************************
    def get_parameters(self, parameters_path=None, cost=None):
        if not parameters_path:
            # 使用cost創建parameters
            if not cost:
                raise NameError('請輸入cost參數')
            else:
                # 根據損失函數創建參數
                parameters = paddle.parameters.create(cost)
                print "cost"
                return parameters
        else:
            # 使用之前訓練好的參數
            try:
                # 使用訓練好的參數
                with open(parameters_path, 'r') as f:
                    parameters = paddle.parameters.Parameters.from_tar(f)
                print "使用parameters"
                return parameters
            except Exception as e:
                raise NameError("你的參數文件錯誤,具體問題是:%s" % e)

    # ***********************獲取訓練器***************************************
    # datadim 數據大小
    def get_trainer(self, datadim, type_size, parameters_path, batch_size):
        # 獲得圖片對於的信息標籤
        label = paddle.layer.data(name="label", type=paddle.data_type.integer_value(type_size))
        image = paddle.layer.data(name="image", type=paddle.data_type.dense_vector(datadim))

        # 獲取全連接層,也就是分類器
        fea, out = resnet(image, class_dim=type_size)
        # 獲得損失函數
        cost = paddle.layer.classification_cost(input=out, label=label)

        # 獲得參數
        if not parameters_path:
            parameters = self.get_parameters(cost=cost)
        else:
            parameters = self.get_parameters(parameters_path=parameters_path)

        '''
        定義優化方法
        learning_rate 迭代的速度
        momentum 跟前面動量優化的比例
        regularzation 正則化,防止過擬合
        '''
        optimizer = paddle.optimizer.Momentum(
            momentum=0.9,
            regularization=paddle.optimizer.L2Regularization(rate=0.0005 * batch_size),
            learning_rate=0.00001 / batch_size,
            learning_rate_decay_a=0.1,
            learning_rate_decay_b=128000 * 35,
            learning_rate_schedule="discexp", )


        '''
        創建訓練器
        cost 分類器
        parameters 訓練參數,可以通過創建,也可以使用之前訓練好的參數
        update_equation 優化方法
        '''
        trainer = paddle.trainer.SGD(cost=cost,
                                     parameters=parameters,
                                     update_equation=optimizer)
        return trainer

    # ***********************開始訓練***************************************
    def start_trainer(self, trainer, num_passes, save_parameters_name, trainer_reader, batch_size):
        # 獲得數據
        reader = paddle.batch(reader=paddle.reader.shuffle(reader=trainer_reader,
                                                           buf_size=5000),
                              batch_size=batch_size)
        # 保證保存模型的目錄是存在的
        father_path = save_parameters_name[:save_parameters_name.rfind("/")]
        if not os.path.exists(father_path):
            os.makedirs(father_path)

        # 指定每條數據和padd.layer.data的對應關係
        feeding = {"image": 0, "label": 1}

        train_title = "Train cost"
        error_title = "Error"
        cost_ploter = Ploter(train_title, error_title)

        # 定義訓練事件,畫出折線圖,該事件的圖可以在notebook上顯示,命令行不會正常輸出
        def event_handler_plot(event):
            global step
            if isinstance(event, paddle.event.EndIteration):
                if step % 1 == 0:
                    cost_ploter.append(train_title, step, event.cost)
                    # cost_ploter.append(error_title, step, event.metrics['classification_error_evaluator'])
                    cost_ploter.plot()
                step += 1
                if step % 100 == 0:
                    # 保存訓練好的參數
                    with open(save_parameters_name, 'w') as f:
                        trainer.save_parameter_to_tar(f)

        '''
        開始訓練
        reader 訓練數據
        num_passes 訓練的輪數
        event_handler 訓練的事件,比如在訓練的時候要做一些什麼事情
        feeding 說明每條數據和padd.layer.data的對應關係
        '''
        trainer.train(reader=reader,
                      num_passes=num_passes,
                      event_handler=event_handler_plot,
                      feeding=feeding)



if __name__ == '__main__':
    paddle.init(use_gpu=True, trainer_count=1)
    # 類別總數
    type_size = 10575
    # 圖片大小
    imageSize = 128
    # 中心裁剪大小
    crop_size = 128
    # Batch Size
    batch_size = 256
    # 保存的model路徑
    parameters_path = "/home/test/model.tar"
    # 數據的大小
    datadim = imageSize * imageSize
    paddleUtil = PaddleUtil()

    # *******************************開始訓練**************************************
    myReader = MyReader(imageSize=imageSize, type_size=type_size, center_crop_size=crop_size)
    trainer_reader = myReader.train_reader(train_list="/home/test/train_set/trainer.list")
    # 獲取訓練器
    trainer = paddleUtil.get_trainer(datadim=datadim, type_size=type_size, parameters_path=None, batch_size=batch_size)

    paddleUtil.start_trainer(trainer=trainer, num_passes=50, save_parameters_name=parameters_path,
                             trainer_reader=trainer_reader, batch_size=batch_size)

預測

經過上面的訓練之後,獲得得到了一個訓練好的模型,我們將會使用這個模型來進行人臉對比和人臉識別。

人臉對比

人臉對比,人臉對比其實就是做普通的分類預測,但是輸出的不是最後一層全連接層,而是最後一層池化層,這樣輸出的就是人臉的特徵,然後使用對角餘弦函數來計算他們的相似度。

  • 通過人臉對比的方式實現一些場景的應用。比如對比證件上的人臉和真實的人臉是否爲同一個人,操作方式判斷人臉相似度的result是否達到預設值,推薦相似度爲0.8時,爲同一個人。
  • 利用這種的人臉對比方式,有可以實現人臉識別。
  • 首先我們可以把人臉以註冊人臉的方式加入到註冊人臉庫中,加關聯到該人臉的信息;
  • 然後要進行識別時,把要識別的人臉和已註冊的人臉庫中的人臉進行對比,當對比爲識別爲同一個人臉,就算識別成功
  • 這樣的處理方式好處是,不需要每次增加新的用戶時,需要收集大量該用戶的人臉,只有收集一張或者多張多角度的人臉,完全可以使用同一個模型進行人臉對比。
# 預測代碼
import numpy as np
import paddle.v2 as paddle
import os
import cv2
import math
from sklearn import preprocessing


# 獲取參數
def get_parameters(parameters_path):
    with open(parameters_path, 'r') as f:
        parameters = paddle.parameters.Parameters.from_tar(f)
    return parameters

# 獲取預測器
def get_inference(parameters, fea):
    inferer = paddle.inference.Inference(output_layer=fea, parameters=parameters)
    return inferer

# 預處理圖片
def load_image(file, imageSize):
        img = cv2.imread(file, 0)
        img = np.reshape(img, [img.shape[0], img.shape[1], 1])
        img = paddle.image.center_crop(img, 128, is_color=False)
        img = cv2.resize(img, (imageSize, imageSize)).flatten()
        return img

# 使用訓練好的參數進行預測
def to_prediction(inferer, image_paths, imageSize):
    # 獲得要預測的圖片
    test_data = []
    for image_path in image_paths:
        test_data.append([load_image(image_path, imageSize)])

    # 獲得預測結果
    probs = inferer.infer(input=test_data)
    # 獲取兩個圖片的預測輸出
    prob1 = probs[0]
    prob2 = probs[1]

    # 對角餘弦值
    dist = np.dot(prob1, prob2) / (np.linalg.norm(prob1) * np.linalg.norm(prob2))

    return dist

if __name__ == '__main__':
    paddle.init(use_gpu=True, trainer_count=1)
    # 類別總數
    type_size = 10575
    # 圖片大小
    imageSize = 128
    # 保存的model路徑
    parameters_path = "/home/test/model.tar"
    # 數據的大小
    datadim = imageSize * imageSize
    # 獲取預測器
    parameters = get_parameters(parameters_path=parameters_path)
    image = paddle.layer.data(name="image", type=paddle.data_type.dense_vector(datadim))
    fea, out = resnet(image, class_dim=type_size)
    inferer = get_inference(parameters=parameters, fea=fea)

    image_path = []
    image_path1, image_path2 = "/home/test/0.jpg", "/home/test/1.jpg"
    image_path.append(image_path1)
    image_path.append(image_path2)

    # 得到兩張圖的相似度
    result = to_prediction(inferer=inferer, image_paths=image_path, imageSize=imageSize)
    print("兩張圖像的相似度爲:" + result )

人臉識別

這個是人臉識別方式是不推薦使用的,它就是一個分類的操作,輸入一張人臉圖片,獲取對應的人臉的label和概率。
但是如果要加入新的人臉,需要收集大量該用戶的人臉,並再次進行訓練,得到新的模型。
這樣的識別方式,擴展性非常弱,但是識別速度比較快,不需要每張人臉都進行對比。

# 預測代碼
import cv2
import numpy as np
import paddle.v2 as paddle


# 獲取參數
def get_parameters(parameters_path):
    with open(parameters_path, 'r') as f:
        parameters = paddle.parameters.Parameters.from_tar(f)
    return parameters


# 獲取預測器
def get_inference(parameters, fea):
    inferer = paddle.inference.Inference(output_layer=fea, parameters=parameters)
    return inferer


# 預處理圖片
def load_image(file, imageSize):
    img = cv2.imread(file, 0)
    img = np.reshape(img, [img.shape[0], img.shape[1], 1])
    img = paddle.image.center_crop(img, 128, is_color=False)
    img = cv2.resize(img, (imageSize, imageSize)).flatten()
    return img


# 使用訓練好的參數進行預測
def to_prediction(inferer, image_paths, imageSize):
    # 獲得要預測的圖片
    test_data = []
    test_data.append([load_image(image_path, imageSize)])

    # 獲得預測結果
    probs = inferer.infer(input=test_data)
    # 處理預測結果
    lab = np.argsort(-probs)
    # 返回概率最大的值和其對應的概率值
    return lab[0][0], probs[0][(lab[0][0])]


if __name__ == '__main__':
    paddle.init(use_gpu=True, trainer_count=1)
    # 類別總數
    type_size = 10575
    # 圖片大小
    imageSize = 128
    # 保存的model路徑
    parameters_path = "/home/test/model.tar"
    # 數據的大小
    datadim = imageSize * imageSize
    # 獲取預測器
    parameters = get_parameters(parameters_path=parameters_path)
    image = paddle.layer.data(name="image", type=paddle.data_type.dense_vector(datadim))
    fea, out = resnet(image, class_dim=type_size)
    inferer = get_inference(parameters=parameters, fea=out)

    image_path = "/home/test/0.jpg"

    # 獲取人臉對比的label和概率
    result, probability = to_prediction(inferer=inferer, image_paths=image_path, imageSize=imageSize)
    print('預測結果爲:%d,可信度爲:%f' % (result, probability))
小夜