| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- # -*- encoding: utf-8 -*-
- from pathlib import Path
- from typing import Any, Dict, Iterable, List, NamedTuple, Set, Tuple, Union
- import numpy as np
- from typeguard import check_argument_types
- import kaldi_native_fbank as knf
- root_dir = Path(__file__).resolve().parent  # absolute directory that contains this source file
- logger_initialized = {}  # starts empty; not read or written anywhere in the visible part of this file
class WavFrontend():
    """Conventional frontend structure for ASR.

    Extracts filterbank features through ``kaldi_native_fbank`` and can
    post-process them with low-frame-rate (LFR) frame stacking and an
    additive-shift / multiplicative-scale normalisation (CMVN) whose
    coefficients are read from a text file.
    """

    def __init__(
            self,
            cmvn_file: Union[str, None] = None,
            fs: int = 16000,
            window: str = 'hamming',
            n_mels: int = 80,
            frame_length: int = 25,
            frame_shift: int = 10,
            filter_length_min: int = -1,
            filter_length_max: float = -1,
            lfr_m: int = 1,
            lfr_n: int = 1,
            dither: float = 1.0
    ) -> None:
        """Configure the fbank extractor and optionally load CMVN data.

        Args:
            cmvn_file: Path of the CMVN text file; a falsy value disables CMVN.
            fs: Sampling frequency handed to the fbank frame options.
            window: Window type handed to the fbank frame options.
            n_mels: Number of mel bins.
            frame_length: Frame length in milliseconds.
            frame_shift: Frame shift in milliseconds.
            filter_length_min: Stored on the instance; not used by this class.
            filter_length_max: Stored on the instance; not used by this class.
            lfr_m: Number of consecutive frames stacked into one LFR frame.
            lfr_n: Hop, in frames, between consecutive LFR frames.
            dither: Dither value handed to the fbank frame options.
        """
        check_argument_types()

        opts = knf.FbankOptions()
        opts.frame_opts.samp_freq = fs
        opts.frame_opts.dither = dither
        opts.frame_opts.window_type = window
        opts.frame_opts.frame_shift_ms = float(frame_shift)
        opts.frame_opts.frame_length_ms = float(frame_length)
        opts.mel_opts.num_bins = n_mels
        opts.energy_floor = 0
        opts.frame_opts.snip_edges = True
        opts.mel_opts.debug_mel = False
        self.opts = opts

        self.filter_length_min = filter_length_min
        self.filter_length_max = filter_length_max
        self.lfr_m = lfr_m
        self.lfr_n = lfr_n
        self.cmvn_file = cmvn_file

        # Always define the attribute so it can be inspected even when
        # no CMVN file was supplied.
        self.cmvn = self.load_cmvn() if self.cmvn_file else None

    def fbank(self,
              waveform: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Compute fbank features for a whole waveform.

        Args:
            waveform: Audio samples; multiplied by 2**15 before extraction
                (presumably floats in [-1, 1) -- confirm with callers).

        Returns:
            ``(feat, feat_len)``: a float32 matrix with one row per ready
            frame and ``num_bins`` columns, and the frame count as a 0-d
            int32 array.
        """
        waveform = waveform * (1 << 15)
        fbank_fn = knf.OnlineFbank(self.opts)
        fbank_fn.accept_waveform(self.opts.frame_opts.samp_freq, waveform.tolist())
        frames = fbank_fn.num_frames_ready
        feat = np.empty([frames, self.opts.mel_opts.num_bins], dtype=np.float32)
        for i in range(frames):
            feat[i, :] = fbank_fn.get_frame(i)
        feat_len = np.array(feat.shape[0]).astype(np.int32)
        return feat, feat_len

    def lfr_cmvn(self, feat: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Apply LFR stacking and CMVN, each only when configured.

        LFR runs unless both ``lfr_m`` and ``lfr_n`` equal 1; CMVN runs
        when a CMVN file was given.

        Returns:
            ``(feat, feat_len)`` where ``feat_len`` is the number of rows of
            the processed features as a 0-d int32 array.
        """
        if self.lfr_m != 1 or self.lfr_n != 1:
            feat = self.apply_lfr(feat, self.lfr_m, self.lfr_n)

        if self.cmvn_file:
            feat = self.apply_cmvn(feat)

        feat_len = np.array(feat.shape[0]).astype(np.int32)
        return feat, feat_len

    @staticmethod
    def apply_lfr(inputs: np.ndarray, lfr_m: int, lfr_n: int) -> np.ndarray:
        """Stack ``lfr_m`` consecutive frames every ``lfr_n`` frames.

        The first frame is replicated ``(lfr_m - 1) // 2`` times on the left
        and the last frame is replicated on the right as often as needed to
        fill the final window.

        Args:
            inputs: Feature matrix with one frame per row.
            lfr_m: Window size in frames.
            lfr_n: Hop size in frames.

        Returns:
            float32 array with ``ceil(T / lfr_n)`` rows, each the
            concatenation of ``lfr_m`` frames. Zero-frame input yields an
            array with zero rows instead of raising.
        """
        n_frames = inputs.shape[0]
        if n_frames == 0:
            # Nothing to stack (e.g. audio shorter than one analysis frame).
            width = lfr_m * int(np.prod(inputs.shape[1:]))
            return np.zeros((0, width), dtype=np.float32)

        n_out = int(np.ceil(n_frames / lfr_n))
        left = (lfr_m - 1) // 2
        # Total rows required so that the last window is complete.
        needed = (n_out - 1) * lfr_n + lfr_m
        right = max(0, needed - (n_frames + left))

        padded = np.concatenate((
            np.repeat(inputs[:1], left, axis=0),
            inputs,
            np.repeat(inputs[-1:], right, axis=0),
        ), axis=0)

        # Row i of `window_idx` lists the padded-frame indices of window i.
        window_idx = (np.arange(n_out) * lfr_n)[:, None] + np.arange(lfr_m)[None, :]
        return padded[window_idx].reshape(n_out, -1).astype(np.float32)

    def apply_cmvn(self, inputs: np.ndarray) -> np.ndarray:
        """
        Apply CMVN with mvn data

        Adds row 0 of ``self.cmvn`` to every frame and multiplies the result
        by row 1, using the first ``dim`` coefficients of each row.
        """
        _, dim = inputs.shape
        shift = self.cmvn[0:1, :dim]
        scale = self.cmvn[1:2, :dim]
        # Broadcasting over the frame axis replaces explicit tiling.
        return (inputs + shift) * scale

    @staticmethod
    def _coef_values(line: str):
        """Return the coefficient tokens of a ``<LearnRateCoef>`` line, else None."""
        fields = line.split()
        if fields and fields[0] == '<LearnRateCoef>':
            # Drop the tag, the next two tokens and the trailing token.
            return fields[3:-1]
        return None

    def load_cmvn(self) -> np.ndarray:
        """Read shift and scale coefficients from ``self.cmvn_file``.

        The ``<LearnRateCoef>`` line directly after an ``<AddShift>`` line
        supplies the shifts, the one after a ``<Rescale>`` line the scales.
        Blank lines and a tag on the very last line are ignored.

        Returns:
            float64 array whose row 0 holds the shifts and row 1 the scales.
        """
        with open(self.cmvn_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()

        means_list = []
        scales_list = []
        # Pair each line with its successor; the last line has none.
        for header, following in zip(lines, lines[1:]):
            fields = header.split()
            if not fields:
                continue
            if fields[0] == '<AddShift>':
                values = self._coef_values(following)
                if values is not None:
                    means_list = values
            elif fields[0] == '<Rescale>':
                values = self._coef_values(following)
                if values is not None:
                    scales_list = values

        means = np.array(means_list).astype(np.float64)
        scales = np.array(scales_list).astype(np.float64)
        return np.array([means, scales])
def load_bytes(input):
    """Decode a buffer of 16-bit integer samples into float32 values.

    The buffer is read as int16 and each sample is divided by
    ``2 ** (bits - 1)`` (32768 for int16), after subtracting the
    type's zero offset.

    Args:
        input: Bytes-like object interpreted as int16 samples.

    Returns:
        1-D float32 numpy array with one value per input sample.
    """
    pcm = np.asarray(np.frombuffer(input, dtype=np.int16))
    if pcm.dtype.kind not in 'iu':
        raise TypeError("'middle_data' must be an array of integers")

    target = np.dtype('float32')
    if target.kind != 'f':
        raise TypeError("'dtype' must be a floating point type")

    info = np.iinfo(pcm.dtype)
    full_scale = 2 ** (info.bits - 1)
    # Zero for signed types: the minimum equals minus the full scale.
    zero_point = info.min + full_scale

    scaled = (pcm.astype(target) - zero_point) / full_scale
    return np.frombuffer(scaled, dtype=np.float32)
|