前言

本章我們來介紹如何使用Tensorflow訓練一個區分不同音頻的分類模型,例如你有這樣一個需求,需要根據不同的鳥叫聲識別是什麼種類的鳥,這時你就可以使用這個方法來實現你的需求了。話不多說,來幹。

環境準備

主要介紹libsora,PyAudio,pydub的安裝,其他的依賴包根據需要自行安裝。
- Python 3.7
- Tensorflow 2.0

安裝libsora

最簡單的方式就是使用pip命令安裝,如下:

pip install pytest-runner
pip install librosa

如果pip命令安裝不成功,那就使用源碼安裝,下載源碼:https://github.com/librosa/librosa/releases/, windows的可以下載zip壓縮包,方便解壓。

pip install pytest-runner
tar xzf librosa-<版本號>.tar.gz 或者 unzip librosa-<版本號>.tar.gz
cd librosa-<版本號>/
python setup.py install

如果出現libsndfile64bit.dll': error 0x7e錯誤,請指定安裝版本0.6.3,如pip install librosa==0.6.3

安裝PyAudio

使用pip安裝命令,如下:

pip install pyaudio

在安裝的時候需要使用到C++庫進行編譯,如果讀者的系統是windows,Python是3.7,可以在這裏下載whl安裝包,下載地址:https://github.com/intxcc/pyaudio_portaudio/releases

安裝pydub

使用pip命令安裝,如下:

pip install pydub

訓練分類模型

把音頻轉換成訓練數據最重要的是使用了librosa,使用librosa可以很方便得到音頻的梅爾頻譜(Mel Spectrogram),使用的API爲librosa.feature.melspectrogram(),輸出的是numpy值,可以直接用tensorflow訓練和預測。關於梅爾頻譜具體信息讀者可以自行了解,跟梅爾頻譜同樣很重要的梅爾倒譜(MFCCs)更多用於語音識別中,對應的API爲librosa.feature.mfcc()。同樣以下的代碼,就可以獲取到音頻的梅爾頻譜,其中duration參數指定的是截取音頻的長度。

y1, sr1 = librosa.load(data_path, duration=2.97)
ps = librosa.feature.melspectrogram(y=y1, sr=sr1)

創建訓練數據

根據上面的方法,我們創建Tensorflow訓練數據,因爲分類音頻數據小而多,最好的方法就是把這些音頻文件生成TFRecord,加快訓練速度。創建create_data.py用於生成TFRecord文件。

首先需要生成數據列表,用於下一步的讀取需要,audio_path爲音頻文件路徑,用戶需要提前把音頻數據集存放在dataset/audio目錄下,每個文件夾存放一個類別的音頻數據,如dataset/audio/鳥叫聲/······。每條音頻數據長度大於2.1秒,當然可以可以只其他的音頻長度,這個可以根據讀取的需要修改,如有需要的參數筆者都使用註釋標註了。audio是數據列表存放的位置,生成的數據類別的格式爲音頻路徑\t音頻對應的類別標籤。讀者也可以根據自己存放數據的方式修改以下函數。

def get_data_list(audio_path, list_path):
    sound_sum = 0
    audios = os.listdir(audio_path)

    f_train = open(os.path.join(list_path, 'train_list.txt'), 'w')
    f_test = open(os.path.join(list_path, 'test_list.txt'), 'w')

    for i in range(len(audios)):
        sounds = os.listdir(os.path.join(audio_path, audios[i]))
        for sound in sounds:
            sound_path = os.path.join(audio_path, audios[i], sound)
            t = librosa.get_duration(filename=sound_path)
            # [可能需要修改參數] 過濾小於2.1秒的音頻
            if t >= 2.1:
                if sound_sum % 100 == 0:
                    f_test.write('%s\t%d\n' % (sound_path, i))
                else:
                    f_train.write('%s\t%d\n' % (sound_path, i))
                sound_sum += 1
        print("Audio:%d/%d" % (i + 1, len(audios)))

    f_test.close()
    f_train.close()

if __name__ == '__main__':
    get_data_list('dataset/audio', 'dataset')

有了以上的數據列表,就可開始生成TFRecord文件了。最終會生成train.tfrecordtest.tfrecord。筆者設置的音頻長度爲2.04秒,不足長度會補0,如果需要使用不同的音頻長度時,需要修改wav_len參數值和len(ps)過濾值,wav_len參數值爲音頻長度 16000 * 秒數,len(ps)過濾值爲梅爾頻譜shape相乘。

# 獲取浮點數組
def _float_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


# 獲取整型數據
def _int64_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


# 把數據添加到TFRecord中
def data_example(data, label):
    feature = {
        'data': _float_feature(data),
        'label': _int64_feature(label),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))


# 開始創建tfrecord數據
def create_data_tfrecord(data_list_path, save_path):
    with open(data_list_path, 'r') as f:
        data = f.readlines()
    with tf.io.TFRecordWriter(save_path) as writer:
        for d in tqdm(data):
            try:
                path, label = d.replace('\n', '').split('\t')
                wav, sr = librosa.load(path, sr=16000)
                intervals = librosa.effects.split(wav, top_db=20)
                wav_output = []
                # [可能需要修改參數] 音頻長度 16000 * 秒數
                wav_len = int(16000 * 2.04)
                for sliced in intervals:
                    wav_output.extend(wav[sliced[0]:sliced[1]])
                for i in range(5):
                    # 裁剪過長的音頻,過短的補0
                    if len(wav_output) > wav_len:
                        l = len(wav_output) - wav_len
                        r = random.randint(0, l)
                        wav_output = wav_output[r:wav_len + r]
                    else:
                        wav_output.extend(np.zeros(shape=[wav_len - len(wav_output)], dtype=np.float32))
                    wav_output = np.array(wav_output)
                    # 轉成梅爾頻譜
                    ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).reshape(-1).tolist()
                    # [可能需要修改參數] 梅爾頻譜shape ,librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).shape
                    if len(ps) != 128 * 128: continue
                    tf_example = data_example(ps, int(label))
                    writer.write(tf_example.SerializeToString())
                    if len(wav_output) <= wav_len:
                        break
            except Exception as e:
                print(e)


if __name__ == '__main__':
    create_data_tfrecord('dataset/train_list.txt', 'dataset/train.tfrecord')
    create_data_tfrecord('dataset/test_list.txt', 'dataset/test.tfrecord')

Urbansound8K 是目前應用較爲廣泛的用於自動城市環境聲分類研究的公共數據集,包含10個分類:空調聲、汽車鳴笛聲、兒童玩耍聲、狗叫聲、鑽孔聲、引擎空轉聲、槍聲、手提鑽、警笛聲和街道音樂聲。數據集下載地址:https://zenodo.org/record/1203745/files/UrbanSound8K.tar.gz。以下是針對Urbansound8K生成數據列表的函數。如果讀者想使用該數據集,請下載並解壓到dataset目錄下,把生成數據列表代碼改爲以下代碼。

# 創建UrbanSound8K數據列表
def get_urbansound8k_list(path, urbansound8k_cvs_path):
    data_list = []
    data = pd.read_csv(urbansound8k_cvs_path)
    # 過濾掉長度少於3秒的音頻
    valid_data = data[['slice_file_name', 'fold', 'classID', 'class']][data['end'] - data['start'] >= 3]
    valid_data['path'] = 'fold' + valid_data['fold'].astype('str') + '/' + valid_data['slice_file_name'].astype('str')
    for row in valid_data.itertuples():
        data_list.append([row.path, row.classID])

    f_train = open(os.path.join(path, 'train_list.txt'), 'w')
    f_test = open(os.path.join(path, 'test_list.txt'), 'w')

    for i, data in enumerate(data_list):
        sound_path = os.path.join('dataset/UrbanSound8K/audio/', data[0])
        if i % 100 == 0:
            f_test.write('%s\t%d\n' % (sound_path, data[1]))
        else:
            f_train.write('%s\t%d\n' % (sound_path, data[1]))

    f_test.close()
    f_train.close()


if __name__ == '__main__':
    get_urbansound8k_list('dataset', 'dataset/UrbanSound8K/metadata/UrbanSound8K.csv')

創建reader.py用於在訓練時讀取TFRecord文件數據。如果讀者使用了其他的音頻長度,需要修改一下tf.io.FixedLenFeature參數的值,爲梅爾頻譜的shape相乘的值。

import tensorflow as tf

def _parse_data_function(example):
    # [可能需要修改參數】 設置的梅爾頻譜的shape相乘的值
    data_feature_description = {
        'data': tf.io.FixedLenFeature([16384], tf.float32),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    return tf.io.parse_single_example(example, data_feature_description)


def train_reader_tfrecord(data_path, num_epochs, batch_size):
    raw_dataset = tf.data.TFRecordDataset(data_path)
    train_dataset = raw_dataset.map(_parse_data_function)
    train_dataset = train_dataset.shuffle(buffer_size=1000) \
        .repeat(count=num_epochs) \
        .batch(batch_size=batch_size) \
        .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    return train_dataset


def test_reader_tfrecord(data_path, batch_size):
    raw_dataset = tf.data.TFRecordDataset(data_path)
    test_dataset = raw_dataset.map(_parse_data_function)
    test_dataset = test_dataset.batch(batch_size=batch_size)
    return test_dataset

訓練

接着就可以開始訓練模型了,創建train.py。我們搭建簡單的卷積神經網絡,通過把音頻數據轉換成梅爾頻譜,數據的shape也相當於灰度圖,所以我們可以當作圖像的輸入創建一個深度神經網絡。然後定義優化方法和獲取訓練和測試數據。input_shape設置爲(128, None, 1))主要是爲了適配其他音頻長度的輸入和預測是任意大小的輸入。class_dim爲分類的總數。

import tensorflow as tf
import reader
import numpy as np

class_dim = 10
EPOCHS = 100
BATCH_SIZE=32

model = tf.keras.models.Sequential([
    tf.keras.applications.ResNet50V2(include_top=False, weights=None, input_shape=(128, None, 1)),
    tf.keras.layers.ActivityRegularization(l2=0.5),
    tf.keras.layers.Dropout(rate=0.5),
    tf.keras.layers.GlobalMaxPooling2D(),
    tf.keras.layers.Dense(units=class_dim, activation=tf.nn.softmax)
])

model.summary()


# 定義優化方法
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

train_dataset = reader.train_reader_tfrecord('dataset/train.tfrecord', EPOCHS, batch_size=BATCH_SIZE)
test_dataset = reader.test_reader_tfrecord('dataset/test.tfrecord', batch_size=BATCH_SIZE)

最後執行訓練,每200個batch執行一次測試和保存模型。要注意的是在創建TFRecord文件時,已經把音頻數據的梅爾頻譜轉換爲一維list了,所以在數據輸入到模型前,需要把數據reshape爲之前的shape,操作方式爲reshape((-1, 128, 128, 1))。要注意的是如果讀者使用了其他長度的音頻,需要根據梅爾頻譜的shape修改。

for batch_id, data in enumerate(train_dataset):
    # [可能需要修改參數】 設置的梅爾頻譜的shape
    sounds = data['data'].numpy().reshape((-1, 128, 128, 1))
    labels = data['label']
    # 執行訓練
    with tf.GradientTape() as tape:
        predictions = model(sounds)
        # 獲取損失值
        train_loss = tf.keras.losses.sparse_categorical_crossentropy(labels, predictions)
        train_loss = tf.reduce_mean(train_loss)
        # 獲取準確率
        train_accuracy = tf.keras.metrics.sparse_categorical_accuracy(labels, predictions)
        train_accuracy = np.sum(train_accuracy.numpy()) / len(train_accuracy.numpy())

    # 更新梯度
    gradients = tape.gradient(train_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    if batch_id % 20 == 0:
        print("Batch %d, Loss %f, Accuracy %f" % (batch_id, train_loss.numpy(), train_accuracy))

    if batch_id % 200 == 0 and batch_id != 0:
        test_losses = list()
        test_accuracies = list()
        for d in test_dataset:
            # [可能需要修改參數】 設置的梅爾頻譜的shape
            test_sounds = d['data'].numpy().reshape((-1, 128, 128, 1))
            test_labels = d['label']

            test_result = model(test_sounds)
            # 獲取損失值
            test_loss = tf.keras.losses.sparse_categorical_crossentropy(test_labels, test_result)
            test_loss = tf.reduce_mean(test_loss)
            test_losses.append(test_loss)
            # 獲取準確率
            test_accuracy = tf.keras.metrics.sparse_categorical_accuracy(test_labels, test_result)
            test_accuracy = np.sum(test_accuracy.numpy()) / len(test_accuracy.numpy())
            test_accuracies.append(test_accuracy)

        print('=================================================')
        print("Test, Loss %f, Accuracy %f" % (
            sum(test_losses) / len(test_losses), sum(test_accuracies) / len(test_accuracies)))
        print('=================================================')

        # 保存模型
        model.save(filepath='models/resnet50.h5')

預測

在訓練結束之後,我們得到了一個預測模型,有了預測模型,執行預測非常方便。我們使用這個模型預測音頻,輸入的音頻會裁剪靜音部分,所以非靜音部分不能小於 0.5 秒,避免特徵數量太少,當然這也不是一定的,可以任意修改。在執行預測之前,需要把音頻裁剪掉靜音部分,並且把裁剪後的音頻轉換爲梅爾頻譜數據。預測的數據shape第一個爲輸入數據的 batch 大小,如果想多個音頻一起數據,可以把他們存放在 list 中一起預測。最後輸出的結果即爲預測概率最大的標籤。

import librosa
import numpy as np
import tensorflow as tf

model = tf.keras.models.load_model('models/resnet50.h5')

# 讀取音頻數據
def load_data(data_path):
    wav, sr = librosa.load(data_path, sr=16000)
    intervals = librosa.effects.split(wav, top_db=20)
    wav_output = []
    for sliced in intervals:
        wav_output.extend(wav[sliced[0]:sliced[1]])
    assert len(wav_output) >= 8000, "有效音頻小於0.5s"
    wav_output = np.array(wav_output)
    ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).astype(np.float32)
    ps = ps[np.newaxis, ..., np.newaxis]
    return ps


def infer(audio_path):
    data = load_data(audio_path)
    result = model.predict(data)
    lab = tf.argmax(result, 1)
    return lab


if __name__ == '__main__':
    # 要預測的音頻文件
    path = ''
    label = infer(path)
    print('音頻:%s 的預測結果標籤爲:%d' % (path, label))

其他

爲了方便讀取錄製數據和製作數據集,這裏提供了兩個程序,首先是record_audio.py,這個用於錄製音頻,錄製的音頻幀率爲44100,通道爲1,16bit。

import pyaudio
import wave
import uuid
from tqdm import tqdm
import os

s = input('請輸入你計劃錄音多少秒:')

CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
RECORD_SECONDS = int(s)
WAVE_OUTPUT_FILENAME = "save_audio/%s.wav" % str(uuid.uuid1()).replace('-', '')

p = pyaudio.PyAudio()

stream = p.open(format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK)

print("開始錄音, 請說話......")

frames = []

for i in tqdm(range(0, int(RATE / CHUNK * RECORD_SECONDS))):
    data = stream.read(CHUNK)
    frames.append(data)

print("錄音已結束!")

stream.stop_stream()
stream.close()
p.terminate()

if not os.path.exists('save_audio'):
    os.makedirs('save_audio')

wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(p.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(frames))
wf.close()

print('文件保存在:%s' % WAVE_OUTPUT_FILENAME)
os.system('pause')

創建crop_audio.py,筆者在訓練默認訓練2.04秒的音頻,所以我們要把錄製的硬盤安裝每3秒裁剪一段,把裁剪後音頻存放在音頻名稱命名的文件夾中。最後把這些文件按照訓練數據的要求創建數據列表,和生成TFRecord文件。

import os
import uuid
import wave
from pydub import AudioSegment


# 按秒截取音頻
def get_part_wav(sound, start_time, end_time, part_wav_path):
    save_path = os.path.dirname(part_wav_path)
    if not os.path.exists(save_path):
        os.makedirs(save_path)
    start_time = int(start_time) * 1000
    end_time = int(end_time) * 1000
    word = sound[start_time:end_time]
    word.export(part_wav_path, format="wav")


def crop_wav(path, crop_len):
    for src_wav_path in os.listdir(path):
        wave_path = os.path.join(path, src_wav_path)
        print(wave_path[-4:])
        if wave_path[-4:] != '.wav':
            continue
        file = wave.open(wave_path)
        # 幀總數
        a = file.getparams().nframes
        # 採樣頻率
        f = file.getparams().framerate
        # 獲取音頻時間長度
        t = int(a / f)
        print('總時長爲 %d s' % t)
        # 讀取語音
        sound = AudioSegment.from_wav(wave_path)
        for start_time in range(0, t, crop_len):
            save_path = os.path.join(path, os.path.basename(wave_path)[:-4], str(uuid.uuid1()) + '.wav')
            get_part_wav(sound, start_time, start_time + crop_len, save_path)


if __name__ == '__main__':
    crop_len = 3
    crop_wav('save_audio', crop_len)

創建infer_record.py,這個程序是用來不斷進行錄音識別,錄音時間之所以設置爲 3 秒,保證裁剪靜音部分後有足夠的音頻長度用於預測,當然也可以修改成其他的長度值。因爲識別的時間比較短,所以我們可以大致理解爲這個程序在即時錄音識別。通過這個應該我們可以做一些比較有趣的事情,比如把麥克風放在小鳥經常來的地方,通過即時錄音識別,一旦識別到有鳥叫的聲音,如果你的數據集足夠強大,有每種鳥叫的聲音數據集,這樣你還能準確識別是那種鳥叫。如果識別到目標鳥類,就啓動程序,例如拍照等等。

import wave
import librosa
import numpy as np
import pyaudio
import tensorflow as tf

# 獲取網絡模型
model = tf.keras.models.load_model('models/resnet50.h5')

# 錄音參數
CHUNK = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
RECORD_SECONDS = 3
WAVE_OUTPUT_FILENAME = "infer_audio.wav"

# 打開錄音
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK)


# 讀取音頻數據
def load_data(data_path):
    wav, sr = librosa.load(data_path, sr=16000)
    intervals = librosa.effects.split(wav, top_db=20)
    wav_output = []
    for sliced in intervals:
        wav_output.extend(wav[sliced[0]:sliced[1]])
    if len(wav_output) < 8000:
        raise Exception("有效音頻小於0.5s")
    wav_output = np.array(wav_output)
    ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).astype(np.float32)
    ps = ps[np.newaxis, ..., np.newaxis]
    return ps


# 獲取錄音數據
def record_audio():
    print("開始錄音......")

    frames = []
    for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
        data = stream.read(CHUNK)
        frames.append(data)

    print("錄音已結束!")

    wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(p.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))
    wf.close()
    return WAVE_OUTPUT_FILENAME


# 預測
def infer(audio_data):
    result = model.predict(audio_data)
    lab = tf.argmax(result, 1)
    return lab


if __name__ == '__main__':
    try:
        while True:
            # 加載數據
            data = load_data(record_audio())

            # 獲取預測結果
            label = infer(data)
            print('預測的標籤爲:%d' % label)
    except Exception as e:
        print(e)
        stream.stop_stream()
        stream.close()
        p.terminate()

Github地址:https://github.com/yeyupiaoling/AudioClassification_Tensorflow

小夜