End-to-End Recognition of Captchas
This is an upgrade of the note “My PaddlePaddle Learning Journey - End-to-End Recognition of Captchas” (https://yeyupiaoling.blog.csdn.net/article/details/79233565), which I wrote in early 2018 against the old V2 API. That version is now outdated, so I decided to upgrade it.

Online Running

Online Run: https://aistudio.baidu.com/aistudio/projectdetail/1679868

Creating Data List and Vocabulary

The data list makes it easy to read the data during training: each line pairs an image path with its label.

import os

def createDataList(data_path, list_path):
    # Read all image paths
    imgs = os.listdir(data_path)
    with open(list_path, 'w', encoding='utf-8') as f:
        for img in imgs:
            # The label is the filename without its extension, e.g. "abcd.png" -> "abcd"
            name = img.split('.')[0]
            image_path = os.path.join(data_path, img)
            # Write image path and label, separated by Tab
            f.write(image_path + '\t' + name + '\n')

createDataList('dataset/train_data/', 'dataset/train.txt')
createDataList('dataset/test_data/', 'dataset/test.txt')
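
To sanity-check the generated list, a minimal sketch like the following prints the first few "image_path<TAB>label" entries (it assumes the dataset directories above already contain captcha images named after their labels, e.g. abcd.png):

# Quick check: show the first few entries of the training list
with open('dataset/train.txt', 'r', encoding='utf-8') as f:
    for line in f.readlines()[:5]:
        path, label = line.rstrip('\n').split('\t')
        print(path, '->', label)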

We also need a vocabulary, i.e. the set of characters that appear in the labels. Execute the following code:

with open('dataset/train.txt', 'r', encoding='utf-8') as f:
    lines = f.readlines()
v = set()
for line in lines:
    _, label = line.replace('\n', '').split('\t')
    for c in label:
        v.add(c)

vocabulary_path = 'dataset/vocabulary.txt'
with open(vocabulary_path, 'w', encoding='utf-8') as f:
    # Sort so the vocabulary order is deterministic across runs
    for c in sorted(v):
        f.write(c + '\n')
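
As a quick check, the vocabulary can be read back in the same way the data reader will use it later. This is only a sketch showing the character-to-index mapping and the number of output classes the model will need (vocabulary size + 1, because of the CTC blank introduced below):

# Reload the vocabulary and build the character-to-index mapping
with open('dataset/vocabulary.txt', 'r', encoding='utf-8') as f:
    vocabulary = [line.rstrip('\n') for line in f.readlines()]
vocabulary_dict = {c: i for i, c in enumerate(vocabulary)}
print('Vocabulary size:', len(vocabulary))
print('Model output classes (with CTC blank):', len(vocabulary) + 1)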

Decoder

This is the greedy decoding method, used to decode the model's prediction output and convert PaddlePaddle's output into a string. We also provide functions to convert data labels to strings and to compute the character error rate.

%%writefile decoder.py

import Levenshtein as Lev
from itertools import groupby
import paddle


def ctc_greedy_decoder(probs_seq, vocabulary):
    """CTC Greedy (Best Path) Decoder.
    The path composed of the most probable tokens is further postprocessed
    to remove consecutive duplicates and all blanks.
    :param probs_seq: 2D probability matrix with one row of per-character
                      probabilities for each time step (e.g. the softmax output).
    :type probs_seq: paddle.Tensor
    :param vocabulary: Vocabulary
    :type vocabulary: list
    :return: Decoded result string
    :rtype: str
    """
    # Dimension validation
    for probs in probs_seq:
        if not len(probs) == len(vocabulary) + 1:
            raise ValueError("probs_seq dimension does not match vocabulary")
    # Argmax to get the best index at each time step
    max_index_list = paddle.argmax(probs_seq, -1).numpy()
    # Remove consecutive duplicate indices
    index_list = [index_group[0] for index_group in groupby(max_index_list)]
    # Remove blank indices
    blank_index = len(vocabulary)
    index_list = [index for index in index_list if index != blank_index]
    # Convert the index list to a string (the captchas here are at most 4 characters)
    return ''.join([vocabulary[index] for index in index_list])[:4]


def label_to_string(label, vocabulary):
    """Convert label to text

    :param label: Result label or dataset label
    :type label: list
    :param vocabulary: Vocabulary
    :type vocabulary: list
    :return: Decoded result string
    :rtype: str
    """
    return ''.join([vocabulary[index] for index in label])


def cer(out_string, target_string):
    """Calculate the character error rate via the edit (Levenshtein) distance
    between the two strings.

    Arguments:
        out_string (string): Predicted string
        target_string (string): Ground-truth string
    """
    s1, s2 = out_string.replace(" ", ""), target_string.replace(" ", "")
    return Lev.distance(s1, s2)
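
To see what the greedy decoder does, here is a toy example (not part of the original note): with a two-character vocabulary, the best path "a a blank b b" collapses to "ab" after removing repeated tokens and blanks.

import paddle
from decoder import ctc_greedy_decoder

vocabulary = ['a', 'b']  # the blank index is therefore 2
# One row of probabilities per time step; the argmax path is [0, 0, 2, 1, 1]
probs_seq = paddle.to_tensor([[0.9, 0.05, 0.05],
                              [0.8, 0.10, 0.10],
                              [0.1, 0.10, 0.80],
                              [0.1, 0.80, 0.10],
                              [0.1, 0.85, 0.05]])
print(ctc_greedy_decoder(probs_seq, vocabulary))  # -> "ab"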

Data Reader

The data reader loads image paths and labels from the data list during training, preprocesses each image, and converts the string label into a sequence of integer indices for the network.

%%writefile data.py

import cv2
import numpy as np
from paddle.io import Dataset


# Image preprocessing
def process(path):
    image = cv2.imread(path)
    # Convert to grayscale
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # Resize to uniform size
    image = cv2.resize(image, (72, 27))
    # Add the channel dimension: (H, W) -> (1, H, W), i.e. CHW
    image = image[np.newaxis, :]
    # Cast to float before normalizing (uint8 would wrap around), scale to roughly [-1, 1]
    image = (image.astype('float32') - 128) / 128
    return image


# Data loader
class CustomDataset(Dataset):
    def __init__(self, data_list_path, voc_path):
        super(CustomDataset, self).__init__()
        with open(data_list_path, 'r', encoding='utf-8') as f:
            self.lines = f.readlines()
        with open(voc_path, 'r', encoding='utf-8') as f:
            labels = f.readlines()
        self.vocabulary = [labels[i].replace('\n', '') for i in range(len(labels))]
        self.vocabulary_dict = dict([(labels[i].replace('\n', ''), i) for i in range(len(labels))])

    def __getitem__(self, idx):
        path, label = self.lines[idx].replace('\n', '').split('\t')
        img = process(path)
        # Convert character labels to integer data
        transcript = [self.vocabulary_dict.get(x) for x in label]
        img = np.array(img, dtype='float32')
        transcript = np.array(transcript, dtype='int32')
        return img, transcript

    def __len__(self):
        return len(self.lines)
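
A minimal usage sketch (assuming the data list and vocabulary created above) to confirm the shapes the reader produces:

from data import CustomDataset

dataset = CustomDataset('dataset/train.txt', 'dataset/vocabulary.txt')
img, transcript = dataset[0]
print(img.shape)         # (1, 27, 72): grayscale image in CHW layout
print(transcript.shape)  # (4,) for a 4-character captcha: one index per character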

Model Structure

This model is a CRNN: convolutional layers extract image features at the front, followed by a GRU (a gated recurrent unit, a simpler alternative to the LSTM), and finally a fully connected layer. The output size is the vocabulary size + 1, because CTC requires an additional blank character.

%%writefile model.py

import paddle
import paddle.nn as nn


class Model(nn.Layer):
    def __init__(self, vocabulary):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2D(in_channels=1, out_channels=32, kernel_size=3)
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2D(32)
        self.pool1 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv2 = nn.Conv2D(in_channels=32, out_channels=64, kernel_size=3)
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2D(64)
        self.pool2 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv3 = nn.Conv2D(in_channels=64, out_channels=128, kernel_size=3)
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2D(128)
        self.pool3 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv4 = nn.Conv2D(in_channels=128, out_channels=256, kernel_size=3)
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2D(256)
        self.pool4 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv5 = nn.Conv2D(in_channels=256, out_channels=256, kernel_size=3)
        self.relu5 = nn.ReLU()
        self.bn5 = nn.BatchNorm2D(256)
        self.pool5 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv6 = nn.Conv2D(in_channels=256, out_channels=256, kernel_size=3)
        self.relu6 = nn.ReLU()
        self.bn6 = nn.BatchNorm2D(256)
        self.pool6 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.conv7 = nn.Conv2D(in_channels=256, out_channels=256, kernel_size=3)
        self.relu7 = nn.ReLU()
        self.bn7 = nn.BatchNorm2D(256)
        self.pool7 = nn.MaxPool2D(kernel_size=2, stride=1)

        self.fc = nn.Linear(in_features=306, out_features=128)

        self.gru = nn.GRU(input_size=256, hidden_size=128)

        self.output = nn.Linear(in_features=128, out_features=len(vocabulary) + 1)

    def forward(self, x):
        x = self.relu1(self.bn1(self.conv1(x)))
        x = self.pool1(x)
        x = self.relu2(self.bn2(self.conv2(x)))
        x = self.pool2(x)
        x = self.relu3(self.bn3(self.conv3(x)))
        x = self.pool3(x)
        x = self.relu4(self.bn4(self.conv4(x)))
        x = self.pool4(x)
        x = self.relu5(self.bn5(self.conv5(x)))
        x = self.pool5(x)
        x = self.relu6(self.bn6(self.conv6(x)))
        x = self.pool6(x)
        x = self.relu7(self.bn7(self.conv7(x)))
        x = self.pool7(x)
        # Flatten the spatial dimensions: (N, 256, H, W) -> (N, 256, H*W)
        x = paddle.reshape(x, shape=(x.shape[0], x.shape[1], -1))
        x = self.fc(x)
        # Swap to (N, time steps, features) for the GRU
        x = paddle.transpose(x, perm=[0, 2, 1])
        y, h = self.gru(x)
        x = self.output(y)
        return x
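
A quick shape check, using a dummy vocabulary and a random input (just a sketch), confirms that the model maps a (N, 1, 27, 72) image to a sequence of 128 time steps with vocabulary size + 1 scores each, which is what the CTC loss and the greedy decoder expect:

import paddle
from model import Model

dummy_vocabulary = list('0123456789')  # hypothetical 10-character vocabulary
model = Model(dummy_vocabulary)
x = paddle.randn([1, 1, 27, 72], dtype='float32')
y = model(x)
print(y.shape)  # [1, 128, 11] -> (batch, time steps, vocabulary size + 1)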

Training

Now we start training. With a small amount of data it trains quickly. Every 10 epochs an evaluation is run and the character error rate is reported. The model is saved with paddle.jit.save as a static (inference) model so that prediction is easy.

import paddle
import numpy as np
import os
from datetime import datetime
from model import Model
from decoder import ctc_greedy_decoder, label_to_string, cer
from paddle.io import DataLoader
from data import CustomDataset
from visualdl import LogWriter
from paddle.static import InputSpec

train_data_list_path = 'dataset/train.txt'
test_data_list_path = 'dataset/test.txt'
voc_path = 'dataset/vocabulary.txt'
save_model = 'models/model'
batch_size = 32
pretrained_model = None
num_epoch = 100
learning_rate = 1e-3
writer = LogWriter(logdir='log')


def train():
    # Get training data
    train_dataset = CustomDataset(train_data_list_path, voc_path)
    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
    # Get test data
    test_dataset = CustomDataset(test_data_list_path, voc_path)
    test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)
    # Get model
    model = Model(train_dataset.vocabulary)
    paddle.summary(model, input_size=(batch_size, 1, 27, 72))
    # Set optimizer
    boundaries = [10, 20, 50]
    lr = [0.1 ** l * learning_rate for l in range(len(boundaries) + 1)]
    scheduler = paddle.optimizer.lr.PiecewiseDecay(boundaries=boundaries, values=lr, verbose=False)
    optimizer = paddle.optimizer.Adam(parameters=model.parameters(), learning_rate=scheduler)
    # Get loss function
    ctc_loss = paddle.nn.CTCLoss(blank=len(train_dataset.vocabulary))
    # Load pretrained model
    if pretrained_model is not None:
        model.set_state_dict(paddle.load(os.path.join(pretrained_model, 'model.pdparams')))
        optimizer.set_state_dict(paddle.load(os.path.join(pretrained_model, 'optimizer.pdopt')))
    train_step = 0
    test_step = 0
    # Start training
    for epoch in range(num_epoch):
        for batch_id, (inputs, labels) in enumerate(train_loader()):
            out = model(inputs)
            out = paddle.transpose(out, perm=[1, 0, 2])
            input_lengths = paddle.full(shape=[out.shape[1]], fill_value=out.shape[0], dtype="int64")
            # Every captcha label in this dataset has a fixed length of 4 characters
            label_lengths = paddle.full(shape=[out.shape[1]], fill_value=4, dtype="int64")
            # Calculate loss
            loss = ctc_loss(out, labels, input_lengths, label_lengths)
            loss.backward()
            optimizer.step()
            optimizer.clear_grad()
            # Print and log training progress periodically
            if batch_id % 100 == 0:
                print('[%s] Train epoch %d, batch %d, loss: %f' % (datetime.now(), epoch, batch_id, loss))
                writer.add_scalar('Train loss', loss, train_step)
                train_step += 1
        if (epoch % 10 == 0 and epoch != 0) or epoch == num_epoch - 1:
            # Perform evaluation
            model.eval()
            cer_value = evaluate(model, test_loader, train_dataset.vocabulary)
            print('[%s] Test epoch %d, cer: %f' % (datetime.now(), epoch, cer_value))
            writer.add_scalar('Test cer', cer_value, test_step)
            test_step += 1
            model.train()
        # Record learning rate
        writer.add_scalar('Learning rate', scheduler.last_lr, epoch)
        scheduler.step()
        # Save model
        paddle.jit.save(layer=model, path=save_model, input_spec=[InputSpec(shape=[None, 1, 27, 72], dtype='float32')])


# Evaluate model
def evaluate(model, test_loader, vocabulary):
    cer_result = []
    for batch_id, (inputs, labels) in enumerate(test_loader()):
        # Perform recognition
        outs = model(inputs)
        outs = paddle.nn.functional.softmax(outs)
        # Decode to get the recognition results
        out_strings = []
        for out in outs:
            out_string = ctc_greedy_decoder(out, vocabulary)
            out_strings.append(out_string)
        # Compare each prediction with its corresponding label
        for out_string, label in zip(out_strings, labels):
            label_str = label_to_string(label.numpy(), vocabulary)
            cer_result.append(cer(out_string, label_str) / float(len(label_str)))
    cer_result = float(np.mean(cer_result))
    return cer_result


train()

Prediction

Use the trained model to recognize captcha images.

import numpy as np
import paddle

from data import process
from decoder import ctc_greedy_decoder


with open('dataset/vocabulary.txt', 'r', encoding='utf-8') as f:
    vocabulary = f.readlines()

vocabulary = [v.replace('\n', '') for v in vocabulary]

save_model = 'models/model'
model = paddle.jit.load(save_model)
model.eval()


def infer(path):
    data = process(path)
    data = data[np.newaxis, :]
    data = paddle.to_tensor(data, dtype='float32')
    # Perform recognition
    out = model(data)
    out = paddle.nn.functional.softmax(out)[0]
    # Decode to get recognition result
    out_string = ctc_greedy_decoder(out, vocabulary)

    print('Prediction Result: %s' % out_string)


if __name__ == '__main__':
    image_path = 'dataset/test.png'
    infer(image_path)