# auto_model.py

import copy
import json
import logging
import os.path
import random
import string
import time

import hydra
import numpy as np
import torch
from omegaconf import DictConfig, OmegaConf, ListConfig
from tqdm import tqdm

from funasr.download.download_from_hub import download_model
from funasr.download.file import download_from_url
from funasr.models.campplus.cluster_backend import ClusterBackend
from funasr.models.campplus.utils import sv_chunk, postprocess, distribute_spk
from funasr.register import tables
from funasr.train_utils.load_pretrained_model import load_pretrained_model
from funasr.train_utils.set_all_random_seed import set_all_random_seed
from funasr.utils.load_utils import load_audio_text_image_video, extract_fbank
from funasr.utils.load_utils import load_bytes
from funasr.utils.timestamp_tools import timestamp_sentence
from funasr.utils.vad_utils import slice_padding_audio_samples


  23. def prepare_data_iterator(data_in, input_len=None, data_type=None, key=None):
  24. """
  25. :param input:
  26. :param input_len:
  27. :param data_type:
  28. :param frontend:
  29. :return:
  30. """
  31. data_list = []
  32. key_list = []
  33. filelist = [".scp", ".txt", ".json", ".jsonl"]
  34. chars = string.ascii_letters + string.digits
  35. if isinstance(data_in, str) and data_in.startswith('http'): # url
  36. data_in = download_from_url(data_in)
  37. if isinstance(data_in, str) and os.path.exists(data_in): # wav_path; filelist: wav.scp, file.jsonl;text.txt;
  38. _, file_extension = os.path.splitext(data_in)
  39. file_extension = file_extension.lower()
  40. if file_extension in filelist: #filelist: wav.scp, file.jsonl;text.txt;
  41. with open(data_in, encoding='utf-8') as fin:
  42. for line in fin:
  43. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  44. if data_in.endswith(".jsonl"): #file.jsonl: json.dumps({"source": data})
  45. lines = json.loads(line.strip())
  46. data = lines["source"]
  47. key = data["key"] if "key" in data else key
  48. else: # filelist, wav.scp, text.txt: id \t data or data
  49. lines = line.strip().split(maxsplit=1)
  50. data = lines[1] if len(lines)>1 else lines[0]
  51. key = lines[0] if len(lines)>1 else key
  52. data_list.append(data)
  53. key_list.append(key)
  54. else:
  55. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  56. data_list = [data_in]
  57. key_list = [key]
  58. elif isinstance(data_in, (list, tuple)):
  59. if data_type is not None and isinstance(data_type, (list, tuple)): # mutiple inputs
  60. data_list_tmp = []
  61. for data_in_i, data_type_i in zip(data_in, data_type):
  62. key_list, data_list_i = prepare_data_iterator(data_in=data_in_i, data_type=data_type_i)
  63. data_list_tmp.append(data_list_i)
  64. data_list = []
  65. for item in zip(*data_list_tmp):
  66. data_list.append(item)
  67. else:
  68. # [audio sample point, fbank, text]
  69. data_list = data_in
  70. key_list = ["rand_key_" + ''.join(random.choice(chars) for _ in range(13)) for _ in range(len(data_in))]
  71. else: # raw text; audio sample point, fbank; bytes
  72. if isinstance(data_in, bytes): # audio bytes
  73. data_in = load_bytes(data_in)
  74. if key is None:
  75. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  76. data_list = [data_in]
  77. key_list = [key]
  78. return key_list, data_list
  79. class AutoModel:
  80. def __init__(self, **kwargs):
  81. if not kwargs.get("disable_log", False):
  82. tables.print()
  83. model, kwargs = self.build_model(**kwargs)
  84. # if vad_model is not None, build vad model else None
  85. vad_model = kwargs.get("vad_model", None)
  86. vad_kwargs = kwargs.get("vad_model_revision", None)
  87. if vad_model is not None:
  88. logging.info("Building VAD model.")
  89. vad_kwargs = {"model": vad_model, "model_revision": vad_kwargs, "device": kwargs["device"]}
  90. vad_model, vad_kwargs = self.build_model(**vad_kwargs)
  91. # if punc_model is not None, build punc model else None
  92. punc_model = kwargs.get("punc_model", None)
  93. punc_kwargs = kwargs.get("punc_model_revision", None)
  94. if punc_model is not None:
  95. logging.info("Building punc model.")
  96. punc_kwargs = {"model": punc_model, "model_revision": punc_kwargs, "device": kwargs["device"]}
  97. punc_model, punc_kwargs = self.build_model(**punc_kwargs)
  98. # if spk_model is not None, build spk model else None
  99. spk_model = kwargs.get("spk_model", None)
  100. spk_kwargs = kwargs.get("spk_model_revision", None)
  101. if spk_model is not None:
  102. logging.info("Building SPK model.")
  103. spk_kwargs = {"model": spk_model, "model_revision": spk_kwargs, "device": kwargs["device"]}
  104. spk_model, spk_kwargs = self.build_model(**spk_kwargs)
  105. self.cb_model = ClusterBackend().to(kwargs["device"])
  106. spk_mode = kwargs.get("spk_mode", 'punc_segment')
  107. if spk_mode not in ["default", "vad_segment", "punc_segment"]:
  108. logging.error("spk_mode should be one of default, vad_segment and punc_segment.")
  109. self.spk_mode = spk_mode
  110. self.kwargs = kwargs
  111. self.model = model
  112. self.vad_model = vad_model
  113. self.vad_kwargs = vad_kwargs
  114. self.punc_model = punc_model
  115. self.punc_kwargs = punc_kwargs
  116. self.spk_model = spk_model
  117. self.spk_kwargs = spk_kwargs
  118. self.model_path = kwargs.get("model_path")
  119. def build_model(self, **kwargs):
  120. assert "model" in kwargs
  121. if "model_conf" not in kwargs:
  122. logging.info("download models from model hub: {}".format(kwargs.get("model_hub", "ms")))
  123. kwargs = download_model(**kwargs)
  124. set_all_random_seed(kwargs.get("seed", 0))
  125. device = kwargs.get("device", "cuda")
  126. if not torch.cuda.is_available() or kwargs.get("ngpu", 1) == 0:
  127. device = "cpu"
  128. kwargs["batch_size"] = 1
  129. kwargs["device"] = device
  130. if kwargs.get("ncpu", None):
  131. torch.set_num_threads(kwargs.get("ncpu"))
  132. # build tokenizer
  133. tokenizer = kwargs.get("tokenizer", None)
  134. if tokenizer is not None:
  135. tokenizer_class = tables.tokenizer_classes.get(tokenizer)
  136. tokenizer = tokenizer_class(**kwargs["tokenizer_conf"])
  137. kwargs["tokenizer"] = tokenizer
  138. kwargs["token_list"] = tokenizer.token_list
  139. vocab_size = len(tokenizer.token_list)
  140. else:
  141. vocab_size = -1
  142. # build frontend
  143. frontend = kwargs.get("frontend", None)
  144. if frontend is not None:
  145. frontend_class = tables.frontend_classes.get(frontend)
  146. frontend = frontend_class(**kwargs["frontend_conf"])
  147. kwargs["frontend"] = frontend
  148. kwargs["input_size"] = frontend.output_size()
  149. # build model
  150. model_class = tables.model_classes.get(kwargs["model"])
  151. model = model_class(**kwargs, **kwargs["model_conf"], vocab_size=vocab_size)
  152. model.eval()
  153. model.to(device)
  154. # init_param
  155. init_param = kwargs.get("init_param", None)
  156. if init_param is not None:
  157. logging.info(f"Loading pretrained params from {init_param}")
  158. load_pretrained_model(
  159. model=model,
  160. path=init_param,
  161. ignore_init_mismatch=kwargs.get("ignore_init_mismatch", False),
  162. oss_bucket=kwargs.get("oss_bucket", None),
  163. scope_map=kwargs.get("scope_map", None),
  164. excludes=kwargs.get("excludes", None),
  165. )
  166. return model, kwargs
  167. def __call__(self, *args, **cfg):
  168. kwargs = self.kwargs
  169. kwargs.update(cfg)
  170. res = self.model(*args, kwargs)
  171. return res
  172. def generate(self, input, input_len=None, **cfg):
  173. if self.vad_model is None:
  174. return self.inference(input, input_len=input_len, **cfg)
  175. else:
  176. return self.inference_with_vad(input, input_len=input_len, **cfg)
  177. def inference(self, input, input_len=None, model=None, kwargs=None, key=None, **cfg):
  178. kwargs = self.kwargs if kwargs is None else kwargs
  179. kwargs.update(cfg)
  180. model = self.model if model is None else model
  181. batch_size = kwargs.get("batch_size", 1)
  182. # if kwargs.get("device", "cpu") == "cpu":
  183. # batch_size = 1
  184. key_list, data_list = prepare_data_iterator(input, input_len=input_len, data_type=kwargs.get("data_type", None), key=key)
  185. speed_stats = {}
  186. asr_result_list = []
  187. num_samples = len(data_list)
  188. disable_pbar = kwargs.get("disable_pbar", False)
  189. pbar = tqdm(colour="blue", total=num_samples, dynamic_ncols=True) if not disable_pbar else None
  190. time_speech_total = 0.0
  191. time_escape_total = 0.0
  192. for beg_idx in range(0, num_samples, batch_size):
  193. end_idx = min(num_samples, beg_idx + batch_size)
  194. data_batch = data_list[beg_idx:end_idx]
  195. key_batch = key_list[beg_idx:end_idx]
  196. batch = {"data_in": data_batch, "key": key_batch}
  197. if (end_idx - beg_idx) == 1 and kwargs.get("data_type", None) == "fbank": # fbank
  198. batch["data_in"] = data_batch[0]
  199. batch["data_lengths"] = input_len
  200. time1 = time.perf_counter()
  201. with torch.no_grad():
  202. results, meta_data = model.inference(**batch, **kwargs)
  203. time2 = time.perf_counter()
  204. asr_result_list.extend(results)
  205. # batch_data_time = time_per_frame_s * data_batch_i["speech_lengths"].sum().item()
  206. batch_data_time = meta_data.get("batch_data_time", -1)
  207. time_escape = time2 - time1
  208. speed_stats["load_data"] = meta_data.get("load_data", 0.0)
  209. speed_stats["extract_feat"] = meta_data.get("extract_feat", 0.0)
  210. speed_stats["forward"] = f"{time_escape:0.3f}"
  211. speed_stats["batch_size"] = f"{len(results)}"
  212. speed_stats["rtf"] = f"{(time_escape) / batch_data_time:0.3f}"
  213. description = (
  214. f"{speed_stats}, "
  215. )
  216. if pbar:
  217. pbar.update(1)
  218. pbar.set_description(description)
  219. time_speech_total += batch_data_time
  220. time_escape_total += time_escape
  221. if pbar:
  222. # pbar.update(1)
  223. pbar.set_description(f"rtf_avg: {time_escape_total/time_speech_total:0.3f}")
  224. torch.cuda.empty_cache()
  225. return asr_result_list
  226. def inference_with_vad(self, input, input_len=None, **cfg):
  227. # step.1: compute the vad model
  228. self.vad_kwargs.update(cfg)
  229. beg_vad = time.time()
  230. res = self.inference(input, input_len=input_len, model=self.vad_model, kwargs=self.vad_kwargs, **cfg)
  231. end_vad = time.time()
  232. print(f"time cost vad: {end_vad - beg_vad:0.3f}")
  233. # step.2 compute asr model
  234. model = self.model
  235. kwargs = self.kwargs
  236. kwargs.update(cfg)
  237. batch_size = int(kwargs.get("batch_size_s", 300))*1000
  238. batch_size_threshold_ms = int(kwargs.get("batch_size_threshold_s", 60))*1000
  239. kwargs["batch_size"] = batch_size
  240. key_list, data_list = prepare_data_iterator(input, input_len=input_len, data_type=kwargs.get("data_type", None))
  241. results_ret_list = []
  242. time_speech_total_all_samples = 1e-6
  243. beg_total = time.time()
  244. pbar_total = tqdm(colour="red", total=len(res), dynamic_ncols=True)
  245. for i in range(len(res)):
  246. key = res[i]["key"]
  247. vadsegments = res[i]["value"]
  248. input_i = data_list[i]
  249. speech = load_audio_text_image_video(input_i, fs=kwargs["frontend"].fs, audio_fs=kwargs.get("fs", 16000))
  250. speech_lengths = len(speech)
  251. n = len(vadsegments)
  252. data_with_index = [(vadsegments[i], i) for i in range(n)]
  253. sorted_data = sorted(data_with_index, key=lambda x: x[0][1] - x[0][0])
  254. results_sorted = []
  255. if not len(sorted_data):
  256. logging.info("decoding, utt: {}, empty speech".format(key))
  257. continue
  258. if len(sorted_data) > 0 and len(sorted_data[0]) > 0:
  259. batch_size = max(batch_size, sorted_data[0][0][1] - sorted_data[0][0][0])
  260. batch_size_ms_cum = 0
  261. beg_idx = 0
  262. beg_asr_total = time.time()
  263. time_speech_total_per_sample = speech_lengths/16000
  264. time_speech_total_all_samples += time_speech_total_per_sample
  265. # pbar_sample = tqdm(colour="blue", total=n, dynamic_ncols=True)
  266. all_segments = []
  267. for j, _ in enumerate(range(0, n)):
  268. # pbar_sample.update(1)
  269. batch_size_ms_cum += (sorted_data[j][0][1] - sorted_data[j][0][0])
  270. if j < n - 1 and (
  271. batch_size_ms_cum + sorted_data[j + 1][0][1] - sorted_data[j + 1][0][0]) < batch_size and (
  272. sorted_data[j + 1][0][1] - sorted_data[j + 1][0][0]) < batch_size_threshold_ms:
  273. continue
  274. batch_size_ms_cum = 0
  275. end_idx = j + 1
  276. speech_j, speech_lengths_j = slice_padding_audio_samples(speech, speech_lengths, sorted_data[beg_idx:end_idx])
  277. results = self.inference(speech_j, input_len=None, model=model, kwargs=kwargs, disable_pbar=True, **cfg)
  278. if self.spk_model is not None:
  279. # compose vad segments: [[start_time_sec, end_time_sec, speech], [...]]
  280. for _b in range(len(speech_j)):
  281. vad_segments = [[sorted_data[beg_idx:end_idx][_b][0][0]/1000.0,
  282. sorted_data[beg_idx:end_idx][_b][0][1]/1000.0,
  283. np.array(speech_j[_b])]]
  284. segments = sv_chunk(vad_segments)
  285. all_segments.extend(segments)
  286. speech_b = [i[2] for i in segments]
  287. spk_res = self.inference(speech_b, input_len=None, model=self.spk_model, kwargs=kwargs, disable_pbar=True, **cfg)
  288. results[_b]['spk_embedding'] = spk_res[0]['spk_embedding']
  289. beg_idx = end_idx
  290. if len(results) < 1:
  291. continue
  292. results_sorted.extend(results)
  293. # end_asr_total = time.time()
  294. # time_escape_total_per_sample = end_asr_total - beg_asr_total
  295. # pbar_sample.update(1)
  296. # pbar_sample.set_description(f"rtf_avg_per_sample: {time_escape_total_per_sample / time_speech_total_per_sample:0.3f}, "
  297. # f"time_speech_total_per_sample: {time_speech_total_per_sample: 0.3f}, "
  298. # f"time_escape_total_per_sample: {time_escape_total_per_sample:0.3f}")
  299. restored_data = [0] * n
  300. for j in range(n):
  301. index = sorted_data[j][1]
  302. restored_data[index] = results_sorted[j]
  303. result = {}
  304. # results combine for texts, timestamps, speaker embeddings and others
  305. # TODO: rewrite for clean code
  306. for j in range(n):
  307. for k, v in restored_data[j].items():
  308. if k.startswith("timestamp"):
  309. if k not in result:
  310. result[k] = []
  311. for t in restored_data[j][k]:
  312. t[0] += vadsegments[j][0]
  313. t[1] += vadsegments[j][0]
  314. result[k].extend(restored_data[j][k])
  315. elif k == 'spk_embedding':
  316. if k not in result:
  317. result[k] = restored_data[j][k]
  318. else:
  319. result[k] = torch.cat([result[k], restored_data[j][k]], dim=0)
  320. elif 'text' in k:
  321. if k not in result:
  322. result[k] = restored_data[j][k]
  323. else:
  324. result[k] += " " + restored_data[j][k]
  325. else:
  326. if k not in result:
  327. result[k] = restored_data[j][k]
  328. else:
  329. result[k] += restored_data[j][k]
  330. # step.3 compute punc model
  331. if self.punc_model is not None:
  332. self.punc_kwargs.update(cfg)
  333. punc_res = self.inference(result["text"], model=self.punc_model, kwargs=self.punc_kwargs, disable_pbar=True, **cfg)
  334. import copy; raw_text = copy.copy(result["text"])
  335. result["text"] = punc_res[0]["text"]
  336. # speaker embedding cluster after resorted
  337. if self.spk_model is not None and kwargs.get('return_spk_res', True):
  338. all_segments = sorted(all_segments, key=lambda x: x[0])
  339. spk_embedding = result['spk_embedding']
  340. labels = self.cb_model(spk_embedding.cpu(), oracle_num=kwargs.get('preset_spk_num', None))
  341. # del result['spk_embedding']
  342. sv_output = postprocess(all_segments, None, labels, spk_embedding.cpu())
  343. if self.spk_mode == 'vad_segment': # recover sentence_list
  344. sentence_list = []
  345. for res, vadsegment in zip(restored_data, vadsegments):
  346. sentence_list.append({"start": vadsegment[0],\
  347. "end": vadsegment[1],
  348. "sentence": res['raw_text'],
  349. "timestamp": res['timestamp']})
  350. elif self.spk_mode == 'punc_segment':
  351. sentence_list = timestamp_sentence(punc_res[0]['punc_array'], \
  352. result['timestamp'], \
  353. result['raw_text'])
  354. distribute_spk(sentence_list, sv_output)
  355. result['sentence_info'] = sentence_list
  356. elif kwargs.get("sentence_timestamp", False):
  357. sentence_list = timestamp_sentence(punc_res[0]['punc_array'], \
  358. result['timestamp'], \
  359. result['raw_text'])
  360. result['sentence_info'] = sentence_list
  361. del result['spk_embedding']
  362. result["key"] = key
  363. results_ret_list.append(result)
  364. end_asr_total = time.time()
  365. time_escape_total_per_sample = end_asr_total - beg_asr_total
  366. pbar_total.update(1)
  367. pbar_total.set_description(f"rtf_avg: {time_escape_total_per_sample / time_speech_total_per_sample:0.3f}, "
  368. f"time_speech: {time_speech_total_per_sample: 0.3f}, "
  369. f"time_escape: {time_escape_total_per_sample:0.3f}")
  370. # end_total = time.time()
  371. # time_escape_total_all_samples = end_total - beg_total
  372. # print(f"rtf_avg_all: {time_escape_total_all_samples / time_speech_total_all_samples:0.3f}, "
  373. # f"time_speech_all: {time_speech_total_all_samples: 0.3f}, "
  374. # f"time_escape_all: {time_escape_total_all_samples:0.3f}")
  375. return results_ret_list