前言

本章介紹如何使用Tensorflow實現簡單的聲紋識別模型,首先你需要熟悉音頻分類,沒有了解的可以查看這篇文章《基於Tensorflow實現聲音分類》。基於這個知識基礎之上,我們訓練一個聲紋識別模型,通過這個模型我們可以識別說話的人是誰,可以應用在一些需要音頻驗證的項目。

環境準備

主要介紹libsora,PyAudio,pydub的安裝,其他的依賴包根據需要自行安裝。
- Python 3.7
- Tensorflow 2.0

安裝libsora

最簡單的方式就是使用pip命令安裝,如下:

pip install pytest-runner
pip install librosa

如果pip命令安裝不成功,那就使用源碼安裝,下載源碼:https://github.com/librosa/librosa/releases/, windows的可以下載zip壓縮包,方便解壓。

pip install pytest-runner
tar xzf librosa-<版本號>.tar.gz 或者 unzip librosa-<版本號>.tar.gz
cd librosa-<版本號>/
python setup.py install

如果出現libsndfile64bit.dll': error 0x7e錯誤,請指定安裝版本0.6.3,如pip install librosa==0.6.3

安裝PyAudio

使用pip安裝命令,如下:

pip install pyaudio

在安裝的時候需要使用到C++庫進行編譯,如果讀者的系統是windows,Python是3.7,可以在這裏下載whl安裝包,下載地址:https://github.com/intxcc/pyaudio_portaudio/releases

安裝pydub

使用pip命令安裝,如下:

pip install pydub

創建數據

本教程筆者使用的是Free ST Chinese Mandarin Corpus數據集,這個數據集一共有855個人的語音數據,有102600條語音數據。如果讀者有其他更好的數據集,可以混合在一起使用。

如何已經讀過筆者《基於Tensorflow實現聲音分類》這篇文章,應該知道語音數據小而多,最好的方法就是把這些音頻文件生成TFRecord,加快訓練速度。所以創建create_data.py用於生成TFRecord文件。

首先是創建一個數據列表,數據列表的格式爲<語音文件路徑\t語音分類標籤>,創建這個列表主要是方便之後的讀取,也是方便讀取使用其他的語音數據集,不同的語音數據集,可以通過編寫對應的生成數據列表的函數,把這些數據集都寫在同一個數據列表中,這樣就可以在下一步直接生成TFRecord文件了。

def get_data_list(audio_path, list_path):
    files = os.listdir(audio_path)

    f_train = open(os.path.join(list_path, 'train_list.txt'), 'w')
    f_test = open(os.path.join(list_path, 'test_list.txt'), 'w')

    sound_sum = 0
    s = set()
    for file in files:
        if '.wav' not in file:
            continue
        s.add(file[:15])
        sound_path = os.path.join(audio_path, file)
        if sound_sum % 100 == 0:
            f_test.write('%s\t%d\n' % (sound_path.replace('\\', '/'), len(s) - 1))
        else:
            f_train.write('%s\t%d\n' % (sound_path.replace('\\', '/'), len(s) - 1))
        sound_sum += 1

    f_test.close()
    f_train.close()

if __name__ == '__main__':
    get_data_list('dataset/ST-CMDS-20170001_1-OS', 'dataset')

有了上面創建的數據列表,就可以把語音數據轉換成訓練數據了,主要是把語音數據轉換成梅爾頻譜(Mel Spectrogram),使用librosa可以很方便得到音頻的梅爾頻譜,使用的API爲librosa.feature.melspectrogram(),輸出的是numpy值,可以直接用tensorflow訓練和預測。關於梅爾頻譜具體信息讀者可以自行了解,跟梅爾頻譜同樣很重要的梅爾倒譜(MFCCs)更多用於語音識別中,對應的API爲librosa.feature.mfcc()。在轉換過程中,筆者還使用了librosa.effects.split裁剪掉靜音部分的音頻,這樣可以減少訓練數據的噪聲,提供訓練準確率。筆者目前默認每條語音的長度爲2.04秒,這個讀者可以根據自己的情況修改語音的長度,如果要修改訓練語音的長度,需要根據註釋的提示修改相應的數據值。如果語音長度比較長的,程序會隨機裁剪20次,以達到數據增強的效果。

# 獲取浮點數組
def _float_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


# 獲取整型數據
def _int64_feature(value):
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


# 把數據添加到TFRecord中
def data_example(data, label):
    feature = {
        'data': _float_feature(data),
        'label': _int64_feature(label),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))


# 開始創建tfrecord數據
def create_data_tfrecord(data_list_path, save_path):
    with open(data_list_path, 'r') as f:
        data = f.readlines()
    with tf.io.TFRecordWriter(save_path) as writer:
        for d in tqdm(data):
            try:
                path, label = d.replace('\n', '').split('\t')
                wav, sr = librosa.load(path, sr=16000)
                intervals = librosa.effects.split(wav, top_db=20)
                wav_output = []
                # [可能需要修改參數] 音頻長度 16000 * 秒數
                wav_len = int(16000 * 2.04)
                for sliced in intervals:
                    wav_output.extend(wav[sliced[0]:sliced[1]])
                for i in range(20):
                    # 裁剪過長的音頻,過短的補0
                    if len(wav_output) > wav_len:
                        l = len(wav_output) - wav_len
                        r = random.randint(0, l)
                        wav_output = wav_output[r:wav_len + r]
                    else:
                        wav_output.extend(np.zeros(shape=[wav_len - len(wav_output)], dtype=np.float32))
                    wav_output = np.array(wav_output)
                    # 轉成梅爾頻譜
                    ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).reshape(-1).tolist()
                    # [可能需要修改參數] 梅爾頻譜shape ,librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).shape
                    if len(ps) != 128 * 128: continue
                    tf_example = data_example(ps, int(label))
                    writer.write(tf_example.SerializeToString())
                    if len(wav_output) <= wav_len:
                        break
            except Exception as e:
                print(e)
if __name__ == '__main__':
    create_data_tfrecord('dataset/train_list.txt', 'dataset/train.tfrecord')
    create_data_tfrecord('dataset/test_list.txt', 'dataset/test.tfrecord')

在上面已經創建了TFRecord文件,爲了可以在訓練中讀取TFRecord文件,創建reader.py程序用於讀取訓練數據,如果讀者已經修改了訓練數據的長度,需要修改tf.io.FixedLenFeature中的值。

def _parse_data_function(example):
    # [可能需要修改參數】 設置的梅爾頻譜的shape相乘的值
    data_feature_description = {
        'data': tf.io.FixedLenFeature([16384], tf.float32),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    return tf.io.parse_single_example(example, data_feature_description)


def train_reader_tfrecord(data_path, num_epochs, batch_size):
    raw_dataset = tf.data.TFRecordDataset(data_path)
    train_dataset = raw_dataset.map(_parse_data_function)
    train_dataset = train_dataset.shuffle(buffer_size=1000) \
        .repeat(count=num_epochs) \
        .batch(batch_size=batch_size) \
        .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    return train_dataset


def test_reader_tfrecord(data_path, batch_size):
    raw_dataset = tf.data.TFRecordDataset(data_path)
    test_dataset = raw_dataset.map(_parse_data_function)
    test_dataset = test_dataset.batch(batch_size=batch_size)
    return test_dataset

訓練模型

創建train.py開始訓練模型,搭建一個ResNet50分類模型,input_shape設置爲(128, None, 1))主要是爲了適配其他音頻長度的輸入和預測是任意大小的輸入。class_dim爲分類的總數,Free ST Chinese Mandarin Corpus數據集一共有855個人的語音數據,所以這裏分類總數爲855,可以使用之前訓練過的權重初始化模型,下載看文章最後。

class_dim = 855
EPOCHS = 500
BATCH_SIZE=32
init_model = "models/model_weights.h5"

model = tf.keras.models.Sequential([
    tf.keras.applications.ResNet50V2(include_top=False, weights=None, input_shape=(128, None, 1)),
    tf.keras.layers.ActivityRegularization(l2=0.5),
    tf.keras.layers.Dropout(rate=0.5),
    tf.keras.layers.GlobalMaxPooling2D(),
    tf.keras.layers.Dense(units=class_dim, activation=tf.nn.softmax)
])

model.summary()

# 定義優化方法
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

train_dataset = reader.train_reader_tfrecord('dataset/train.tfrecord', EPOCHS, batch_size=BATCH_SIZE)
test_dataset = reader.test_reader_tfrecord('dataset/test.tfrecord', batch_size=BATCH_SIZE)

if init_model:
    model.load_weights(init_model)

開始執行訓練,要注意的是在創建TFRecord文件時,已經把音頻數據的梅爾頻譜轉換爲一維list了,所以在數據輸入到模型前,需要把數據reshape爲之前的shape,操作方式爲reshape((-1, 128, 128, 1))。要注意的是如果讀者使用了其他長度的音頻,需要根據梅爾頻譜的shape修改,訓練數據和測試數據都需要做同樣的處理。每訓練200個batch執行一次測試和保存模型,包括預測模型和網絡權重。

for batch_id, data in enumerate(train_dataset):
    # [可能需要修改參數】 設置的梅爾頻譜的shape
    sounds = data['data'].numpy().reshape((-1, 128, 128, 1))
    labels = data['label']
    # 執行訓練
    with tf.GradientTape() as tape:
        predictions = model(sounds)
        # 獲取損失值
        train_loss = tf.keras.losses.sparse_categorical_crossentropy(labels, predictions)
        train_loss = tf.reduce_mean(train_loss)
        # 獲取準確率
        train_accuracy = tf.keras.metrics.sparse_categorical_accuracy(labels, predictions)
        train_accuracy = np.sum(train_accuracy.numpy()) / len(train_accuracy.numpy())

    # 更新梯度
    gradients = tape.gradient(train_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    if batch_id % 20 == 0:
        print("Batch %d, Loss %f, Accuracy %f" % (batch_id, train_loss.numpy(), train_accuracy))

    if batch_id % 200 == 0 and batch_id != 0:
        test_losses = list()
        test_accuracies = list()
        for d in test_dataset:
            # [可能需要修改參數】 設置的梅爾頻譜的shape
            test_sounds = d['data'].numpy().reshape((-1, 128, 128, 1))
            test_labels = d['label']

            test_result = model(test_sounds)
            # 獲取損失值
            test_loss = tf.keras.losses.sparse_categorical_crossentropy(test_labels, test_result)
            test_loss = tf.reduce_mean(test_loss)
            test_losses.append(test_loss)
            # 獲取準確率
            test_accuracy = tf.keras.metrics.sparse_categorical_accuracy(test_labels, test_result)
            test_accuracy = np.sum(test_accuracy.numpy()) / len(test_accuracy.numpy())
            test_accuracies.append(test_accuracy)

        print('=================================================')
        print("Test, Loss %f, Accuracy %f" % (
            sum(test_losses) / len(test_losses), sum(test_accuracies) / len(test_accuracies)))
        print('=================================================')

        # 保存模型
        model.save(filepath='models/resnet.h5')
        model.save_weights(filepath='models/model_weights.h5')

聲紋對比

下面開始實現聲紋對比,創建infer_contrast.py程序,在加載模型時,不要直接加載整個模型,而是加載模型的最後分類層的上一層,這樣就可以獲取到語音的特徵數據。通過使用netron查看每一層的輸入和輸出的名稱。

layer_name = 'global_max_pooling2d'
model = tf.keras.models.load_model('models/resnet.h5')
intermediate_layer_model = Model(inputs=model.input, outputs=model.get_layer(layer_name).output)

然後編寫兩個函數,分類是加載數據和執行預測的函數,在這個加載數據函數中並沒有限定輸入音頻的大小,只是不允許裁剪靜音後的音頻不能小於0.5秒,這樣就可以輸入任意長度的音頻。執行預測之後數據的是語音的特徵值。

def load_data(data_path):
    wav, sr = librosa.load(data_path, sr=16000)
    intervals = librosa.effects.split(wav, top_db=20)
    wav_output = []
    for sliced in intervals:
        wav_output.extend(wav[sliced[0]:sliced[1]])
    assert len(wav_output) >= 8000, "有效音頻小於0.5s"
    wav_output = np.array(wav_output)
    ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).astype(np.float32)
    ps = ps[np.newaxis, ..., np.newaxis]
    return ps


def infer(audio_path):
    data = load_data(audio_path)
    feature = intermediate_layer_model.predict(data)
    return feature

有了上面兩個函數,就可以做聲紋識別了。我們輸入兩個語音,通過預測函數獲取他們的特徵數據,使用這個特徵數據可以求他們的對角餘弦值,得到的結果可以作爲他們相識度。對於這個相識度的閾值,讀者可以根據自己項目的準確度要求進行修改。

if __name__ == '__main__':
    # 要預測的兩個人的音頻文件
    person1 = 'dataset/ST-CMDS-20170001_1-OS/20170001P00011A0001.wav'
    person2 = 'dataset/ST-CMDS-20170001_1-OS/20170001P00011I0081.wav'
    feature1 = infer(person1)[0]
    feature2 = infer(person2)[0]
    # 對角餘弦值
    dist = np.dot(feature1, feature2) / (np.linalg.norm(feature1) * np.linalg.norm(feature2))
    if dist > 0.7:
        print("%s%s 爲同一個人,相似度爲:%f" % (person1, person2, dist))
    else:
        print("%s%s 不是同一個人,相似度爲:%f" % (person1, person2, dist))

聲紋識別

在上面的聲紋對比的基礎上,我們創建infer_recognition.py實現聲紋識別。同樣是使用上面聲紋對比的數據加載函數和預測函數,通過這兩個同樣獲取語音的特徵數據。

layer_name = 'global_max_pooling2d'
model = tf.keras.models.load_model('models/resnet.h5')
intermediate_layer_model = Model(inputs=model.input, outputs=model.get_layer(layer_name).output)

person_feature = []
person_name = []


# 讀取音頻數據
def load_data(data_path):
    wav, sr = librosa.load(data_path, sr=16000)
    intervals = librosa.effects.split(wav, top_db=20)
    wav_output = []
    for sliced in intervals:
        wav_output.extend(wav[sliced[0]:sliced[1]])
    if len(wav_output) < 8000:
        raise Exception("有效音頻小於0.5s")
    wav_output = np.array(wav_output)
    ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).astype(np.float32)
    ps = ps[np.newaxis, ..., np.newaxis]
    return ps


def infer(audio_path):
    data = load_data(audio_path)
    feature = intermediate_layer_model.predict(data)
    return feature

不同的是筆者增加了load_audio_db()recognition(),第一個函數是加載語音庫中的語音數據,這些音頻就是相當於已經註冊的用戶,他們註冊的語音數據會存放在這裏,如果有用戶需要通過聲紋登錄,就需要拿到用戶的語音和語音庫中的語音進行聲紋對比,如果對比成功,那就相當於登錄成功並且獲取用戶註冊時的信息數據。完成識別的主要在recognition()函數中,這個函數就是將輸入的語音和語音庫中的語音一一對比。

def load_audio_db(audio_db_path):
    audios = os.listdir(audio_db_path)
    for audio in audios:
        path = os.path.join(audio_db_path, audio)
        name = audio[:-4]
        feature = infer(path)
        person_name.append(name)
        person_feature.append(feature)
        print("Loaded %s audio." % name)


def recognition(path):
    name = ''
    pro = 0
    feature = infer(path)
    for i, person_f in enumerate(person_feature):
        dist = np.dot(feature, person_f) / (np.linalg.norm(feature) * np.linalg.norm(person_f))
        if dist > pro:
            pro = dist
            name = person_name[i]
    return name, pro

有了上面的聲紋識別的函數,讀者可以根據自己項目的需求完成聲紋識別的方式,例如筆者下面提供的是通過錄音來完成聲紋識別。首先必須要加載語音庫中的語音,語音庫文件夾爲audio_db,然後用戶回車後錄音3秒鐘,然後程序會自動錄音,並使用錄音到的音頻進行聲紋識別,去匹配語音庫中的語音,獲取用戶的信息。通過這樣方式,讀者也可以修改成通過服務請求的方式完成聲紋識別,例如提供一個API供APP調用,用戶在APP上通過聲紋登錄時,把錄音到的語音發送到後端完成聲紋識別,再把結果返回給APP,前提是用戶已經使用語音註冊,併成功把語音數據存放在audio_db文件夾中。

if __name__ == '__main__':
    load_audio_db('audio_db')
    # 錄音參數
    CHUNK = 1024
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 16000
    RECORD_SECONDS = 3
    WAVE_OUTPUT_FILENAME = "infer_audio.wav"

    # 打開錄音
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT,
                    channels=CHANNELS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)

    while True:
        try:
            i = input("按下回車鍵開機錄音,錄音3秒中:")
            print("開始錄音......")
            frames = []
            for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
                data = stream.read(CHUNK)
                frames.append(data)

            print("錄音已結束!")

            wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(p.get_sample_size(FORMAT))
            wf.setframerate(RATE)
            wf.writeframes(b''.join(frames))
            wf.close()

            # 識別對比音頻庫的音頻
            name, p = recognition(WAVE_OUTPUT_FILENAME)
            if p > 0.7:
                print("識別說話的爲:%s,相似度爲:%f" % (name, p))
            else:
                print("音頻庫沒有該用戶的語音")
        except:
            pass

Github地址: https://github.com/yeyupiaoling/VoiceprintRecognition-Tensorflow/tree/master

其他版本

小夜