Introduction

In this article, we will learn how to implement face comparison and face recognition using PaddlePaddle, with the CASIA-WebFace dataset as the training data.

Dataset Introduction

We use the CASIA-WebFace dataset, one of the largest publicly available face datasets at the time of writing. It contains 10,575 individuals and 494,414 images, including both color and grayscale images. The comparison of major face datasets is shown in the table below:

Dataset Subjects Images Availability
LFW [1] 5,749 13,233 Public
WDRef [2] 2,995 99,773 Public (feature only)
CelebFaces [3] 10,177 202,599 Private
SFC [4] 4,030 4,400,000 Private
CACD [5] 2,000 163,446 Public (partial annotated)
CASIA-WebFace 10,575 494,414 Public

Training Model

To facilitate dataset reading, we need to generate an image list for training data reading. For details on the role of this list, please refer to the author’s previous article: “My PaddlePaddle Learning Notes IV - Custom Image Dataset Recognition”. Execute the following code to generate the image list for the face dataset: after downloading and extracting the CASIA-WebFace dataset, run the code, passing the extracted root directory as the argument. It will generate an image-list folder under /home/test.

# Code to generate image list
import os
import json

class CreateDataList:
    """Builds a training image list plus a JSON summary for a dataset laid
    out as one sub-directory per class (the CASIA-WebFace layout)."""

    def __init__(self):
        pass

    def createTrainDataList(self, data_root_path, out_root='/home/test'):
        """Scan ``data_root_path`` and write the training artifacts.

        Outputs, under ``<out_root>/<dataset name>/``:
        - ``trainer.list``: one ``<image path>\\t<label>`` line per image,
          labels numbered from 0 in directory-listing order.
        - ``readme.json``: per-class and overall image counts.

        ``out_root`` defaults to the original hard-coded ``/home/test`` so
        existing callers are unaffected.
        """
        # Dataset name = last non-empty component of the root path
        # (replaces the original manual trailing-'/' stripping loop).
        father_path = os.path.basename(data_root_path.rstrip('/'))
        data_list_path = os.path.join(out_root, father_path) + '/'
        if not os.path.exists(data_list_path):
            os.makedirs(data_list_path)

        class_detail = []
        all_class_images = 0
        class_dirs = os.listdir(data_root_path)
        # Open the list file once instead of re-opening it for every image.
        with open(data_list_path + "trainer.list", 'a') as list_file:
            for class_label, class_dir in enumerate(class_dirs):
                path = os.path.join(data_root_path, class_dir)
                img_paths = os.listdir(path)
                for img_path in img_paths:
                    list_file.write(os.path.join(path, img_path) + "\t%d" % class_label + "\n")
                all_class_images += len(img_paths)
                class_detail.append({
                    'class_name': class_dir,
                    # Record the label actually written to trainer.list.  The
                    # original code incremented the counter *before* recording
                    # it here, so readme.json labels were off by one.
                    'class_label': class_label,
                    'class_test_images': 0,
                    'class_trainer_images': len(img_paths),
                })

        readjson = {
            'all_class_name': father_path,
            'all_class_sum': len(class_dirs),
            'all_class_images': all_class_images,
            'class_detail': class_detail,
        }
        jsons = json.dumps(readjson, sort_keys=True, indent=4, separators=(',', ': '))
        with open(data_list_path + "readme.json", 'w') as f:
            f.write(jsons)

if __name__ == '__main__':
    # Build the training list for the extracted CASIA-WebFace directory.
    list_builder = CreateDataList()
    list_builder.createTrainDataList('/home/test/WebFace/')

Write a reader to read images. This reader performs center cropping on images, which removes background interference since faces are centered:

# Reader for reading images and labels
# coding=utf-8
import cv2
import numpy as np
import paddle.v2 as paddle
import random
from multiprocessing import cpu_count

class MyReader:
    """Training data reader: center-crops grayscale face images, resizes
    them to ``imageSize`` and yields (flat float32 pixels, label, one-hot)
    samples via a multiprocess-mapped paddle reader."""

    def __init__(self, imageSize, type_size, center_crop_size=128):
        # imageSize: side length of the final (square) network input.
        # type_size: number of classes, used to size the one-hot label.
        # center_crop_size: side length of the centered crop taken first.
        self.imageSize = imageSize
        self.type_size = type_size
        self.center_crop_size = center_crop_size
        # CASIA-WebFace images are 250x250 pixels.
        self.default_image_size = 250

    def train_mapper(self, sample):
        """Map an (image path, label) pair to the training tuple."""
        img, label = sample
        sparse_label = [0 for i in range(self.type_size)]
        # Labels produced by the list generator start at 0, so index with the
        # label itself.  The original ``label - 1`` wrapped label 0 to the
        # last slot via negative indexing.
        sparse_label[label] = 1

        def crop_img(img, center_crop_size):
            # Read as grayscale (cv2 flag 0).
            img = cv2.imread(img, 0)
            if center_crop_size < self.default_image_size:
                # Floor division keeps the slice bounds integral under
                # Python 3 as well.
                side = (self.default_image_size - center_crop_size) // 2
                # Slice exactly ``center_crop_size`` pixels; the original
                # ``side : default - side - 1`` bound was one pixel short.
                img = img[side: side + center_crop_size, side: side + center_crop_size]
            return img

        img = crop_img(img, self.center_crop_size)
        img = cv2.resize(img, (self.imageSize, self.imageSize))
        return img.flatten().astype('float32'), label, sparse_label

    def train_reader(self, train_list, buffered_size=1024):
        """Return a shuffled reader over ``train_list`` (tab-separated
        ``<image path>`` and integer label per line)."""
        def reader():
            with open(train_list, 'r') as f:
                lines = [line.strip() for line in f]
                random.shuffle(lines)
                for line in lines:
                    line = line.strip().split('\t')
                    img_path = line[0]
                    img_label = line[1]
                    yield img_path, int(img_label)
        return paddle.reader.xmap_readers(self.train_mapper, reader, cpu_count(), buffered_size)

Define a convolutional neural network modified from ResNet. It uses 6 convolutional blocks, with the output of the last pooling layer for extracting face features during prediction (for face comparison):

import numpy as np
import paddle.v2 as paddle

def conv_bn_layer(input,
                  ch_out,
                  filter_size,
                  stride,
                  padding,
                  active_type=paddle.activation.Relu(),
                  ch_in=None):
    """Convolution followed by batch normalization.

    The convolution itself is linear and bias-free; ``active_type`` is
    applied by the batch-norm layer, so it configures the whole unit.
    """
    conv = paddle.layer.img_conv(input=input,
                                 filter_size=filter_size,
                                 num_channels=ch_in,
                                 num_filters=ch_out,
                                 stride=stride,
                                 padding=padding,
                                 act=paddle.activation.Linear(),
                                 bias_attr=False)
    return paddle.layer.batch_norm(input=conv,
                                   act=active_type,
                                   moving_average_fraction=0.999)

def shortcut(ipt, ch_in, ch_out, stride):
    """Identity shortcut when channel counts already match, otherwise a
    1x1 linear conv-bn projection to match dimensions."""
    if ch_in == ch_out:
        return ipt
    return conv_bn_layer(ipt, ch_out, 1, stride, 0, paddle.activation.Linear())

def basicblock(ipt, ch_in, ch_out, stride):
    """Two-conv residual block: conv-bn-relu, conv-bn, add shortcut, relu."""
    branch = conv_bn_layer(ipt, ch_out, 3, stride, 1)
    branch = conv_bn_layer(branch, ch_out, 3, 1, 1, paddle.activation.Linear())
    skip = shortcut(ipt, ch_in, ch_out, stride)
    return paddle.layer.addto(input=[branch, skip], act=paddle.activation.Relu())

def layer_warp(block_func, ipt, ch_in, ch_out, count, stride):
    """Stack ``count`` residual blocks; only the first block may change the
    channel count or stride."""
    out = block_func(ipt, ch_in, ch_out, stride)
    for _ in range(count - 1):
        out = block_func(out, ch_out, ch_out, 1)
    return out

def resnet(ipt, class_dim):
    """ResNet-style face network over single-channel input.

    Returns ``(pool, fc)``: ``pool`` is the final average-pooling output,
    used as the face feature for comparison; ``fc`` is the softmax
    classification head over ``class_dim`` identities.
    """
    n = 1  # residual blocks per stage
    feature_maps = 512
    # Rough zero-centering of 8-bit grayscale pixel values.
    ipt_bn = ipt - 128.0
    tmp = conv_bn_layer(ipt_bn, ch_in=1, ch_out=8, filter_size=3, stride=1, padding=1)
    # (in_channels, out_channels, stride) for the six residual stages.
    stages = [(8, 16, 1), (16, 32, 1), (32, 64, 2),
              (64, 128, 2), (128, 256, 2), (256, feature_maps, 2)]
    for ch_in, ch_out, stride in stages:
        tmp = layer_warp(basicblock, tmp, ch_in, ch_out, n, stride)
    pool = paddle.layer.img_pool(input=tmp, name='pool', pool_size=8, stride=1,
                                 pool_type=paddle.pooling.Avg())
    fc = paddle.layer.fc(input=pool, size=class_dim, act=paddle.activation.Softmax())
    return pool, fc

Start training the model:

# Training code
import os
import sys
import paddle.v2 as paddle
from paddle.v2.plot import Ploter

# Global iteration counter shared with the plotting event handler below.
step = 0

class PaddleUtil:
    """Helpers that wire up the PaddlePaddle v2 trainer for the face model."""

    def get_parameters(self, parameters_path=None, cost=None):
        """Create fresh parameters from ``cost`` or load them from a tar file.

        Raises NameError (type kept for caller compatibility) when neither
        source is usable or the file cannot be read.
        """
        if not parameters_path:
            if not cost:
                raise NameError('Please input cost parameter')
            return paddle.parameters.create(cost)
        try:
            # Parameter tars are binary; open with 'rb' (the original text
            # mode 'r' breaks under Python 3).
            with open(parameters_path, 'rb') as f:
                parameters = paddle.parameters.Parameters.from_tar(f)
            return parameters
        except Exception as e:
            raise NameError("Error in parameter file: %s" % e)

    def get_trainer(self, datadim, type_size, parameters_path, batch_size):
        """Build an SGD trainer over the resnet classifier.

        ``datadim`` is the flattened image size, ``type_size`` the number of
        identity classes.  A falsy ``parameters_path`` creates fresh
        parameters; otherwise the tar at that path is loaded.
        """
        label = paddle.layer.data(name="label", type=paddle.data_type.integer_value(type_size))
        image = paddle.layer.data(name="image", type=paddle.data_type.dense_vector(datadim))
        # The pooling output ``fea`` is unused during training; only the
        # softmax head feeds the classification cost.
        fea, out = resnet(image, class_dim=type_size)
        cost = paddle.layer.classification_cost(input=out, label=label)

        if not parameters_path:
            parameters = self.get_parameters(cost=cost)
        else:
            parameters = self.get_parameters(parameters_path=parameters_path)

        optimizer = paddle.optimizer.Momentum(
            momentum=0.9,
            regularization=paddle.optimizer.L2Regularization(rate=0.0005 * batch_size),
            learning_rate=0.00001 / batch_size,
            learning_rate_decay_a=0.1,
            learning_rate_decay_b=128000 * 35,
            learning_rate_schedule="discexp", )

        return paddle.trainer.SGD(cost=cost,
                                  parameters=parameters,
                                  update_equation=optimizer)

    def start_trainer(self, trainer, num_passes, save_parameters_name, trainer_reader, batch_size):
        """Run training, plotting the cost each iteration and checkpointing
        the parameters to ``save_parameters_name`` every 100 iterations."""
        reader = paddle.batch(reader=paddle.reader.shuffle(reader=trainer_reader,
                                                           buf_size=5000),
                              batch_size=batch_size)
        # Ensure the checkpoint directory exists.  os.path.dirname also
        # handles paths with no '/' (the original rfind-based slice did not).
        father_path = os.path.dirname(save_parameters_name)
        if father_path and not os.path.exists(father_path):
            os.makedirs(father_path)

        feeding = {"image": 0, "label": 1}

        train_title = "Train cost"
        error_title = "Error"
        cost_ploter = Ploter(train_title, error_title)

        def event_handler_plot(event):
            global step
            if isinstance(event, paddle.event.EndIteration):
                if step % 1 == 0:
                    cost_ploter.append(train_title, step, event.cost)
                    cost_ploter.plot()
                step += 1
                if step % 100 == 0:
                    # Parameter tars are binary: write with 'wb'.
                    with open(save_parameters_name, 'wb') as f:
                        trainer.save_parameter_to_tar(f)

        trainer.train(reader=reader,
                      num_passes=num_passes,
                      event_handler=event_handler_plot,
                      feeding=feeding)

if __name__ == '__main__':
    # Single-GPU training over the full CASIA-WebFace identity set.
    paddle.init(use_gpu=True, trainer_count=1)
    type_size = 10575
    imageSize = 128
    crop_size = 128
    batch_size = 256
    parameters_path = "/home/test/model.tar"
    datadim = imageSize * imageSize

    paddleUtil = PaddleUtil()
    myReader = MyReader(imageSize=imageSize, type_size=type_size,
                        center_crop_size=crop_size)
    trainer_reader = myReader.train_reader(train_list="/home/test/train_set/trainer.list")
    trainer = paddleUtil.get_trainer(datadim=datadim, type_size=type_size,
                                     parameters_path=None, batch_size=batch_size)
    paddleUtil.start_trainer(trainer=trainer, num_passes=50,
                             save_parameters_name=parameters_path,
                             trainer_reader=trainer_reader, batch_size=batch_size)

Prediction

After training, we use the trained model for face comparison and recognition.

Face Comparison

Face comparison is essentially a classification prediction, but instead of using the final fully connected layer, we use the output of the last pooling layer to extract face features. The cosine similarity is then used to calculate the similarity between two faces.

  • Applications: Verify if the face on an ID document matches the real person. Set a threshold (recommended 0.8) to determine if the similarity score meets the threshold.
  • Face recognition implementation:
    1. Register faces into a database with associated information.
    2. For recognition, compare the test face with all registered faces. If a match is found, recognition is successful.
    3. Benefit: No need to collect large amounts of face data for new users; just 1-2 images from different angles suffice.
import numpy as np
import paddle.v2 as paddle
import os
import cv2
import math
from sklearn import preprocessing

def get_parameters(parameters_path):
    """Load trained model parameters from the tar file at ``parameters_path``."""
    # Parameter tars are binary; 'rb' is required under Python 3 (the
    # original text mode 'r' corrupts the read there).
    with open(parameters_path, 'rb') as f:
        parameters = paddle.parameters.Parameters.from_tar(f)
    return parameters

def get_inference(parameters, fea):
    """Build an inference engine whose output is the feature layer ``fea``."""
    return paddle.inference.Inference(output_layer=fea, parameters=parameters)

def load_image(file, imageSize):
    """Read ``file`` as grayscale, center-crop to 128x128, resize to
    ``imageSize`` and return the flattened pixel vector."""
    gray = cv2.imread(file, 0)
    # Give the grayscale image an explicit single channel for center_crop.
    gray = np.reshape(gray, [gray.shape[0], gray.shape[1], 1])
    cropped = paddle.image.center_crop(gray, 128, is_color=False)
    return cv2.resize(cropped, (imageSize, imageSize)).flatten()

def to_prediction(inferer, image_paths, imageSize):
    """Return the cosine similarity between the feature vectors of the
    first two images in ``image_paths``."""
    test_data = [load_image(path, imageSize) for path in image_paths]
    features = inferer.infer(input=test_data)
    vec_a, vec_b = features[0], features[1]
    return np.dot(vec_a, vec_b) / (np.linalg.norm(vec_a) * np.linalg.norm(vec_b))

if __name__ == '__main__':
    paddle.init(use_gpu=True, trainer_count=1)
    type_size = 10575
    imageSize = 128
    parameters_path = "/home/test/model.tar"
    datadim = imageSize * imageSize

    # Rebuild the network and reuse the pooling output as the face feature.
    parameters = get_parameters(parameters_path)
    image = paddle.layer.data(name="image", type=paddle.data_type.dense_vector(datadim))
    fea, out = resnet(image, class_dim=type_size)
    inferer = get_inference(parameters, fea)

    image_paths = ["/home/test/0.jpg", "/home/test/1.jpg"]
    result = to_prediction(inferer, image_paths, imageSize)
    print("Similarity between images: " + str(result))

Face Recognition

This method is not recommended for its classification-based approach. It directly outputs the label and probability for a single face image. To add new users, a large amount of face data is needed, requiring retraining. It has low scalability but fast recognition speed.
```python
import cv2
import numpy as np
import paddle.v2 as paddle

def get_parameters(parameters_path):
    """Load trained model parameters from the tar file at ``parameters_path``.

    Restored from a whitespace-mangled paste: indentation re-added, smart
    quotes replaced with ASCII quotes, and the tar opened in binary mode
    (required under Python 3).
    """
    with open(parameters_path, 'rb') as f:
        parameters = paddle.parameters.Parameters.from_tar(f)
    return parameters

def get_inference(parameters, fea):
    """Build an inference engine over output layer ``fea``.

    Restored from a whitespace-mangled paste (indentation re-added).
    """
    inferer = paddle.inference.Inference(output_layer=fea, parameters=parameters)
    return inferer

def load_image(file, imageSize):
    """Read ``file`` as grayscale, center-crop to 128x128, resize to
    ``imageSize`` and return the flattened pixel vector.

    Restored from a whitespace-mangled paste (indentation re-added).
    """
    img = cv2.imread(file, 0)
    # Give the grayscale image an explicit single channel for center_crop.
    img = np.reshape(img, [img.shape[0], img.shape[1], 1])
    img = paddle.image.center_crop(img, 128, is_color=False)
    img = cv2.resize(img, (imageSize, imageSize)).flatten()
    return img

def to_prediction(inferer, image_path, imageSize):
    """Classify a single face image: return (top label, its probability).

    Restored from a whitespace-mangled paste (indentation re-added).
    """
    test_data = [load_image(image_path, imageSize)]
    probs = inferer.infer(input=test_data)
    # argsort of negated probabilities -> labels in descending probability.
    lab = np.argsort(-probs)
    return lab[0][0], probs[0][lab[0][0]]

if __name__ == '__main__':

Xiaoye