End-to-end CAPTCHA recognition. This is an upgrade of 《我的PaddlePaddle學習之路》筆記六——驗證碼端到端的識別, an article I wrote in early 2018 against the then-current V2 API. It has become somewhat dated, so I decided to bring it up to date.
Run Online¶
Run online: https://aistudio.baidu.com/aistudio/projectdetail/1679868
Create the Data List and Vocabulary¶
The data list makes it convenient to read the data during training: each line records an image path and its label.
import os


def createDataList(data_path, list_path):
    # list all image files in the data directory
    imgs = os.listdir(data_path)
    with open(list_path, 'w', encoding='utf-8') as f:
        for img in imgs:
            # the file name (up to the first dot) is the label
            name = img.split('.')[0]
            image_path = os.path.join(data_path, img)
            # write the image path and the label, separated by a tab
            f.write(image_path + '\t' + name + '\n')


createDataList('dataset/train_data/', 'dataset/train.txt')
createDataList('dataset/test_data/', 'dataset/test.txt')
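Each line of the generated list pairs an image path with its label, separated by a tab. Since the label is taken from the file name (the split('.')[0] above), a line would look like this hypothetical example:

dataset/train_data/3n5x.png	3n5x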
We still need the vocabulary; run the code below to build it from the training labels.
with open('dataset/train.txt', 'r', encoding='utf-8') as f:
    lines = f.readlines()

# collect every distinct character that appears in the training labels
v = set()
for line in lines:
    _, label = line.replace('\n', '').split('\t')
    for c in label:
        v.add(c)

vocabulary_path = 'dataset/vocabulary.txt'
with open(vocabulary_path, 'w', encoding='utf-8') as f:
    for c in v:
        f.write(c + '\n')
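One caveat: Python iterates a set in arbitrary order, so rerunning the cell above may write the characters in a different order, and the indices of a previously trained model would then no longer match the file. Sorting before writing makes the vocabulary reproducible; a minimal tweak of the last loop:

# reproducible variant: write the characters in sorted order
with open(vocabulary_path, 'w', encoding='utf-8') as f:
    for c in sorted(v):
        f.write(c + '\n')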
Decoder¶
This is a greedy decoding method used to decode the predicted output, converting PaddlePaddle's results into strings. Helpers for converting integer labels back to strings and for computing the character error rate are also provided here.
%%writefile decoder.py
import Levenshtein as Lev
from itertools import groupby

import paddle


def ctc_greedy_decoder(probs_seq, vocabulary):
    """CTC greedy (best-path) decoder.

    Takes the most probable token at each time step, then post-processes the
    path by collapsing consecutive repeats and removing all blanks.

    :param probs_seq: 2-D sequence of probability distributions over the
                      vocabulary, one row per time step.
    :type probs_seq: Tensor
    :param vocabulary: vocabulary list
    :type vocabulary: list
    :return: decoded result string
    :rtype: str
    """
    # dimension check
    for probs in probs_seq:
        if not len(probs) == len(vocabulary) + 1:
            raise ValueError("probs_seq dimension mismatches the vocabulary")
    # argmax to get the most likely index at each time step
    max_index_list = paddle.argmax(probs_seq, -1).numpy()
    # collapse consecutive repeated indices
    index_list = [index_group[0] for index_group in groupby(max_index_list)]
    # remove blank indices
    blank_index = len(vocabulary)
    index_list = [index for index in index_list if index != blank_index]
    # convert the index list to a string; the CAPTCHAs in this dataset have
    # 4 characters, so the result is truncated to 4
    return ''.join([vocabulary[index] for index in index_list])[:4]


def label_to_string(label, vocabulary):
    """Convert a label to text.

    :param label: a predicted label or a dataset label
    :type label: list
    :param vocabulary: vocabulary list
    :type vocabulary: list
    :return: decoded result string
    :rtype: str
    """
    return ''.join([vocabulary[index] for index in label])


def cer(out_string, target_string):
    """Character error rate: the edit distance between two strings
    (normalized by the target length at the call site).

    Arguments:
        out_string (string): predicted string
        target_string (string): target string
    """
    s1, s2 = out_string.replace(" ", ""), target_string.replace(" ", "")
    return Lev.distance(s1, s2)
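A quick sanity check of the decoder with a toy three-character vocabulary (the probabilities below are made up for illustration): the repeated 'a' and 'b' collapse to single characters and the blank (index 3) is removed, yielding 'ab'.

import paddle
from decoder import ctc_greedy_decoder

vocabulary = ['a', 'b', 'c']                      # blank index is len(vocabulary) == 3
probs = paddle.to_tensor([[0.9, 0.0, 0.0, 0.1],   # t=0 -> 'a'
                          [0.9, 0.0, 0.0, 0.1],   # t=1 -> 'a' (repeat, collapsed)
                          [0.1, 0.0, 0.0, 0.9],   # t=2 -> blank (removed)
                          [0.0, 0.8, 0.1, 0.1],   # t=3 -> 'b'
                          [0.0, 0.8, 0.1, 0.1]])  # t=4 -> 'b' (repeat, collapsed)
print(ctc_greedy_decoder(probs, vocabulary))      # ab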
Data Reader¶
This is used to read data at training time: it reads images and labels from the data list, preprocesses the images, and converts the string labels into integer labels to feed into the network.
%%writefile data.py
import cv2
import numpy as np
from paddle.io import Dataset


# image preprocessing
def process(path):
    image = cv2.imread(path)
    # convert to grayscale
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    # resize to a fixed size (width 72, height 27)
    image = cv2.resize(image, (72, 27))
    # convert HW to CHW
    image = image[np.newaxis, :]
    # cast to float first, otherwise the uint8 subtraction below wraps around
    image = image.astype('float32')
    # normalize to roughly [-1, 1]
    image = (image - 128) / 128
    return image


# data loader
class CustomDataset(Dataset):
    def __init__(self, data_list_path, voc_path):
        super(CustomDataset, self).__init__()
        with open(data_list_path, 'r', encoding='utf-8') as f:
            self.lines = f.readlines()
        with open(voc_path, 'r', encoding='utf-8') as f:
            labels = f.readlines()
        self.vocabulary = [labels[i].replace('\n', '') for i in range(len(labels))]
        self.vocabulary_dict = dict([(labels[i].replace('\n', ''), i) for i in range(len(labels))])

    def __getitem__(self, idx):
        path, label = self.lines[idx].replace('\n', '').split('\t')
        img = process(path)
        # convert the character label to integer indices
        transcript = [self.vocabulary_dict.get(x) for x in label]
        img = np.array(img, dtype='float32')
        transcript = np.array(transcript, dtype='int32')
        return img, transcript

    def __len__(self):
        return len(self.lines)
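A quick check that the reader works, using the list and vocabulary files created earlier (the printed index values are illustrative):

from data import CustomDataset

dataset = CustomDataset('dataset/train.txt', 'dataset/vocabulary.txt')
img, transcript = dataset[0]
print(img.shape)    # (1, 27, 72), CHW
print(transcript)   # one vocabulary index per character, e.g. [12  3 45  7]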
Model Architecture¶
The model is a CRNN: convolutional layers at the front extract image features, followed by a GRU, a variant of the LSTM. The final fully connected layer has an output size of vocabulary size + 1, because CTC needs one extra class for the blank label.
%%writefile model.py
import paddle
import paddle.nn as nn


class Model(nn.Layer):
    def __init__(self, vocabulary):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2D(in_channels=1, out_channels=32, kernel_size=3)
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2D(32)
        self.pool1 = nn.MaxPool2D(kernel_size=2, stride=1)
        self.conv2 = nn.Conv2D(in_channels=32, out_channels=64, kernel_size=3)
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2D(64)
        self.pool2 = nn.MaxPool2D(kernel_size=2, stride=1)
        self.conv3 = nn.Conv2D(in_channels=64, out_channels=128, kernel_size=3)
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2D(128)
        self.pool3 = nn.MaxPool2D(kernel_size=2, stride=1)
        self.conv4 = nn.Conv2D(in_channels=128, out_channels=256, kernel_size=3)
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2D(256)
        self.pool4 = nn.MaxPool2D(kernel_size=2, stride=1)
        self.conv5 = nn.Conv2D(in_channels=256, out_channels=256, kernel_size=3)
        self.relu5 = nn.ReLU()
        self.bn5 = nn.BatchNorm2D(256)
        self.pool5 = nn.MaxPool2D(kernel_size=2, stride=1)
        self.conv6 = nn.Conv2D(in_channels=256, out_channels=256, kernel_size=3)
        self.relu6 = nn.ReLU()
        self.bn6 = nn.BatchNorm2D(256)
        self.pool6 = nn.MaxPool2D(kernel_size=2, stride=1)
        self.conv7 = nn.Conv2D(in_channels=256, out_channels=256, kernel_size=3)
        self.relu7 = nn.ReLU()
        self.bn7 = nn.BatchNorm2D(256)
        self.pool7 = nn.MaxPool2D(kernel_size=2, stride=1)
        self.fc = nn.Linear(in_features=306, out_features=128)
        self.gru = nn.GRU(input_size=256, hidden_size=128)
        # vocabulary size + 1: the extra class is the CTC blank
        self.output = nn.Linear(in_features=128, out_features=len(vocabulary) + 1)

    def forward(self, x):
        x = self.relu1(self.bn1(self.conv1(x)))
        x = self.pool1(x)
        x = self.relu2(self.bn2(self.conv2(x)))
        x = self.pool2(x)
        x = self.relu3(self.bn3(self.conv3(x)))
        x = self.pool3(x)
        x = self.relu4(self.bn4(self.conv4(x)))
        x = self.pool4(x)
        x = self.relu5(self.bn5(self.conv5(x)))
        x = self.pool5(x)
        x = self.relu6(self.bn6(self.conv6(x)))
        x = self.pool6(x)
        x = self.relu7(self.bn7(self.conv7(x)))
        x = self.pool7(x)
        # flatten the spatial dimensions: (N, C, H, W) -> (N, C, H*W)
        x = paddle.reshape(x, shape=(x.shape[0], x.shape[1], -1))
        x = self.fc(x)
        # swap to (N, time steps, features) for the GRU
        x = paddle.transpose(x, perm=[0, 2, 1])
        y, h = self.gru(x)
        x = self.output(y)
        return x
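For reference, here is how the shapes evolve through forward for the (N, 1, 27, 72) inputs used in this project, derived from the layer definitions above (each 3×3 convolution without padding shrinks H and W by 2; each pool with kernel 2 and stride 1 shrinks them by 1):

# input      (N, 1, 27, 72)
# conv stack (N, 256, 6, 51)    seven conv+pool blocks shrink H and W by 3 each
# reshape    (N, 256, 306)      hence Linear(in_features=306)
# fc         (N, 256, 128)
# transpose  (N, 128, 256)      128 time steps of 256 features for the GRU
# gru        (N, 128, 128)
# output     (N, 128, len(vocabulary) + 1)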
Training¶
Now training can begin. It finishes quickly, mainly because the dataset is small. Every ten epochs an evaluation pass runs and reports the character error rate. At the end, the model is saved as a static-graph model, which is convenient for inference.
import os
from datetime import datetime

import numpy as np
import paddle
from paddle.io import DataLoader
from paddle.static import InputSpec
from visualdl import LogWriter

from data import CustomDataset
from decoder import ctc_greedy_decoder, label_to_string, cer
from model import Model

train_data_list_path = 'dataset/train.txt'
test_data_list_path = 'dataset/test.txt'
voc_path = 'dataset/vocabulary.txt'
save_model = 'models/model'
batch_size = 32
pretrained_model = None
num_epoch = 100
learning_rate = 1e-3
writer = LogWriter(logdir='log')


def train():
    # training data
    train_dataset = CustomDataset(train_data_list_path, voc_path)
    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
    # test data
    test_dataset = CustomDataset(test_data_list_path, voc_path)
    test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)
    # build the model
    model = Model(train_dataset.vocabulary)
    paddle.summary(model, input_size=(batch_size, 1, 27, 72))
    # optimizer with a piecewise-decayed learning rate
    boundaries = [10, 20, 50]
    lr = [0.1 ** i * learning_rate for i in range(len(boundaries) + 1)]
    scheduler = paddle.optimizer.lr.PiecewiseDecay(boundaries=boundaries, values=lr, verbose=False)
    optimizer = paddle.optimizer.Adam(parameters=model.parameters(), learning_rate=scheduler)
    # CTC loss; the blank label is the index after all vocabulary characters
    ctc_loss = paddle.nn.CTCLoss(blank=len(train_dataset.vocabulary))
    # resume from a pretrained model if one is given
    if pretrained_model is not None:
        model.set_state_dict(paddle.load(os.path.join(pretrained_model, 'model.pdparams')))
        optimizer.set_state_dict(paddle.load(os.path.join(pretrained_model, 'optimizer.pdopt')))
    train_step = 0
    test_step = 0
    # start training
    for epoch in range(num_epoch):
        for batch_id, (inputs, labels) in enumerate(train_loader()):
            out = model(inputs)
            # CTCLoss expects logits of shape (T, N, C)
            out = paddle.transpose(out, perm=[1, 0, 2])
            input_lengths = paddle.full(shape=[out.shape[1]], fill_value=out.shape[0], dtype="int64")
            # every CAPTCHA in this dataset has exactly 4 characters
            label_lengths = paddle.full(shape=[out.shape[1]], fill_value=4, dtype="int64")
            # compute the loss
            loss = ctc_loss(out, labels, input_lengths, label_lengths)
            loss.backward()
            optimizer.step()
            optimizer.clear_grad()
            # print training progress periodically
            if batch_id % 100 == 0:
                print('[%s] Train epoch %d, batch %d, loss: %f' % (datetime.now(), epoch, batch_id, loss))
                writer.add_scalar('Train loss', loss, train_step)
                train_step += 1
        if (epoch % 10 == 0 and epoch != 0) or epoch == num_epoch - 1:
            # run an evaluation pass
            model.eval()
            test_cer = evaluate(model, test_loader, train_dataset.vocabulary)
            print('[%s] Test epoch %d, cer: %f' % (datetime.now(), epoch, test_cer))
            writer.add_scalar('Test cer', test_cer, test_step)
            test_step += 1
            model.train()
        # record the learning rate
        writer.add_scalar('Learning rate', scheduler.last_lr, epoch)
        scheduler.step()
    # save the model as a static graph for inference
    paddle.jit.save(layer=model, path=save_model, input_spec=[InputSpec(shape=[None, 1, 27, 72], dtype='float32')])
# evaluate the model
def evaluate(model, test_loader, vocabulary):
    cer_result = []
    for batch_id, (inputs, labels) in enumerate(test_loader()):
        # run recognition
        outs = model(inputs)
        outs = paddle.nn.functional.softmax(outs)
        # decode to get the recognition results
        out_strings = []
        label_strings = []
        for out in outs:
            out_strings.append(ctc_greedy_decoder(out, vocabulary))
        for label in labels:
            label_strings.append(label_to_string(label, vocabulary))
        for out_string, label_string in zip(out_strings, label_strings):
            print(label_string, out_string)
            # character error rate, normalized by the target length
            c = cer(out_string, label_string) / float(len(label_string))
            cer_result.append(c)
    return float(np.mean(cer_result))


train()
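The training loss, test character error rate, and learning rate are all logged through LogWriter, so progress can be inspected in VisualDL's web UI during or after training, for example:

visualdl --logdir=log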
Prediction¶
Use the trained model to recognize CAPTCHA images.
import numpy as np
import paddle

from data import process
from decoder import ctc_greedy_decoder

with open('dataset/vocabulary.txt', 'r', encoding='utf-8') as f:
    vocabulary = f.readlines()
vocabulary = [v.replace('\n', '') for v in vocabulary]

save_model = 'models/model'
model = paddle.jit.load(save_model)
model.eval()


def infer(path):
    data = process(path)
    # add the batch dimension: (1, 27, 72) -> (1, 1, 27, 72)
    data = data[np.newaxis, :]
    data = paddle.to_tensor(data, dtype='float32')
    # run recognition
    out = model(data)
    out = paddle.nn.functional.softmax(out)[0]
    # decode to get the recognition result
    out_string = ctc_greedy_decoder(out, vocabulary)
    print('Prediction: %s' % out_string)


if __name__ == '__main__':
    image_path = 'dataset/test.png'
    infer(image_path)