# inference.py — FunASR-style inference entry point (header recovered; original
# paste residue carried the filename/size banner and a fused line-number gutter).
import json
import logging
import os.path
import random
import string
import time

import hydra
import torch
from omegaconf import DictConfig, ListConfig, OmegaConf
from tqdm import tqdm

from funasr.download.download_from_hub import download_model
from funasr.download.file import download_from_url
from funasr.models.campplus.cluster_backend import ClusterBackend
from funasr.models.campplus.utils import distribute_spk, postprocess, sv_chunk
from funasr.register import tables
from funasr.train_utils.load_pretrained_model import load_pretrained_model
from funasr.train_utils.set_all_random_seed import set_all_random_seed
from funasr.utils.load_utils import extract_fbank, load_audio_text_image_video, load_bytes
from funasr.utils.timestamp_tools import timestamp_sentence
from funasr.utils.vad_utils import slice_padding_audio_samples
  22. def prepare_data_iterator(data_in, input_len=None, data_type=None, key=None):
  23. """
  24. :param input:
  25. :param input_len:
  26. :param data_type:
  27. :param frontend:
  28. :return:
  29. """
  30. data_list = []
  31. key_list = []
  32. filelist = [".scp", ".txt", ".json", ".jsonl"]
  33. chars = string.ascii_letters + string.digits
  34. if isinstance(data_in, str) and data_in.startswith('http'): # url
  35. data_in = download_from_url(data_in)
  36. if isinstance(data_in, str) and os.path.exists(data_in): # wav_path; filelist: wav.scp, file.jsonl;text.txt;
  37. _, file_extension = os.path.splitext(data_in)
  38. file_extension = file_extension.lower()
  39. if file_extension in filelist: #filelist: wav.scp, file.jsonl;text.txt;
  40. with open(data_in, encoding='utf-8') as fin:
  41. for line in fin:
  42. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  43. if data_in.endswith(".jsonl"): #file.jsonl: json.dumps({"source": data})
  44. lines = json.loads(line.strip())
  45. data = lines["source"]
  46. key = data["key"] if "key" in data else key
  47. else: # filelist, wav.scp, text.txt: id \t data or data
  48. lines = line.strip().split(maxsplit=1)
  49. data = lines[1] if len(lines)>1 else lines[0]
  50. key = lines[0] if len(lines)>1 else key
  51. data_list.append(data)
  52. key_list.append(key)
  53. else:
  54. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  55. data_list = [data_in]
  56. key_list = [key]
  57. elif isinstance(data_in, (list, tuple)):
  58. if data_type is not None and isinstance(data_type, (list, tuple)): # mutiple inputs
  59. data_list_tmp = []
  60. for data_in_i, data_type_i in zip(data_in, data_type):
  61. key_list, data_list_i = prepare_data_iterator(data_in=data_in_i, data_type=data_type_i)
  62. data_list_tmp.append(data_list_i)
  63. data_list = []
  64. for item in zip(*data_list_tmp):
  65. data_list.append(item)
  66. else:
  67. # [audio sample point, fbank, text]
  68. data_list = data_in
  69. key_list = ["rand_key_" + ''.join(random.choice(chars) for _ in range(13)) for _ in range(len(data_in))]
  70. else: # raw text; audio sample point, fbank; bytes
  71. if isinstance(data_in, bytes): # audio bytes
  72. data_in = load_bytes(data_in)
  73. if key is None:
  74. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  75. data_list = [data_in]
  76. key_list = [key]
  77. return key_list, data_list
  78. @hydra.main(config_name=None, version_base=None)
  79. def main_hydra(cfg: DictConfig):
  80. def to_plain_list(cfg_item):
  81. if isinstance(cfg_item, ListConfig):
  82. return OmegaConf.to_container(cfg_item, resolve=True)
  83. elif isinstance(cfg_item, DictConfig):
  84. return {k: to_plain_list(v) for k, v in cfg_item.items()}
  85. else:
  86. return cfg_item
  87. kwargs = to_plain_list(cfg)
  88. log_level = getattr(logging, kwargs.get("log_level", "INFO").upper())
  89. logging.basicConfig(level=log_level)
  90. if kwargs.get("debug", False):
  91. import pdb; pdb.set_trace()
  92. model = AutoModel(**kwargs)
  93. res = model(input=kwargs["input"])
  94. print(res)
  95. class AutoModel:
  96. def __init__(self, **kwargs):
  97. tables.print()
  98. model, kwargs = self.build_model(**kwargs)
  99. # if vad_model is not None, build vad model else None
  100. vad_model = kwargs.get("vad_model", None)
  101. vad_kwargs = kwargs.get("vad_model_revision", None)
  102. if vad_model is not None:
  103. logging.info("Building VAD model.")
  104. vad_kwargs = {"model": vad_model, "model_revision": vad_kwargs}
  105. vad_model, vad_kwargs = self.build_model(**vad_kwargs)
  106. # if punc_model is not None, build punc model else None
  107. punc_model = kwargs.get("punc_model", None)
  108. punc_kwargs = kwargs.get("punc_model_revision", None)
  109. if punc_model is not None:
  110. logging.info("Building punc model.")
  111. punc_kwargs = {"model": punc_model, "model_revision": punc_kwargs}
  112. punc_model, punc_kwargs = self.build_model(**punc_kwargs)
  113. # if spk_model is not None, build spk model else None
  114. spk_model = kwargs.get("spk_model", None)
  115. spk_kwargs = kwargs.get("spk_model_revision", None)
  116. if spk_model is not None:
  117. logging.info("Building SPK model.")
  118. spk_kwargs = {"model": spk_model, "model_revision": spk_kwargs}
  119. spk_model, spk_kwargs = self.build_model(**spk_kwargs)
  120. self.cb_model = ClusterBackend()
  121. spk_mode = kwargs.get("spk_mode", 'punc_segment')
  122. if spk_mode not in ["default", "vad_segment", "punc_segment"]:
  123. logging.error("spk_mode should be one of default, vad_segment and punc_segment.")
  124. self.spk_mode = spk_mode
  125. self.preset_spk_num = kwargs.get("preset_spk_num", None)
  126. if self.preset_spk_num:
  127. logging.warning("Using preset speaker number: {}".format(self.preset_spk_num))
  128. logging.warning("Many to print when using speaker model...")
  129. self.kwargs = kwargs
  130. self.model = model
  131. self.vad_model = vad_model
  132. self.vad_kwargs = vad_kwargs
  133. self.punc_model = punc_model
  134. self.punc_kwargs = punc_kwargs
  135. self.spk_model = spk_model
  136. self.spk_kwargs = spk_kwargs
  137. self.model_path = kwargs["model_path"]
  138. def build_model(self, **kwargs):
  139. assert "model" in kwargs
  140. if "model_conf" not in kwargs:
  141. logging.info("download models from model hub: {}".format(kwargs.get("model_hub", "ms")))
  142. kwargs = download_model(**kwargs)
  143. set_all_random_seed(kwargs.get("seed", 0))
  144. device = kwargs.get("device", "cuda")
  145. if not torch.cuda.is_available() or kwargs.get("ngpu", 0):
  146. device = "cpu"
  147. # kwargs["batch_size"] = 1
  148. kwargs["device"] = device
  149. if kwargs.get("ncpu", None):
  150. torch.set_num_threads(kwargs.get("ncpu"))
  151. # build tokenizer
  152. tokenizer = kwargs.get("tokenizer", None)
  153. if tokenizer is not None:
  154. tokenizer_class = tables.tokenizer_classes.get(tokenizer)
  155. tokenizer = tokenizer_class(**kwargs["tokenizer_conf"])
  156. kwargs["tokenizer"] = tokenizer
  157. kwargs["token_list"] = tokenizer.token_list
  158. vocab_size = len(tokenizer.token_list)
  159. else:
  160. vocab_size = -1
  161. # build frontend
  162. frontend = kwargs.get("frontend", None)
  163. if frontend is not None:
  164. frontend_class = tables.frontend_classes.get(frontend)
  165. frontend = frontend_class(**kwargs["frontend_conf"])
  166. kwargs["frontend"] = frontend
  167. kwargs["input_size"] = frontend.output_size()
  168. # build model
  169. model_class = tables.model_classes.get(kwargs["model"])
  170. model = model_class(**kwargs, **kwargs["model_conf"], vocab_size=vocab_size)
  171. model.eval()
  172. model.to(device)
  173. # init_param
  174. init_param = kwargs.get("init_param", None)
  175. if init_param is not None:
  176. logging.info(f"Loading pretrained params from {init_param}")
  177. load_pretrained_model(
  178. model=model,
  179. init_param=init_param,
  180. ignore_init_mismatch=kwargs.get("ignore_init_mismatch", False),
  181. oss_bucket=kwargs.get("oss_bucket", None),
  182. )
  183. return model, kwargs
  184. def __call__(self, input, input_len=None, **cfg):
  185. if self.vad_model is None:
  186. return self.generate(input, input_len=input_len, **cfg)
  187. else:
  188. return self.generate_with_vad(input, input_len=input_len, **cfg)
  189. def generate(self, input, input_len=None, model=None, kwargs=None, key=None, **cfg):
  190. kwargs = self.kwargs if kwargs is None else kwargs
  191. kwargs.update(cfg)
  192. model = self.model if model is None else model
  193. batch_size = kwargs.get("batch_size", 1)
  194. # if kwargs.get("device", "cpu") == "cpu":
  195. # batch_size = 1
  196. key_list, data_list = prepare_data_iterator(input, input_len=input_len, data_type=kwargs.get("data_type", None), key=key)
  197. speed_stats = {}
  198. asr_result_list = []
  199. num_samples = len(data_list)
  200. pbar = tqdm(colour="blue", total=num_samples+1, dynamic_ncols=True)
  201. time_speech_total = 0.0
  202. time_escape_total = 0.0
  203. for beg_idx in range(0, num_samples, batch_size):
  204. end_idx = min(num_samples, beg_idx + batch_size)
  205. data_batch = data_list[beg_idx:end_idx]
  206. key_batch = key_list[beg_idx:end_idx]
  207. batch = {"data_in": data_batch, "key": key_batch}
  208. if (end_idx - beg_idx) == 1 and isinstance(data_batch[0], torch.Tensor): # fbank
  209. batch["data_in"] = data_batch[0]
  210. batch["data_lengths"] = input_len
  211. time1 = time.perf_counter()
  212. with torch.no_grad():
  213. results, meta_data = model.generate(**batch, **kwargs)
  214. time2 = time.perf_counter()
  215. asr_result_list.extend(results)
  216. pbar.update(1)
  217. # batch_data_time = time_per_frame_s * data_batch_i["speech_lengths"].sum().item()
  218. batch_data_time = meta_data.get("batch_data_time", -1)
  219. time_escape = time2 - time1
  220. speed_stats["load_data"] = meta_data.get("load_data", 0.0)
  221. speed_stats["extract_feat"] = meta_data.get("extract_feat", 0.0)
  222. speed_stats["forward"] = f"{time_escape:0.3f}"
  223. speed_stats["batch_size"] = f"{len(results)}"
  224. speed_stats["rtf"] = f"{(time_escape) / batch_data_time:0.3f}"
  225. description = (
  226. f"{speed_stats}, "
  227. )
  228. pbar.set_description(description)
  229. time_speech_total += batch_data_time
  230. time_escape_total += time_escape
  231. pbar.update(1)
  232. pbar.set_description(f"rtf_avg: {time_escape_total/time_speech_total:0.3f}")
  233. torch.cuda.empty_cache()
  234. return asr_result_list
  235. def generate_with_vad(self, input, input_len=None, **cfg):
  236. # step.1: compute the vad model
  237. model = self.vad_model
  238. kwargs = self.vad_kwargs
  239. kwargs.update(cfg)
  240. beg_vad = time.time()
  241. res = self.generate(input, input_len=input_len, model=model, kwargs=kwargs, **cfg)
  242. vad_res = res
  243. end_vad = time.time()
  244. print(f"time cost vad: {end_vad - beg_vad:0.3f}")
  245. # step.2 compute asr model
  246. model = self.model
  247. kwargs = self.kwargs
  248. kwargs.update(cfg)
  249. batch_size = int(kwargs.get("batch_size_s", 300))*1000
  250. batch_size_threshold_ms = int(kwargs.get("batch_size_threshold_s", 60))*1000
  251. kwargs["batch_size"] = batch_size
  252. key_list, data_list = prepare_data_iterator(input, input_len=input_len, data_type=kwargs.get("data_type", None))
  253. results_ret_list = []
  254. time_speech_total_all_samples = 0.0
  255. beg_total = time.time()
  256. pbar_total = tqdm(colour="red", total=len(res) + 1, dynamic_ncols=True)
  257. for i in range(len(res)):
  258. key = res[i]["key"]
  259. vadsegments = res[i]["value"]
  260. input_i = data_list[i]
  261. speech = load_audio_text_image_video(input_i, fs=kwargs["frontend"].fs, audio_fs=kwargs.get("fs", 16000))
  262. speech_lengths = len(speech)
  263. n = len(vadsegments)
  264. data_with_index = [(vadsegments[i], i) for i in range(n)]
  265. sorted_data = sorted(data_with_index, key=lambda x: x[0][1] - x[0][0])
  266. results_sorted = []
  267. if not len(sorted_data):
  268. logging.info("decoding, utt: {}, empty speech".format(key))
  269. continue
  270. # if kwargs["device"] == "cpu":
  271. # batch_size = 0
  272. if len(sorted_data) > 0 and len(sorted_data[0]) > 0:
  273. batch_size = max(batch_size, sorted_data[0][0][1] - sorted_data[0][0][0])
  274. batch_size_ms_cum = 0
  275. beg_idx = 0
  276. beg_asr_total = time.time()
  277. time_speech_total_per_sample = speech_lengths/16000
  278. time_speech_total_all_samples += time_speech_total_per_sample
  279. for j, _ in enumerate(range(0, n)):
  280. batch_size_ms_cum += (sorted_data[j][0][1] - sorted_data[j][0][0])
  281. if j < n - 1 and (
  282. batch_size_ms_cum + sorted_data[j + 1][0][1] - sorted_data[j + 1][0][0]) < batch_size and (
  283. sorted_data[j + 1][0][1] - sorted_data[j + 1][0][0]) < batch_size_threshold_ms:
  284. continue
  285. batch_size_ms_cum = 0
  286. end_idx = j + 1
  287. speech_j, speech_lengths_j = slice_padding_audio_samples(speech, speech_lengths, sorted_data[beg_idx:end_idx])
  288. results = self.generate(speech_j, input_len=None, model=model, kwargs=kwargs, **cfg)
  289. if self.spk_model is not None:
  290. all_segments = []
  291. # compose vad segments: [[start_time_sec, end_time_sec, speech], [...]]
  292. for _b in range(len(speech_j)):
  293. vad_segments = [[sorted_data[beg_idx:end_idx][_b][0][0]/1000.0, \
  294. sorted_data[beg_idx:end_idx][_b][0][1]/1000.0, \
  295. speech_j[_b]]]
  296. segments = sv_chunk(vad_segments)
  297. all_segments.extend(segments)
  298. speech_b = [i[2] for i in segments]
  299. spk_res = self.generate(speech_b, input_len=None, model=self.spk_model, kwargs=kwargs, **cfg)
  300. results[_b]['spk_embedding'] = spk_res[0]['spk_embedding']
  301. beg_idx = end_idx
  302. if len(results) < 1:
  303. continue
  304. results_sorted.extend(results)
  305. pbar_total.update(1)
  306. end_asr_total = time.time()
  307. time_escape_total_per_sample = end_asr_total - beg_asr_total
  308. pbar_total.set_description(f"rtf_avg_per_sample: {time_escape_total_per_sample / time_speech_total_per_sample:0.3f}, "
  309. f"time_speech_total_per_sample: {time_speech_total_per_sample: 0.3f}, "
  310. f"time_escape_total_per_sample: {time_escape_total_per_sample:0.3f}")
  311. restored_data = [0] * n
  312. for j in range(n):
  313. index = sorted_data[j][1]
  314. restored_data[index] = results_sorted[j]
  315. result = {}
  316. # results combine for texts, timestamps, speaker embeddings and others
  317. # TODO: rewrite for clean code
  318. for j in range(n):
  319. for k, v in restored_data[j].items():
  320. if k.startswith("timestamp"):
  321. if k not in result:
  322. result[k] = []
  323. for t in restored_data[j][k]:
  324. t[0] += vadsegments[j][0]
  325. t[1] += vadsegments[j][0]
  326. result[k].extend(restored_data[j][k])
  327. elif k == 'spk_embedding':
  328. if k not in result:
  329. result[k] = restored_data[j][k]
  330. else:
  331. result[k] = torch.cat([result[k], restored_data[j][k]], dim=0)
  332. elif k == 'text':
  333. if k not in result:
  334. result[k] = restored_data[j][k]
  335. else:
  336. result[k] += " " + restored_data[j][k]
  337. else:
  338. if k not in result:
  339. result[k] = restored_data[j][k]
  340. else:
  341. result[k] += restored_data[j][k]
  342. # step.3 compute punc model
  343. if self.punc_model is not None:
  344. self.punc_kwargs.update(cfg)
  345. punc_res = self.generate(result["text"], model=self.punc_model, kwargs=self.punc_kwargs, **cfg)
  346. result["text_with_punc"] = punc_res[0]["text"]
  347. # speaker embedding cluster after resorted
  348. if self.spk_model is not None:
  349. all_segments = sorted(all_segments, key=lambda x: x[0])
  350. spk_embedding = result['spk_embedding']
  351. labels = self.cb_model(spk_embedding, oracle_num=self.preset_spk_num)
  352. del result['spk_embedding']
  353. sv_output = postprocess(all_segments, None, labels, spk_embedding.cpu())
  354. if self.spk_mode == 'vad_segment':
  355. sentence_list = []
  356. for res, vadsegment in zip(restored_data, vadsegments):
  357. sentence_list.append({"start": vadsegment[0],\
  358. "end": vadsegment[1],
  359. "sentence": res['text'],
  360. "timestamp": res['timestamp']})
  361. else: # punc_segment
  362. sentence_list = timestamp_sentence(punc_res[0]['punc_array'], \
  363. result['timestamp'], \
  364. result['text'])
  365. distribute_spk(sentence_list, sv_output)
  366. result['sentence_info'] = sentence_list
  367. result["key"] = key
  368. results_ret_list.append(result)
  369. pbar_total.update(1)
  370. pbar_total.update(1)
  371. end_total = time.time()
  372. time_escape_total_all_samples = end_total - beg_total
  373. pbar_total.set_description(f"rtf_avg_all_samples: {time_escape_total_all_samples / time_speech_total_all_samples:0.3f}, "
  374. f"time_speech_total_all_samples: {time_speech_total_all_samples: 0.3f}, "
  375. f"time_escape_total_all_samples: {time_escape_total_all_samples:0.3f}")
  376. return results_ret_list
  377. class AutoFrontend:
  378. def __init__(self, **kwargs):
  379. assert "model" in kwargs
  380. if "model_conf" not in kwargs:
  381. logging.info("download models from model hub: {}".format(kwargs.get("model_hub", "ms")))
  382. kwargs = download_model(**kwargs)
  383. # build frontend
  384. frontend = kwargs.get("frontend", None)
  385. if frontend is not None:
  386. frontend_class = tables.frontend_classes.get(frontend)
  387. frontend = frontend_class(**kwargs["frontend_conf"])
  388. self.frontend = frontend
  389. if "frontend" in kwargs:
  390. del kwargs["frontend"]
  391. self.kwargs = kwargs
  392. def __call__(self, input, input_len=None, kwargs=None, **cfg):
  393. kwargs = self.kwargs if kwargs is None else kwargs
  394. kwargs.update(cfg)
  395. key_list, data_list = prepare_data_iterator(input, input_len=input_len)
  396. batch_size = kwargs.get("batch_size", 1)
  397. device = kwargs.get("device", "cpu")
  398. if device == "cpu":
  399. batch_size = 1
  400. meta_data = {}
  401. result_list = []
  402. num_samples = len(data_list)
  403. pbar = tqdm(colour="blue", total=num_samples + 1, dynamic_ncols=True)
  404. time0 = time.perf_counter()
  405. for beg_idx in range(0, num_samples, batch_size):
  406. end_idx = min(num_samples, beg_idx + batch_size)
  407. data_batch = data_list[beg_idx:end_idx]
  408. key_batch = key_list[beg_idx:end_idx]
  409. # extract fbank feats
  410. time1 = time.perf_counter()
  411. audio_sample_list = load_audio_text_image_video(data_batch, fs=self.frontend.fs, audio_fs=kwargs.get("fs", 16000))
  412. time2 = time.perf_counter()
  413. meta_data["load_data"] = f"{time2 - time1:0.3f}"
  414. speech, speech_lengths = extract_fbank(audio_sample_list, data_type=kwargs.get("data_type", "sound"),
  415. frontend=self.frontend, **kwargs)
  416. time3 = time.perf_counter()
  417. meta_data["extract_feat"] = f"{time3 - time2:0.3f}"
  418. meta_data["batch_data_time"] = speech_lengths.sum().item() * self.frontend.frame_shift * self.frontend.lfr_n / 1000
  419. speech.to(device=device), speech_lengths.to(device=device)
  420. batch = {"input": speech, "input_len": speech_lengths, "key": key_batch}
  421. result_list.append(batch)
  422. pbar.update(1)
  423. description = (
  424. f"{meta_data}, "
  425. )
  426. pbar.set_description(description)
  427. time_end = time.perf_counter()
  428. pbar.set_description(f"time escaped total: {time_end - time0:0.3f}")
  429. return result_list
  430. if __name__ == '__main__':
  431. main_hydra()