Introduction¶
In this article, we will learn how to implement face comparison and face recognition using PaddlePaddle, with the CASIA-WebFace dataset as the training data.
Dataset Introduction¶
We use the CASIA-WebFace dataset, which is currently the largest publicly available face dataset. It contains 10,575 individuals and 494,414 images, including both color and grayscale images. The comparison of major face datasets is shown in the table below:
| Dataset | Subjects | Images | Availability |
|---|---|---|---|
| LFW [1] | 5,749 | 13,233 | Public |
| WDRef [2] | 2,995 | 99,773 | Public (feature only) |
| CelebFaces [3] | 10,177 | 202,599 | Private |
| SFC [4] | 4,030 | 4,400,000 | Private |
| CACD [5] | 2,000 | 163,446 | Public (partially annotated) |
| CASIA-WebFace | 10,575 | 494,414 | Public |
Training Model¶
To facilitate dataset reading, we need to generate an image list for training data reading. For details on the role of this list, please refer to the author’s previous article: “My PaddlePaddle Learning Notes IV - Custom Image Dataset Recognition”. After downloading and extracting the CASIA-WebFace dataset, run the code below, passing the extracted dataset’s root directory; it will generate an image-list folder under /home/test.
# Code to generate image list
import os
import json
class CreateDataList:
    """Generate a training image list and a JSON summary for a dataset laid
    out as one sub-directory per class (e.g. extracted CASIA-WebFace).

    For a dataset root ``.../<name>/<class_dir>/<image>`` this writes:
      - ``<output_root>/<name>/trainer.list``: one ``<image_path>\\t<label>``
        line per image, labels assigned in ``os.listdir`` order starting at 0
      - ``<output_root>/<name>/readme.json``: per-class and overall counts
    """

    def __init__(self, output_root='/home/test'):
        # Where list folders are created; the default preserves the original
        # hard-coded /home/test location.
        self.output_root = output_root

    def createTrainDataList(self, data_root_path):
        """Walk *data_root_path* and write trainer.list plus readme.json."""
        class_dirs = os.listdir(data_root_path)
        # Last non-empty path component names the dataset (handles a
        # trailing '/', which the original stripped with a while-loop).
        father_path = [p for p in data_root_path.split('/') if p][-1]
        data_list_path = os.path.join(self.output_root, father_path) + "/"
        if not os.path.exists(data_list_path):
            os.makedirs(data_list_path)

        class_detail = []
        all_class_images = 0
        # Open the list file once instead of re-opening it for every image.
        with open(data_list_path + "trainer.list", 'a') as list_file:
            for class_label, class_dir in enumerate(class_dirs):
                path = data_root_path + "/" + class_dir
                img_paths = os.listdir(path)
                for img_path in img_paths:
                    list_file.write(path + '/' + img_path + "\t%d" % class_label + "\n")
                all_class_images += len(img_paths)
                class_detail.append({
                    'class_name': class_dir,
                    # Fixed: the original incremented class_label before
                    # recording it here, so readme.json disagreed with the
                    # labels written to trainer.list.
                    'class_label': class_label,
                    'class_test_images': 0,
                    'class_trainer_images': len(img_paths),
                })

        readjson = {
            'all_class_name': father_path,
            'all_class_sum': len(class_dirs),
            'all_class_images': all_class_images,
            'class_detail': class_detail,
        }
        jsons = json.dumps(readjson, sort_keys=True, indent=4, separators=(',', ': '))
        with open(data_list_path + "readme.json", 'w') as f:
            f.write(jsons)
if __name__ == '__main__':
    # Build the training list for the extracted CASIA-WebFace directory.
    list_builder = CreateDataList()
    list_builder.createTrainDataList('/home/test/WebFace/')
Write a reader to read images. This reader performs center cropping on images, which removes background interference since faces are centered:
# Reader for reading images and labels
# coding=utf-8
import cv2
import numpy as np
import paddle.v2 as paddle
import random
from multiprocessing import cpu_count
class MyReader:
    """Builds a PaddlePaddle training reader that loads grayscale face
    images, center-crops them, resizes them, and yields flattened float32
    pixel vectors with their labels."""

    def __init__(self, imageSize, type_size, center_crop_size=128):
        # Final (square) side length fed to the network.
        self.imageSize = imageSize
        # Number of classes, used to size the one-hot (sparse) label.
        self.type_size = type_size
        # Side length of the central crop taken before resizing.
        self.center_crop_size = center_crop_size
        # CASIA-WebFace images are 250x250 — TODO confirm for other datasets.
        self.default_image_size = 250

    def train_mapper(self, sample):
        """Map an (image_path, label) pair to (pixels, label, one_hot)."""
        img, label = sample
        sparse_label = [0 for i in range(self.type_size)]
        # Fixed: labels in trainer.list are 0-based; the original used
        # sparse_label[label - 1], which sent label 0 to the last slot.
        sparse_label[label] = 1

        def crop_img(img, center_crop_size):
            img = cv2.imread(img, 0)  # read as grayscale
            if center_crop_size < self.default_image_size:
                # Integer division: '/' on Python 3 produces a float, which
                # is invalid as a slice bound. Also fixed an off-by-one: the
                # original sliced to (side .. 250 - side - 1), yielding a
                # crop of center_crop_size - 1 pixels per side.
                side = (self.default_image_size - center_crop_size) // 2
                img = img[side: self.default_image_size - side,
                          side: self.default_image_size - side]
            return img

        img = crop_img(img, self.center_crop_size)
        img = cv2.resize(img, (self.imageSize, self.imageSize))
        return img.flatten().astype('float32'), label, sparse_label

    def train_reader(self, train_list, buffered_size=1024):
        """Return a shuffled, multi-process mapped reader over *train_list*."""
        def reader():
            with open(train_list, 'r') as f:
                lines = [line.strip() for line in f]
                random.shuffle(lines)
                for line in lines:
                    img_path, img_label = line.strip().split('\t')
                    yield img_path, int(img_label)
        return paddle.reader.xmap_readers(self.train_mapper, reader,
                                          cpu_count(), buffered_size)
Define a convolutional neural network modified from ResNet. It uses 6 convolutional blocks, with the output of the last pooling layer for extracting face features during prediction (for face comparison):
import numpy as np
import paddle.v2 as paddle
def conv_bn_layer(input,
                  ch_out,
                  filter_size,
                  stride,
                  padding,
                  active_type=paddle.activation.Relu(),
                  ch_in=None):
    """A convolution (bias-free, linear activation) followed by batch norm;
    *active_type* is applied after the normalization."""
    conv = paddle.layer.img_conv(
        input=input,
        num_channels=ch_in,
        num_filters=ch_out,
        filter_size=filter_size,
        stride=stride,
        padding=padding,
        bias_attr=False,
        act=paddle.activation.Linear())
    return paddle.layer.batch_norm(
        input=conv, act=active_type, moving_average_fraction=0.999)
def shortcut(ipt, ch_in, ch_out, stride):
    """Residual shortcut: identity when channels match, otherwise a 1x1
    linear conv-bn projection."""
    if ch_in == ch_out:
        return ipt
    return conv_bn_layer(ipt, ch_out, 1, stride, 0, paddle.activation.Linear())
def basicblock(ipt, ch_in, ch_out, stride):
    """Basic residual block: two 3x3 conv-bn layers added to a shortcut,
    then ReLU."""
    branch = conv_bn_layer(ipt, ch_out, 3, stride, 1)
    branch = conv_bn_layer(branch, ch_out, 3, 1, 1, paddle.activation.Linear())
    return paddle.layer.addto(
        input=[branch, shortcut(ipt, ch_in, ch_out, stride)],
        act=paddle.activation.Relu())
def layer_warp(block_func, ipt, ch_in, ch_out, count, stride):
    """Stack *count* residual blocks; only the first may change the channel
    count or stride."""
    out = block_func(ipt, ch_in, ch_out, stride)
    for _ in range(count - 1):
        out = block_func(out, ch_out, ch_out, 1)
    return out
def resnet(ipt, class_dim):
    """ResNet-style network for single-channel face images.

    Returns (pool, fc): the last average-pool layer (used as the face
    feature for comparison) and the softmax classifier output.
    """
    n = 1  # one basic block per stage
    # Zero-center the raw pixel values.
    normalized = ipt - 128.0
    tmp = conv_bn_layer(normalized, ch_in=1, ch_out=8,
                        filter_size=3, stride=1, padding=1)
    # Six residual stages as (ch_in, ch_out, stride).
    for c_in, c_out, s in [(8, 16, 1), (16, 32, 1), (32, 64, 2),
                           (64, 128, 2), (128, 256, 2), (256, 512, 2)]:
        tmp = layer_warp(basicblock, tmp, c_in, c_out, n, s)
    pool = paddle.layer.img_pool(input=tmp, name='pool', pool_size=8,
                                 stride=1, pool_type=paddle.pooling.Avg())
    fc = paddle.layer.fc(input=pool, size=class_dim,
                         act=paddle.activation.Softmax())
    return pool, fc
Start training the model:
# Training code
import os
import sys
import paddle.v2 as paddle
from paddle.v2.plot import Ploter
step = 0  # global iteration counter, updated by the training event handler
class PaddleUtil:
    """Wraps PaddlePaddle v2 parameter creation, trainer construction, and
    the training loop for the face-recognition network."""

    def get_parameters(self, parameters_path=None, cost=None):
        """Create fresh parameters from *cost*, or load them from the tar
        archive at *parameters_path*.

        Raises NameError (kept from the original interface) when neither a
        path nor a cost is given, or when loading fails.
        """
        if not parameters_path:
            if not cost:
                raise NameError('Please input cost parameter')
            return paddle.parameters.create(cost)
        try:
            # Fixed: parameter archives are tar files — open in binary mode.
            with open(parameters_path, 'rb') as f:
                return paddle.parameters.Parameters.from_tar(f)
        except Exception as e:
            raise NameError("Error in parameter file: %s" % e)

    def get_trainer(self, datadim, type_size, parameters_path, batch_size):
        """Build an SGD trainer over the resnet classifier.

        datadim: flattened image length; type_size: number of classes;
        parameters_path: optional tar to warm-start from.
        """
        label = paddle.layer.data(name="label",
                                  type=paddle.data_type.integer_value(type_size))
        image = paddle.layer.data(name="image",
                                  type=paddle.data_type.dense_vector(datadim))
        # resnet returns (feature layer, softmax); training uses the softmax.
        fea, out = resnet(image, class_dim=type_size)
        cost = paddle.layer.classification_cost(input=out, label=label)
        if not parameters_path:
            parameters = self.get_parameters(cost=cost)
        else:
            parameters = self.get_parameters(parameters_path=parameters_path)
        optimizer = paddle.optimizer.Momentum(
            momentum=0.9,
            regularization=paddle.optimizer.L2Regularization(rate=0.0005 * batch_size),
            learning_rate=0.00001 / batch_size,
            learning_rate_decay_a=0.1,
            learning_rate_decay_b=128000 * 35,
            learning_rate_schedule="discexp", )
        return paddle.trainer.SGD(cost=cost, parameters=parameters,
                                  update_equation=optimizer)

    def start_trainer(self, trainer, num_passes, save_parameters_name,
                      trainer_reader, batch_size):
        """Run training, plotting the cost each iteration and saving a
        parameter checkpoint every 100 steps."""
        reader = paddle.batch(reader=paddle.reader.shuffle(reader=trainer_reader,
                                                           buf_size=5000),
                              batch_size=batch_size)
        father_path = save_parameters_name[:save_parameters_name.rfind("/")]
        if not os.path.exists(father_path):
            os.makedirs(father_path)
        feeding = {"image": 0, "label": 1}
        train_title = "Train cost"
        error_title = "Error"
        cost_ploter = Ploter(train_title, error_title)

        def event_handler_plot(event):
            global step
            if isinstance(event, paddle.event.EndIteration):
                # Plot every iteration (the original `step % 1 == 0` guard
                # was vacuously true).
                cost_ploter.append(train_title, step, event.cost)
                cost_ploter.plot()
                step += 1
                if step % 100 == 0:
                    # Fixed: checkpoint tars are binary — write in 'wb'.
                    with open(save_parameters_name, 'wb') as f:
                        trainer.save_parameter_to_tar(f)

        trainer.train(reader=reader,
                      num_passes=num_passes,
                      event_handler=event_handler_plot,
                      feeding=feeding)
if __name__ == '__main__':
    # Train on a single GPU.
    paddle.init(use_gpu=True, trainer_count=1)
    type_size = 10575   # number of identities in CASIA-WebFace
    imageSize = 128     # network input side length
    crop_size = 128     # center-crop side length
    batch_size = 256
    parameters_path = "/home/test/model.tar"
    datadim = imageSize * imageSize
    paddleUtil = PaddleUtil()
    face_reader = MyReader(imageSize=imageSize, type_size=type_size,
                           center_crop_size=crop_size)
    list_reader = face_reader.train_reader(train_list="/home/test/train_set/trainer.list")
    face_trainer = paddleUtil.get_trainer(datadim=datadim, type_size=type_size,
                                          parameters_path=None, batch_size=batch_size)
    paddleUtil.start_trainer(trainer=face_trainer, num_passes=50,
                             save_parameters_name=parameters_path,
                             trainer_reader=list_reader, batch_size=batch_size)
Prediction¶
After training, we use the trained model for face comparison and recognition.
Face Comparison¶
Face comparison is essentially a classification prediction, but instead of using the final fully connected layer, we use the output of the last pooling layer to extract face features. The cosine similarity is then used to calculate the similarity between two faces.
- Applications: Verify if the face on an ID document matches the real person. Set a threshold (recommended 0.8) to determine if the similarity score meets the threshold.
- Face recognition implementation:
1. Register faces into a database with associated information.
2. For recognition, compare the test face with all registered faces. If a match is found, recognition is successful.
3. Benefit: No need to collect large amounts of face data for new users; just 1-2 images from different angles suffice.
import numpy as np
import paddle.v2 as paddle
import os
import cv2
import math
from sklearn import preprocessing
def get_parameters(parameters_path):
    """Load trained model parameters from a tar archive.

    Fixed: tar archives are binary, so the file is opened in 'rb' rather
    than text mode (required on Python 3).
    """
    with open(parameters_path, 'rb') as f:
        return paddle.parameters.Parameters.from_tar(f)
def get_inference(parameters, fea):
    """Build an inference engine whose output is the feature layer *fea*."""
    return paddle.inference.Inference(output_layer=fea, parameters=parameters)
def load_image(file, imageSize):
    """Read *file* as grayscale, center-crop to 128x128, resize to
    imageSize x imageSize, and return the flattened pixels."""
    gray = cv2.imread(file, 0)
    gray = np.reshape(gray, [gray.shape[0], gray.shape[1], 1])
    cropped = paddle.image.center_crop(gray, 128, is_color=False)
    return cv2.resize(cropped, (imageSize, imageSize)).flatten()
def to_prediction(inferer, image_paths, imageSize):
    """Extract feature vectors for the two images in *image_paths* and
    return their cosine similarity."""
    feats = inferer.infer(input=[load_image(p, imageSize) for p in image_paths])
    a, b = feats[0], feats[1]
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
if __name__ == '__main__':
    paddle.init(use_gpu=True, trainer_count=1)
    type_size = 10575
    imageSize = 128
    parameters_path = "/home/test/model.tar"
    datadim = imageSize * imageSize
    # Rebuild the network and attach the trained parameters; the feature
    # (pool) layer, not the softmax output, is used for comparison.
    parameters = get_parameters(parameters_path)
    image = paddle.layer.data(name="image",
                              type=paddle.data_type.dense_vector(datadim))
    fea, out = resnet(image, class_dim=type_size)
    inferer = get_inference(parameters, fea)
    image_paths = ["/home/test/0.jpg", "/home/test/1.jpg"]
    similarity = to_prediction(inferer, image_paths, imageSize)
    print("Similarity between images: " + str(similarity))
Face Recognition¶
This classification-based approach is generally not recommended: it directly outputs the label and probability for a single face image, but adding a new user requires collecting a large amount of face data and retraining the model. It has low scalability, though recognition is fast.
```python
import cv2
import numpy as np
import paddle.v2 as paddle
def get_parameters(parameters_path):
    """Load trained model parameters from a tar archive.

    Fixed: the original used typographic quotes (open(..., ‘r’)), which is
    a Python syntax error, and text mode; tar archives must be opened in
    binary mode.
    """
    with open(parameters_path, 'rb') as f:
        return paddle.parameters.Parameters.from_tar(f)
def get_inference(parameters, fea):
    """Return an inference engine reading from the layer *fea*."""
    engine = paddle.inference.Inference(output_layer=fea, parameters=parameters)
    return engine
def load_image(file, imageSize):
    """Load a grayscale face image, center-crop to 128, resize, flatten."""
    raw = cv2.imread(file, 0)
    raw = np.reshape(raw, [raw.shape[0], raw.shape[1], 1])
    face = paddle.image.center_crop(raw, 128, is_color=False)
    face = cv2.resize(face, (imageSize, imageSize))
    return face.flatten()
def to_prediction(inferer, image_path, imageSize):
    """Classify one face image; return (top label, its probability)."""
    probs = inferer.infer(input=[load_image(image_path, imageSize)])
    ranked = np.argsort(-probs)  # indices sorted by descending probability
    top = ranked[0][0]
    return top, probs[0][top]
if __name__ == '__main__':