# encoding: utf-8
# Author: 韦访
# csdn: https://blog.csdn.net/rookie_wei
import numpy as np
from python_speech_features import mfcc
import scipy.io.wavfile as wav
import os
import time
import tensorflow as tf
from tensorflow.python.ops import ctc_ops
from collections import Counter
# Collect all WAV files under a directory (recursively)
def get_wav_files(wav_path):
    wav_files = []
    for (dirpath, dirnames, filenames) in os.walk(wav_path):
        for filename in filenames:
            if filename.endswith('.wav') or filename.endswith('.WAV'):
                filename_path = os.path.join(dirpath, filename)
                wav_files.append(filename_path)
    return wav_files
# Get the transcript text corresponding to each WAV file
def get_tran_texts(wav_files, tran_path):
    tran_texts = []
    for wav_file in wav_files:
        (wav_path, wav_filename) = os.path.split(wav_file)
        tran_file = os.path.join(tran_path, wav_filename + '.trn')
        if not os.path.exists(tran_file):
            return None
        # assuming the .trn transcript files are UTF-8 encoded
        with open(tran_file, 'r', encoding='utf-8') as fd:
            text = fd.readline()
        tran_texts.append(text.split('\n')[0])
    return tran_texts
# Get the WAV files together with their corresponding transcripts
def get_wav_files_and_tran_texts(wav_path, tran_path):
    wav_files = get_wav_files(wav_path)
    tran_texts = get_tran_texts(wav_files, tran_path)
    return wav_files, tran_texts
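
# A minimal usage sketch. The THCHS-30-style paths below are assumptions;
# point them at wherever the corpus actually lives:
#
#   wav_files, tran_texts = get_wav_files_and_tran_texts(
#       'data_thchs30/train', 'data_thchs30/data')
#   print(len(wav_files), wav_files[0], tran_texts[0])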
# The older training set uses this function to collect audio file names and transcripts
def get_wavs_lables(wav_path, label_file):
    wav_files = []
    for (dirpath, dirnames, filenames) in os.walk(wav_path):
        for filename in filenames:
            if filename.endswith('.wav') or filename.endswith('.WAV'):
                filename_path = os.sep.join([dirpath, filename])
                if os.stat(filename_path).st_size < 240000:  # skip files that are too small
                    continue
                wav_files.append(filename_path)

    labels_dict = {}
    with open(label_file, 'rb') as f:
        for label in f:
            label = label.strip(b'\n')
            label_id = label.split(b' ', 1)[0]
            label_text = label.split(b' ', 1)[1]
            labels_dict[label_id.decode('ascii')] = label_text.decode('utf-8')

    labels = []
    new_wav_files = []
    for wav_file in wav_files:
        wav_id = os.path.basename(wav_file).split('.')[0]
        if wav_id in labels_dict:
            labels.append(labels_dict[wav_id])
            new_wav_files.append(wav_file)

    return new_wav_files, labels
# Constants
SPACE_TOKEN = '<space>'
SPACE_INDEX = 0
FIRST_INDEX = ord('a') - 1  # 0 is reserved for the space token
# Convert the character vectors in a sparse tuple back to text
# 'sparse_tuple' is the return value of sparse_tuple_from below
def sparse_tuple_to_texts_ch(sparse_tuple, words):
    # indices of the non-empty entries
    indices = sparse_tuple[0]
    # character ids
    values = sparse_tuple[1]
    results = [''] * sparse_tuple[2][0]
    for i in range(len(indices)):
        index = indices[i][0]
        c = values[i]
        c = ' ' if c == SPACE_INDEX else words[c]
        results[index] = results[index] + c
    return results
# Convert the character vectors in a dense matrix back to text
def ndarray_to_text_ch(value, words):
    results = ''
    for i in range(len(value)):
        results += words[value[i]]  # chr(value[i] + FIRST_INDEX)
    return results.replace('`', ' ')
# Build a sparse representation (indices, values, shape) of a batch of sequences
def sparse_tuple_from(sequences, dtype=np.int32):
    indices = []
    values = []

    for n, seq in enumerate(sequences):
        indices.extend(zip([n] * len(seq), range(len(seq))))
        values.extend(seq)

    indices = np.asarray(indices, dtype=np.int64)
    values = np.asarray(values, dtype=dtype)
    shape = np.asarray([len(sequences), indices.max(0)[1] + 1], dtype=np.int64)

    # return tf.SparseTensor(indices=indices, values=values, shape=shape)
    return indices, values, shape
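
# A quick sanity check (toy data, not from the corpus): two label sequences of
# different lengths become one (indices, values, shape) triple, and
# sparse_tuple_to_texts_ch recovers the original strings given a word list:
#
#   toy_words = ['<space>', 'a', 'b', 'c']
#   toy_seqs = [[1, 2], [3]]                        # 'ab' and 'c'
#   sp = sparse_tuple_from(toy_seqs)
#   # sp[0] -> [[0 0] [0 1] [1 0]], sp[1] -> [1 2 3], sp[2] -> [2 2]
#   print(sparse_tuple_to_texts_ch(sp, toy_words))  # ['ab', 'c']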
# Convert audio into a matrix of time steps (rows) by MFCC features (columns),
# and convert the corresponding transcript into a vector of character ids
def get_audio_and_transcriptch(txt_files, wav_files, n_input, n_context, word_num_map, txt_labels=None):
    audio = []
    audio_len = []
    transcript = []
    transcript_len = []
    if txt_files is not None:
        txt_labels = txt_files

    for txt_obj, wav_file in zip(txt_labels, wav_files):
        # load audio and convert to features
        audio_data = audiofile_to_input_vector(wav_file, n_input, n_context)
        audio_data = audio_data.astype('float32')

        audio.append(audio_data)
        audio_len.append(np.int32(len(audio_data)))

        # load the transcript and convert it to a numerical array
        target = []
        if txt_files is not None:  # txt_obj is a file path
            target = get_ch_lable_v(txt_obj, word_num_map)
        else:
            target = get_ch_lable_v(None, word_num_map, txt_obj)  # txt_obj is a label string
        transcript.append(target)
        transcript_len.append(len(target))

    # per-utterance lengths differ, so these are ragged arrays; modern NumPy
    # requires dtype=object for ragged input to asarray
    audio = np.asarray(audio, dtype=object)
    audio_len = np.asarray(audio_len)
    transcript = np.asarray(transcript, dtype=object)
    transcript_len = np.asarray(transcript_len)

    return audio, audio_len, transcript, transcript_len
# Convert characters to a vector of ids, i.e. look up each character's
# index in word_num_map (unknown characters fall back to words_size)
def get_ch_lable_v(txt_file, word_num_map, txt_label=None):
    words_size = len(word_num_map)
    to_num = lambda word: word_num_map.get(word, words_size)

    if txt_file is not None:
        txt_label = get_ch_lable(txt_file)

    labels_vector = list(map(to_num, txt_label))
    return labels_vector
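
# Example of the lookup (toy mapping, hypothetical characters):
#
#   word_num_map = {'你': 0, '好': 1}
#   get_ch_lable_v(None, word_num_map, '你好吗')  # -> [0, 1, 2]
#   # '吗' is not in the map, so it maps to words_size (= 2)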
def get_ch_lable(txt_file):
    labels = ""
    with open(txt_file, 'rb') as f:
        for label in f:
            labels = labels + label.decode('gb2312')
    return labels
# Convert audio into MFCC features
# Arguments -- audio_filename: the audio file; numcep: number of mel-frequency
# cepstral coefficients; numcontext: number of context frames to include on
# each side of every time step
def audiofile_to_input_vector(audio_filename, numcep, numcontext):
    # load the audio file
    fs, audio = wav.read(audio_filename)

    # compute the MFCC coefficients
    orig_inputs = mfcc(audio, samplerate=fs, numcep=numcep)
    # The shape is, for example, (955, 26): 955 time steps, each with 26 MFCC
    # features. The number of time steps varies from file to file, but the
    # number of features per step is always the same.

    # Because training uses a bidirectional RNN, whose output contains both a
    # forward and a backward result, every time step is effectively doubled.
    # To keep the total sequence length unchanged, orig_inputs[::2] keeps only
    # every other row; each skipped step is covered by the backward RNN's
    # output, so the overall sequence length is preserved.
    orig_inputs = orig_inputs[::2]  # e.g. (478, 26)

    # The comments below assume numcontext=9, the value used in this series.
    # train_inputs holds the data we return: each time step is combined with
    # the 9 previous and 9 following steps, i.e. 19*26=494 MFCC features.
    train_inputs = np.array([], np.float32)
    train_inputs.resize((orig_inputs.shape[0], numcep + 2 * numcep * numcontext))
    # print(np.shape(train_inputs))  # (478, 494)

    # prepare an empty MFCC vector for pre- and post-padding
    empty_mfcc = np.array([])
    empty_mfcc.resize((numcep,))

    # Prepare train_inputs with past and future contexts.
    # time_slices holds the time step indices, i.e. how many steps there are.
    time_slices = range(train_inputs.shape[0])
    # context_past_min and context_future_max mark which steps need zero padding
    context_past_min = time_slices[0] + numcontext
    context_future_max = time_slices[-1] - numcontext

    # The source listing breaks off here; the remainder below follows the
    # standard DeepSpeech-style context stacking this series is based on:
    # for each time step, take up to numcontext past and future frames,
    # zero-padding at the edges, and concatenate past + now + future.
    for time_slice in time_slices:
        # pick up to numcontext frames from the past, padding with empty MFCCs
        need_empty_past = max(0, (context_past_min - time_slice))
        empty_source_past = list(empty_mfcc for empty_slots in range(need_empty_past))
        data_source_past = orig_inputs[max(0, time_slice - numcontext):time_slice]

        # pick up to numcontext frames from the future, padding with empty MFCCs
        need_empty_future = max(0, (time_slice - context_future_max))
        empty_source_future = list(empty_mfcc for empty_slots in range(need_empty_future))
        data_source_future = orig_inputs[time_slice + 1:time_slice + numcontext + 1]

        if need_empty_past:
            past = np.concatenate((empty_source_past, data_source_past))
        else:
            past = data_source_past

        if need_empty_future:
            future = np.concatenate((data_source_future, empty_source_future))
        else:
            future = data_source_future

        past = np.reshape(past, numcontext * numcep)
        now = orig_inputs[time_slice]
        future = np.reshape(future, numcontext * numcep)

        train_inputs[time_slice] = np.concatenate((past, now, future))

    # standardize the whole matrix to zero mean and unit variance
    train_inputs = (train_inputs - np.mean(train_inputs)) / np.std(train_inputs)

    return train_inputs
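
# Shape check (the file name is hypothetical; any mono WAV will do):
#
#   feats = audiofile_to_input_vector('A2_0.wav', numcep=26, numcontext=9)
#   print(feats.shape)  # (time_steps, 494), since 26 + 2*26*9 = 494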