我的PaddlePaddle学习之路笔记四——自定义图像分类(基于VGG)

前言

在之前的文章中,我们已经学习了如何使用PaddlePaddle实现简单的图像分类模型。这篇文章将介绍如何使用自定义数据集,结合VGG神经网络进行图像分类。我们将详细讲解从数据准备、模型构建到训练和预测的完整流程。

数据准备

1. 数据结构

我们使用的数据集结构如下(以蔬菜分类为例):

vegetables/
├── cuke/
│   ├── 1515826971850.jpg
│   ├── 1515826971851.jpg
│   └── ...
├── lettuce/
│   ├── 1515827012863.jpg
│   ├── 1515827012864.jpg
│   └── ...
└── lotus_root/
    ├── 1515827059200.jpg
    ├── 1515827059201.jpg
    └── ...

每个子文件夹代表一个类别,例如cuke、lettuce、lotus_root分别对应黄瓜、生菜、莲藕三类蔬菜。

2. 生成图像列表

我们需要编写一个Python脚本,将每个图像的路径和对应的标签整理成训练列表和测试列表。

# coding=utf-8
import os
import json

class CreateDataList:
    """Generate train/test image lists for a dataset laid out as one folder per class."""

    def __init__(self):
        pass

    def createDataList(self, data_root_path):
        """Write ``trainer.list``, ``test.list`` and ``readme.json`` for a dataset.

        Every sub-directory of ``data_root_path`` is treated as one class and
        gets an integer label (0, 1, 2, ...) in sorted directory-name order.
        Within each class, every 10th image (index 0, 10, 20, ...) goes to
        ``test.list`` and the rest go to ``trainer.list``. Each list line is
        ``<image path>\\t<label>``.

        The three output files are written to ``../data/<dataset name>/``
        (relative to the current working directory), where the dataset name
        is the last component of ``data_root_path``. Existing list files are
        overwritten, so running the script twice does not duplicate entries.

        :param data_root_path: dataset root, using ``/`` as separator.
        """
        # Dataset name = last non-empty path component ("a/b/" -> "b").
        father_path = data_root_path.rstrip('/').split('/')[-1]
        data_list_path = "../data/%s/" % father_path
        if not os.path.exists(data_list_path):
            os.makedirs(data_list_path)

        # Only directories are classes; stray files (e.g. .DS_Store) are
        # skipped. Sorting keeps the label assignment reproducible.
        class_dirs = sorted(
            name for name in os.listdir(data_root_path)
            if os.path.isdir(data_root_path + "/" + name))

        class_detail = []
        all_class_images = 0
        with open(data_list_path + "test.list", 'w') as test_file, \
                open(data_list_path + "trainer.list", 'w') as trainer_file:
            for class_label, class_dir in enumerate(class_dirs):
                path = data_root_path + "/" + class_dir
                test_sum = 0
                trainer_sum = 0
                for class_sum, img_path in enumerate(sorted(os.listdir(path))):
                    line = "%s\t%d\n" % (path + '/' + img_path, class_label)
                    # One image out of every ten is held out for testing.
                    if class_sum % 10 == 0:
                        test_sum += 1
                        test_file.write(line)
                    else:
                        trainer_sum += 1
                        trainer_file.write(line)
                all_class_images += test_sum + trainer_sum
                # Record the same label that was written to the list files.
                class_detail.append({
                    'class_name': class_dir,
                    'class_label': class_label,
                    'class_test_images': test_sum,
                    'class_trainer_images': trainer_sum,
                })

        # Summary of the whole dataset.
        readjson = {
            'all_class_name': father_path,
            'all_class_sum': len(class_dirs),
            'all_class_images': all_class_images,
            'class_detail': class_detail,
        }
        jsons = json.dumps(readjson, sort_keys=True, indent=4, separators=(',', ': '))
        with open(data_list_path + "readme.json", 'w') as f:
            f.write(jsons)


if __name__ == '__main__':
    # Build the list files for the vegetables dataset.
    CreateDataList().createDataList('../images/vegetables')

运行该脚本后,会在data文件夹中生成一个单独的大类文件夹(例如vegetables),其中包含三个文件:
- trainer.list:用于训练的图像列表
- test.list:用于测试的图像列表
- readme.json:数据集的说明文件,包含类别数量、每个类别的图像数量等信息

读取数据

接下来,我们需要编写一个数据读取器,用于读取图像列表并生成PaddlePaddle所需的reader。

# coding=utf-8
import json
import os
from multiprocessing import cpu_count

import paddle.v2 as paddle

class MyReader:
    """Build PaddlePaddle readers from ``<image path>\\t<label>`` list files."""

    def __init__(self, imageSize):
        """
        :param imageSize: crop size passed to ``paddle.image.simple_transform``.
        """
        self.imageSize = imageSize

    def _load_sample(self, sample, is_train):
        """Load one ``(path, label)`` sample and return ``(flat float32 image, int label)``."""
        img_path, lab = sample
        img = paddle.image.load_image(img_path)
        # 70 is the resize size, self.imageSize the crop size; is_train selects
        # the training or the test variant of the transform.
        img = paddle.image.simple_transform(img, 70, self.imageSize, is_train)
        return img.flatten().astype('float32'), int(lab)

    def train_mapper(self, sample):
        """Map a ``(path, label)`` sample to network input using the training transform."""
        return self._load_sample(sample, True)

    def test_mapper(self, sample):
        """Map a ``(path, label)`` sample to network input using the test transform."""
        return self._load_sample(sample, False)

    def _list_reader(self, list_path, mapper):
        """Wrap a list file in an ``xmap_readers`` pipeline that applies ``mapper``."""
        def reader():
            with open(list_path, 'r') as f:
                for line in f:
                    line = line.strip()
                    # Tolerate blank lines (e.g. a trailing newline at EOF).
                    if not line:
                        continue
                    # Split on the last tab so only the final field is the label.
                    img_path, lab = line.rsplit('\t', 1)
                    yield img_path, int(lab)
        # One mapper worker per CPU core, buffer of 1024 samples.
        return paddle.reader.xmap_readers(mapper, reader, cpu_count(), 1024)

    def train_reader(self, train_list):
        """Return a reader over ``train_list`` that yields training samples."""
        return self._list_reader(train_list, self.train_mapper)

    def test_reader(self, test_list):
        """Return a reader over ``test_list`` that yields test samples."""
        return self._list_reader(test_list, self.test_mapper)

这里使用了paddle.image.simple_transform函数对图像进行预处理:先将图像的短边缩放到70像素,再裁剪为imageSize×imageSize大小。最后一个参数表示是否为训练模式:训练时传True,使用随机裁剪并随机水平翻转;测试时传False,使用中心裁剪。

定义神经网络

我们使用VGG网络来构建图像分类模型。由于数据集较小,我们关闭了Batch Normalization层(conv_with_batchnorm=False),否则可能会因为过拟合导致模型无法收敛。

# coding:utf-8
import paddle.v2 as paddle

def vgg_bn_drop(datadim, type_size):
    """Build the VGG-style classifier and return its softmax output layer.

    :param datadim: length of the flattened input image vector.
    :param type_size: number of classes (size of the softmax layer).
    :return: the output layer, to be fed to a cost layer or to inference.
    """
    # Input layer; the feeding dict elsewhere refers to it by the name "image".
    image = paddle.layer.data(name="image",
                              type=paddle.data_type.dense_vector(datadim))

    def conv_block(ipt, num_filter, groups, dropouts, num_channels=None):
        # `groups` 3x3 ReLU convolutions followed by a 2x2 max pool.
        # Batch norm is switched off here.
        # NOTE(review): `dropouts` is passed as conv_batchnorm_drop_rate;
        # presumably it has no effect while batch norm is off - confirm
        # against img_conv_group.
        return paddle.networks.img_conv_group(
            input=ipt,
            num_channels=num_channels,
            pool_size=2,
            pool_stride=2,
            conv_num_filter=[num_filter] * groups,
            conv_filter_size=3,
            conv_act=paddle.activation.Relu(),
            conv_with_batchnorm=False,
            conv_batchnorm_drop_rate=dropouts,
            pool_type=paddle.pooling.Max())

    # (num_filter, groups, dropouts, num_channels) for the five conv stages.
    # Only the first stage is told the input channel count (3).
    stages = [
        (64, 2, [0.3, 0], 3),
        (128, 2, [0.4, 0], None),
        (256, 3, [0.4, 0.4, 0], None),
        (512, 3, [0.4, 0.4, 0], None),
        (512, 3, [0.4, 0.4, 0], None),
    ]
    features = image
    for num_filter, groups, dropouts, num_channels in stages:
        features = conv_block(features, num_filter, groups, dropouts, num_channels)

    # Classifier head: dropout, two 512-unit linear layers, softmax output.
    dropped = paddle.layer.dropout(input=features, dropout_rate=0.5)
    hidden1 = paddle.layer.fc(input=dropped, size=512, act=paddle.activation.Linear())
    hidden2 = paddle.layer.fc(input=hidden1, size=512, act=paddle.activation.Linear())
    return paddle.layer.fc(input=hidden2, size=type_size, act=paddle.activation.Softmax())

使用PaddlePaddle开始训练

现在,我们编写训练代码,将前面定义的数据读取器和神经网络结合起来,进行模型训练。

# coding:utf-8
import os
import sys
import paddle.v2 as paddle
from MyReader import MyReader
from vgg import vgg_bn_drop


class PaddleUtil:
    """Helpers that build a PaddlePaddle trainer for the VGG classifier and run it."""

    def __init__(self):
        # Initialise paddle on CPU with two trainer threads.
        paddle.init(use_gpu=False, trainer_count=2)

    def get_parameters(self, parameters_path=None, cost=None):
        """Return model parameters, freshly created or loaded from a tar file.

        :param parameters_path: path of a saved parameter tar; when empty or
            ``None`` new parameters are created from ``cost``.
        :param cost: cost layer, required when ``parameters_path`` is not given.
        :raises NameError: if neither argument is given, or if the parameter
            file cannot be read (the original error text is included).
        """
        if not parameters_path:
            if cost is None:
                raise NameError('请输入cost参数')
            return paddle.parameters.create(cost)
        try:
            # The parameter file is a tar archive, so read it as binary.
            with open(parameters_path, 'rb') as f:
                return paddle.parameters.Parameters.from_tar(f)
        except Exception as e:
            raise NameError("参数文件错误: %s" % e)

    def get_trainer(self, datadim, type_size, parameters_path):
        """Assemble the network, cost, optimizer and parameters into an SGD trainer.

        :param datadim: length of the flattened input image vector.
        :param type_size: number of classes.
        :param parameters_path: saved parameters to resume from, or ``None``
            to start from newly created parameters.
        :return: a ``paddle.trainer.SGD`` instance.
        """
        # Label input; the feeding dict in start_trainer refers to it by name.
        label = paddle.layer.data(name="label",
                                  type=paddle.data_type.integer_value(type_size))

        # Output layer of the VGG network.
        out = vgg_bn_drop(datadim=datadim, type_size=type_size)

        # Classification cost.
        cost = paddle.layer.classification_cost(input=out, label=label)

        parameters = self.get_parameters(parameters_path=parameters_path, cost=cost)

        # Momentum optimizer. The 128 factors match the batch size used in
        # start_trainer.
        optimizer = paddle.optimizer.Momentum(
            momentum=0.9,
            regularization=paddle.optimizer.L2Regularization(rate=0.0005 * 128),
            learning_rate=0.001 / 128,
            learning_rate_decay_a=0.1,
            learning_rate_decay_b=128000 * 35,
            learning_rate_schedule="discexp", )

        trainer = paddle.trainer.SGD(cost=cost,
                                     parameters=parameters,
                                     update_equation=optimizer)
        return trainer

    def start_trainer(self, trainer, num_passes, save_parameters_name, trainer_reader, test_reader):
        """Train for ``num_passes`` passes, saving and testing after every pass.

        :param trainer: trainer returned by :meth:`get_trainer`.
        :param num_passes: number of passes over the training data.
        :param save_parameters_name: path of the tar file the parameters are
            written to at the end of each pass (overwritten every time).
        :param trainer_reader: reader yielding training samples.
        :param test_reader: reader yielding test samples.
        """
        # Shuffled training batches of 128 samples.
        reader = paddle.batch(reader=paddle.reader.shuffle(reader=trainer_reader,
                                                           buf_size=50000),
                              batch_size=128)

        # Make sure the directory for the model file exists. A bare file name
        # has no directory part, in which case there is nothing to create.
        father_path = os.path.dirname(save_parameters_name)
        if father_path and not os.path.exists(father_path):
            os.makedirs(father_path)

        # Position of each data layer's value inside a sample tuple.
        feeding = {"image": 0, "label": 1}

        def event_handler(event):
            if isinstance(event, paddle.event.EndIteration):
                # Full status line every 100 batches, a progress dot otherwise.
                if event.batch_id % 100 == 0:
                    print("\nPass %d, Batch %d, Cost %f, Error %s" % (
                        event.pass_id, event.batch_id, event.cost,
                        event.metrics['classification_error_evaluator']))
                else:
                    sys.stdout.write('.')
                    sys.stdout.flush()

            if isinstance(event, paddle.event.EndPass):
                # Save the parameters; the tar archive is binary data.
                with open(save_parameters_name, 'wb') as f:
                    trainer.save_parameter_to_tar(f)
                # Evaluate on the test set.
                result = trainer.test(reader=paddle.batch(reader=test_reader,
                                                          batch_size=128),
                                      feeding=feeding)
                print("\nTest Pass %d, Classification_Error %s" % (
                    event.pass_id, result.metrics['classification_error_evaluator']))

        trainer.train(reader=reader,
                      num_passes=num_passes,
                      event_handler=event_handler,
                      feeding=feeding)


if __name__ == '__main__':
    # --- configuration -------------------------------------------------
    all_class_name = 'vegetables'  # dataset folder name under ../data
    type_size = 3  # number of classes
    imageSize = 64  # crop size fed to the network
    datadim = 3 * imageSize * imageSize  # flattened input length (3 channels)
    parameters_path = "../model/model.tar"  # where the trained model is saved

    # PaddleUtil() initialises paddle, so create it before anything else.
    paddleUtil = PaddleUtil()
    myReader = MyReader(imageSize=imageSize)

    # parameters_path=None: start from newly created parameters.
    trainer = paddleUtil.get_trainer(
        datadim=datadim,
        type_size=type_size,
        parameters_path=None)

    # Readers over the generated list files.
    trainer_reader = myReader.train_reader(
        train_list="../data/%s/trainer.list" % all_class_name)
    test_reader = myReader.test_reader(
        test_list="../data/%s/test.list" % all_class_name)

    # Train for 100 passes; the model is saved after every pass.
    paddleUtil.start_trainer(
        trainer=trainer,
        num_passes=100,
        save_parameters_name=parameters_path,
        trainer_reader=trainer_reader,
        test_reader=test_reader)

使用PaddlePaddle预测

训练完成后,我们可以使用训练好的模型对新图像进行预测。

# coding:utf-8
import numpy as np
import paddle.v2 as paddle

from vgg import vgg_bn_drop


def get_parameters(parameters_path):
    """Load trained model parameters from the tar file at ``parameters_path``.

    :param parameters_path: path of a parameter tar file written during training.
    :return: the loaded ``paddle.parameters.Parameters`` object.
    """
    # The parameter file is a tar archive, so it must be read as binary.
    with open(parameters_path, 'rb') as f:
        parameters = paddle.parameters.Parameters.from_tar(f)
    return parameters


def to_prediction(image_paths, parameters, out, imageSize):
    """Classify images and return the most probable label for each one.

    :param image_paths: paths of the images to classify.
    :param parameters: trained model parameters.
    :param out: output layer of the network.
    :param imageSize: crop size, which must match the size used for training.
    :return: one ``[label, probability]`` pair per input image, in input
        order; an empty list when ``image_paths`` is empty.
    """
    # Nothing to classify: skip inference instead of passing it empty input.
    if not image_paths:
        return []

    # Same preprocessing as the test reader: resize to 70, test-mode crop.
    test_data = [
        (paddle.image.load_and_transform(image_path, 70, imageSize, False)
         .flatten().astype('float32'),)
        for image_path in image_paths
    ]

    # The indexing below treats probs as one row of class scores per image.
    probs = paddle.infer(output_layer=out,
                         parameters=parameters,
                         input=test_data)
    # Only the top-1 class is reported, so argmax replaces a full sort.
    best_labels = np.argmax(probs, axis=1)
    return [[label, row[label]] for label, row in zip(best_labels, probs)]


if __name__ == '__main__':
    # Initialise paddle on CPU with two trainer threads.
    paddle.init(use_gpu=False, trainer_count=2)

    # These must match the values used for training.
    imageSize = 64
    type_size = 3
    datadim = 3 * imageSize * imageSize
    parameters_path = "../model/model.tar"

    # One sample image per class.
    image_path = [
        "../images/vegetables/cuke/1515826971850.jpg",
        "../images/vegetables/lettuce/1515827012863.jpg",
        "../images/vegetables/lotus_root/1515827059200.jpg"
    ]

    # Build the network, then load the trained parameters and predict.
    out = vgg_bn_drop(datadim=datadim, type_size=type_size)
    parameters = get_parameters(parameters_path=parameters_path)
    all_result = to_prediction(image_paths=image_path,
                               parameters=parameters,
                               out=out,
                               imageSize=imageSize)

    # Each entry is [label, probability].
    for label, confidence in all_result:
        print("预测结果: %d, 可信度: %.6f" % (label, confidence))

下载图像(可选)

如果需要从百度图片下载图像,可以使用以下脚本:
```python

# coding=utf-8

import re
import uuid
import requests
import os

class DownloadImages:
def __init__(self, download_max, key_word):
    """Set up a download job.

    :param download_max: maximum number of images to download.
    :param key_word: search keyword; also used as the sub-folder name
        under ``../images/download/``.
    """
    # Number of images downloaded so far.
    self.download_sum = 0
    self.download_max = download_max
    self.key_word = key_word
    # Folder the downloaded images are saved in.
    self.save_path = '../images/download/' + key_word

def start_download(self):
    """Fetch Baidu image-search result pages until ``download_max`` images are saved.

    Each page's HTML is handed to ``self.downloadImages``. The loop also
    stops early when a page produces no new download, so it cannot spin
    forever on the same URL.
    """
    self.download_sum = 0
    gsm = 80
    while self.download_sum < self.download_max:
        downloaded_before = self.download_sum
        # `pn` is the result offset; it only moves when download_sum does.
        url = 'http://image.baidu.com/search/flip?tn=baiduimage&ie=utf-8&' \
              'word=' + self.key_word + '&pn=' + str(self.download_sum) + '&gsm=' + str(gsm)
        # Same timeout as the per-image requests, so a stalled connection
        # cannot hang the script.
        result = requests.get(url, timeout=50)
        self.downloadImages(result.text)
        # No progress means the next iteration would request the same page
        # again: give up instead of looping forever.
        if self.download_sum == downloaded_before:
            print('没有更多可下载的图片,提前结束')
            break
    print('下载完成')

def downloadImages(self, html):
    img_urls = re.findall('"objURL":"(.*?)"', html, re.S)
    for img_url in img_urls:
        if self.download_sum >= self.download_max:
            break
        try:
            pic = requests.get(img_url, timeout=50)
            pic_name = self.save_path + '/' + str(uuid.uuid1())
Xiaoye