# auto_model.py
  1. import json
  2. import time
  3. import copy
  4. import torch
  5. import random
  6. import string
  7. import logging
  8. import os.path
  9. import numpy as np
  10. from tqdm import tqdm
  11. from funasr.register import tables
  12. from funasr.utils.load_utils import load_bytes
  13. from funasr.download.file import download_from_url
  14. from funasr.download.download_from_hub import download_model
  15. from funasr.utils.vad_utils import slice_padding_audio_samples
  16. from funasr.train_utils.set_all_random_seed import set_all_random_seed
  17. from funasr.train_utils.load_pretrained_model import load_pretrained_model
  18. from funasr.utils.load_utils import load_audio_text_image_video
  19. from funasr.utils.timestamp_tools import timestamp_sentence
  20. from funasr.models.campplus.utils import sv_chunk, postprocess, distribute_spk
  21. try:
  22. from funasr.models.campplus.cluster_backend import ClusterBackend
  23. except:
  24. print("If you want to use the speaker diarization, please `pip install hdbscan`")
  25. import pdb
  26. def prepare_data_iterator(data_in, input_len=None, data_type=None, key=None):
  27. """
  28. :param input:
  29. :param input_len:
  30. :param data_type:
  31. :param frontend:
  32. :return:
  33. """
  34. data_list = []
  35. key_list = []
  36. filelist = [".scp", ".txt", ".json", ".jsonl"]
  37. chars = string.ascii_letters + string.digits
  38. if isinstance(data_in, str) and data_in.startswith('http'): # url
  39. data_in = download_from_url(data_in)
  40. pdb.set_trace()
  41. if isinstance(data_in, str) and os.path.exists(data_in): # wav_path; filelist: wav.scp, file.jsonl;text.txt;
  42. _, file_extension = os.path.splitext(data_in)
  43. file_extension = file_extension.lower()
  44. if file_extension in filelist: #filelist: wav.scp, file.jsonl;text.txt;
  45. with open(data_in, encoding='utf-8') as fin:
  46. for line in fin:
  47. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  48. if data_in.endswith(".jsonl"): #file.jsonl: json.dumps({"source": data})
  49. lines = json.loads(line.strip())
  50. data = lines["source"]
  51. key = data["key"] if "key" in data else key
  52. else: # filelist, wav.scp, text.txt: id \t data or data
  53. lines = line.strip().split(maxsplit=1)
  54. data = lines[1] if len(lines)>1 else lines[0]
  55. key = lines[0] if len(lines)>1 else key
  56. data_list.append(data)
  57. key_list.append(key)
  58. else:
  59. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  60. data_list = [data_in]
  61. key_list = [key]
  62. elif isinstance(data_in, (list, tuple)):
  63. if data_type is not None and isinstance(data_type, (list, tuple)): # mutiple inputs
  64. data_list_tmp = []
  65. for data_in_i, data_type_i in zip(data_in, data_type):
  66. key_list, data_list_i = prepare_data_iterator(data_in=data_in_i, data_type=data_type_i)
  67. data_list_tmp.append(data_list_i)
  68. data_list = []
  69. for item in zip(*data_list_tmp):
  70. data_list.append(item)
  71. else:
  72. # [audio sample point, fbank, text]
  73. data_list = data_in
  74. key_list = ["rand_key_" + ''.join(random.choice(chars) for _ in range(13)) for _ in range(len(data_in))]
  75. else: # raw text; audio sample point, fbank; bytes
  76. if isinstance(data_in, bytes): # audio bytes
  77. data_in = load_bytes(data_in)
  78. if key is None:
  79. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  80. data_list = [data_in]
  81. key_list = [key]
  82. return key_list, data_list
  83. class AutoModel:
  84. def __init__(self, **kwargs):
  85. if not kwargs.get("disable_log", False):
  86. tables.print()
  87. model, kwargs = self.build_model(**kwargs)
  88. # if vad_model is not None, build vad model else None
  89. vad_model = kwargs.get("vad_model", None)
  90. vad_kwargs = kwargs.get("vad_model_revision", None)
  91. if vad_model is not None:
  92. logging.info("Building VAD model.")
  93. vad_kwargs = {"model": vad_model, "model_revision": vad_kwargs, "device": kwargs["device"]}
  94. vad_model, vad_kwargs = self.build_model(**vad_kwargs)
  95. # if punc_model is not None, build punc model else None
  96. punc_model = kwargs.get("punc_model", None)
  97. punc_kwargs = kwargs.get("punc_model_revision", None)
  98. if punc_model is not None:
  99. logging.info("Building punc model.")
  100. punc_kwargs = {"model": punc_model, "model_revision": punc_kwargs, "device": kwargs["device"]}
  101. punc_model, punc_kwargs = self.build_model(**punc_kwargs)
  102. # if spk_model is not None, build spk model else None
  103. spk_model = kwargs.get("spk_model", None)
  104. spk_kwargs = kwargs.get("spk_model_revision", None)
  105. if spk_model is not None:
  106. logging.info("Building SPK model.")
  107. spk_kwargs = {"model": spk_model, "model_revision": spk_kwargs, "device": kwargs["device"]}
  108. spk_model, spk_kwargs = self.build_model(**spk_kwargs)
  109. self.cb_model = ClusterBackend().to(kwargs["device"])
  110. spk_mode = kwargs.get("spk_mode", 'punc_segment')
  111. if spk_mode not in ["default", "vad_segment", "punc_segment"]:
  112. logging.error("spk_mode should be one of default, vad_segment and punc_segment.")
  113. self.spk_mode = spk_mode
  114. self.kwargs = kwargs
  115. self.model = model
  116. self.vad_model = vad_model
  117. self.vad_kwargs = vad_kwargs
  118. self.punc_model = punc_model
  119. self.punc_kwargs = punc_kwargs
  120. self.spk_model = spk_model
  121. self.spk_kwargs = spk_kwargs
  122. self.model_path = kwargs.get("model_path")
  123. def build_model(self, **kwargs):
  124. assert "model" in kwargs
  125. if "model_conf" not in kwargs:
  126. logging.info("download models from model hub: {}".format(kwargs.get("model_hub", "ms")))
  127. kwargs = download_model(**kwargs)
  128. set_all_random_seed(kwargs.get("seed", 0))
  129. device = kwargs.get("device", "cuda")
  130. if not torch.cuda.is_available() or kwargs.get("ngpu", 1) == 0:
  131. device = "cpu"
  132. kwargs["batch_size"] = 1
  133. kwargs["device"] = device
  134. if kwargs.get("ncpu", None):
  135. torch.set_num_threads(kwargs.get("ncpu"))
  136. # build tokenizer
  137. tokenizer = kwargs.get("tokenizer", None)
  138. if tokenizer is not None:
  139. tokenizer_class = tables.tokenizer_classes.get(tokenizer)
  140. tokenizer = tokenizer_class(**kwargs["tokenizer_conf"])
  141. kwargs["tokenizer"] = tokenizer
  142. kwargs["token_list"] = tokenizer.token_list
  143. vocab_size = len(tokenizer.token_list)
  144. else:
  145. vocab_size = -1
  146. # build frontend
  147. frontend = kwargs.get("frontend", None)
  148. if frontend is not None:
  149. frontend_class = tables.frontend_classes.get(frontend)
  150. frontend = frontend_class(**kwargs["frontend_conf"])
  151. kwargs["frontend"] = frontend
  152. kwargs["input_size"] = frontend.output_size()
  153. # build model
  154. model_class = tables.model_classes.get(kwargs["model"])
  155. model = model_class(**kwargs, **kwargs["model_conf"], vocab_size=vocab_size)
  156. model.to(device)
  157. # init_param
  158. init_param = kwargs.get("init_param", None)
  159. if init_param is not None:
  160. logging.info(f"Loading pretrained params from {init_param}")
  161. load_pretrained_model(
  162. model=model,
  163. path=init_param,
  164. ignore_init_mismatch=kwargs.get("ignore_init_mismatch", False),
  165. oss_bucket=kwargs.get("oss_bucket", None),
  166. scope_map=kwargs.get("scope_map", None),
  167. excludes=kwargs.get("excludes", None),
  168. )
  169. return model, kwargs
  170. def __call__(self, *args, **cfg):
  171. kwargs = self.kwargs
  172. kwargs.update(cfg)
  173. res = self.model(*args, kwargs)
  174. return res
  175. def generate(self, input, input_len=None, **cfg):
  176. if self.vad_model is None:
  177. return self.inference(input, input_len=input_len, **cfg)
  178. else:
  179. return self.inference_with_vad(input, input_len=input_len, **cfg)
  180. def inference(self, input, input_len=None, model=None, kwargs=None, key=None, **cfg):
  181. kwargs = self.kwargs if kwargs is None else kwargs
  182. kwargs.update(cfg)
  183. model = self.model if model is None else model
  184. model.eval()
  185. batch_size = kwargs.get("batch_size", 1)
  186. # if kwargs.get("device", "cpu") == "cpu":
  187. # batch_size = 1
  188. key_list, data_list = prepare_data_iterator(input, input_len=input_len, data_type=kwargs.get("data_type", None), key=key)
  189. speed_stats = {}
  190. asr_result_list = []
  191. num_samples = len(data_list)
  192. disable_pbar = kwargs.get("disable_pbar", False)
  193. pbar = tqdm(colour="blue", total=num_samples, dynamic_ncols=True) if not disable_pbar else None
  194. time_speech_total = 0.0
  195. time_escape_total = 0.0
  196. for beg_idx in range(0, num_samples, batch_size):
  197. end_idx = min(num_samples, beg_idx + batch_size)
  198. data_batch = data_list[beg_idx:end_idx]
  199. key_batch = key_list[beg_idx:end_idx]
  200. batch = {"data_in": data_batch, "key": key_batch}
  201. if (end_idx - beg_idx) == 1 and kwargs.get("data_type", None) == "fbank": # fbank
  202. batch["data_in"] = data_batch[0]
  203. batch["data_lengths"] = input_len
  204. time1 = time.perf_counter()
  205. with torch.no_grad():
  206. results, meta_data = model.inference(**batch, **kwargs)
  207. time2 = time.perf_counter()
  208. asr_result_list.extend(results)
  209. # batch_data_time = time_per_frame_s * data_batch_i["speech_lengths"].sum().item()
  210. batch_data_time = meta_data.get("batch_data_time", -1)
  211. time_escape = time2 - time1
  212. speed_stats["load_data"] = meta_data.get("load_data", 0.0)
  213. speed_stats["extract_feat"] = meta_data.get("extract_feat", 0.0)
  214. speed_stats["forward"] = f"{time_escape:0.3f}"
  215. speed_stats["batch_size"] = f"{len(results)}"
  216. speed_stats["rtf"] = f"{(time_escape) / batch_data_time:0.3f}"
  217. description = (
  218. f"{speed_stats}, "
  219. )
  220. if pbar:
  221. pbar.update(1)
  222. pbar.set_description(description)
  223. time_speech_total += batch_data_time
  224. time_escape_total += time_escape
  225. if pbar:
  226. # pbar.update(1)
  227. pbar.set_description(f"rtf_avg: {time_escape_total/time_speech_total:0.3f}")
  228. torch.cuda.empty_cache()
  229. return asr_result_list
  230. def inference_with_vad(self, input, input_len=None, **cfg):
  231. # step.1: compute the vad model
  232. self.vad_kwargs.update(cfg)
  233. beg_vad = time.time()
  234. res = self.inference(input, input_len=input_len, model=self.vad_model, kwargs=self.vad_kwargs, **cfg)
  235. end_vad = time.time()
  236. print(f"time cost vad: {end_vad - beg_vad:0.3f}")
  237. # step.2 compute asr model
  238. model = self.model
  239. kwargs = self.kwargs
  240. kwargs.update(cfg)
  241. batch_size = int(kwargs.get("batch_size_s", 300))*1000
  242. batch_size_threshold_ms = int(kwargs.get("batch_size_threshold_s", 60))*1000
  243. kwargs["batch_size"] = batch_size
  244. key_list, data_list = prepare_data_iterator(input, input_len=input_len, data_type=kwargs.get("data_type", None))
  245. results_ret_list = []
  246. time_speech_total_all_samples = 1e-6
  247. beg_total = time.time()
  248. pbar_total = tqdm(colour="red", total=len(res), dynamic_ncols=True)
  249. for i in range(len(res)):
  250. key = res[i]["key"]
  251. vadsegments = res[i]["value"]
  252. input_i = data_list[i]
  253. speech = load_audio_text_image_video(input_i, fs=kwargs["frontend"].fs, audio_fs=kwargs.get("fs", 16000))
  254. speech_lengths = len(speech)
  255. n = len(vadsegments)
  256. data_with_index = [(vadsegments[i], i) for i in range(n)]
  257. sorted_data = sorted(data_with_index, key=lambda x: x[0][1] - x[0][0])
  258. results_sorted = []
  259. if not len(sorted_data):
  260. logging.info("decoding, utt: {}, empty speech".format(key))
  261. continue
  262. if len(sorted_data) > 0 and len(sorted_data[0]) > 0:
  263. batch_size = max(batch_size, sorted_data[0][0][1] - sorted_data[0][0][0])
  264. batch_size_ms_cum = 0
  265. beg_idx = 0
  266. beg_asr_total = time.time()
  267. time_speech_total_per_sample = speech_lengths/16000
  268. time_speech_total_all_samples += time_speech_total_per_sample
  269. # pbar_sample = tqdm(colour="blue", total=n, dynamic_ncols=True)
  270. all_segments = []
  271. for j, _ in enumerate(range(0, n)):
  272. # pbar_sample.update(1)
  273. batch_size_ms_cum += (sorted_data[j][0][1] - sorted_data[j][0][0])
  274. if j < n - 1 and (
  275. batch_size_ms_cum + sorted_data[j + 1][0][1] - sorted_data[j + 1][0][0]) < batch_size and (
  276. sorted_data[j + 1][0][1] - sorted_data[j + 1][0][0]) < batch_size_threshold_ms:
  277. continue
  278. batch_size_ms_cum = 0
  279. end_idx = j + 1
  280. speech_j, speech_lengths_j = slice_padding_audio_samples(speech, speech_lengths, sorted_data[beg_idx:end_idx])
  281. results = self.inference(speech_j, input_len=None, model=model, kwargs=kwargs, disable_pbar=True, **cfg)
  282. if self.spk_model is not None:
  283. # compose vad segments: [[start_time_sec, end_time_sec, speech], [...]]
  284. for _b in range(len(speech_j)):
  285. vad_segments = [[sorted_data[beg_idx:end_idx][_b][0][0]/1000.0,
  286. sorted_data[beg_idx:end_idx][_b][0][1]/1000.0,
  287. np.array(speech_j[_b])]]
  288. segments = sv_chunk(vad_segments)
  289. all_segments.extend(segments)
  290. speech_b = [i[2] for i in segments]
  291. spk_res = self.inference(speech_b, input_len=None, model=self.spk_model, kwargs=kwargs, disable_pbar=True, **cfg)
  292. results[_b]['spk_embedding'] = spk_res[0]['spk_embedding']
  293. beg_idx = end_idx
  294. if len(results) < 1:
  295. continue
  296. results_sorted.extend(results)
  297. # end_asr_total = time.time()
  298. # time_escape_total_per_sample = end_asr_total - beg_asr_total
  299. # pbar_sample.update(1)
  300. # pbar_sample.set_description(f"rtf_avg_per_sample: {time_escape_total_per_sample / time_speech_total_per_sample:0.3f}, "
  301. # f"time_speech_total_per_sample: {time_speech_total_per_sample: 0.3f}, "
  302. # f"time_escape_total_per_sample: {time_escape_total_per_sample:0.3f}")
  303. restored_data = [0] * n
  304. for j in range(n):
  305. index = sorted_data[j][1]
  306. restored_data[index] = results_sorted[j]
  307. result = {}
  308. # results combine for texts, timestamps, speaker embeddings and others
  309. # TODO: rewrite for clean code
  310. for j in range(n):
  311. for k, v in restored_data[j].items():
  312. if k.startswith("timestamp"):
  313. if k not in result:
  314. result[k] = []
  315. for t in restored_data[j][k]:
  316. t[0] += vadsegments[j][0]
  317. t[1] += vadsegments[j][0]
  318. result[k].extend(restored_data[j][k])
  319. elif k == 'spk_embedding':
  320. if k not in result:
  321. result[k] = restored_data[j][k]
  322. else:
  323. result[k] = torch.cat([result[k], restored_data[j][k]], dim=0)
  324. elif 'text' in k:
  325. if k not in result:
  326. result[k] = restored_data[j][k]
  327. else:
  328. result[k] += " " + restored_data[j][k]
  329. else:
  330. if k not in result:
  331. result[k] = restored_data[j][k]
  332. else:
  333. result[k] += restored_data[j][k]
  334. return_raw_text = kwargs.get('return_raw_text', False)
  335. # step.3 compute punc model
  336. if self.punc_model is not None:
  337. self.punc_kwargs.update(cfg)
  338. punc_res = self.inference(result["text"], model=self.punc_model, kwargs=self.punc_kwargs, disable_pbar=True, **cfg)
  339. raw_text = copy.copy(result["text"])
  340. if return_raw_text: result['raw_text'] = raw_text
  341. result["text"] = punc_res[0]["text"]
  342. else:
  343. raw_text = None
  344. # speaker embedding cluster after resorted
  345. if self.spk_model is not None and kwargs.get('return_spk_res', True):
  346. if raw_text is None:
  347. logging.error("Missing punc_model, which is required by spk_model.")
  348. all_segments = sorted(all_segments, key=lambda x: x[0])
  349. spk_embedding = result['spk_embedding']
  350. labels = self.cb_model(spk_embedding.cpu(), oracle_num=kwargs.get('preset_spk_num', None))
  351. # del result['spk_embedding']
  352. sv_output = postprocess(all_segments, None, labels, spk_embedding.cpu())
  353. if self.spk_mode == 'vad_segment': # recover sentence_list
  354. sentence_list = []
  355. for res, vadsegment in zip(restored_data, vadsegments):
  356. if 'timestamp' not in res:
  357. logging.error("Only 'iic/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch' \
  358. and 'iic/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch'\
  359. can predict timestamp, and speaker diarization relies on timestamps.")
  360. sentence_list.append({"start": vadsegment[0],
  361. "end": vadsegment[1],
  362. "sentence": res['text'],
  363. "timestamp": res['timestamp']})
  364. elif self.spk_mode == 'punc_segment':
  365. if 'timestamp' not in result:
  366. logging.error("Only 'iic/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch' \
  367. and 'iic/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch'\
  368. can predict timestamp, and speaker diarization relies on timestamps.")
  369. sentence_list = timestamp_sentence(punc_res[0]['punc_array'],
  370. result['timestamp'],
  371. raw_text,
  372. return_raw_text=return_raw_text)
  373. distribute_spk(sentence_list, sv_output)
  374. result['sentence_info'] = sentence_list
  375. elif kwargs.get("sentence_timestamp", False):
  376. sentence_list = timestamp_sentence(punc_res[0]['punc_array'],
  377. result['timestamp'],
  378. raw_text,
  379. return_raw_text=return_raw_text)
  380. result['sentence_info'] = sentence_list
  381. if "spk_embedding" in result: del result['spk_embedding']
  382. result["key"] = key
  383. results_ret_list.append(result)
  384. end_asr_total = time.time()
  385. time_escape_total_per_sample = end_asr_total - beg_asr_total
  386. pbar_total.update(1)
  387. pbar_total.set_description(f"rtf_avg: {time_escape_total_per_sample / time_speech_total_per_sample:0.3f}, "
  388. f"time_speech: {time_speech_total_per_sample: 0.3f}, "
  389. f"time_escape: {time_escape_total_per_sample:0.3f}")
  390. # end_total = time.time()
  391. # time_escape_total_all_samples = end_total - beg_total
  392. # print(f"rtf_avg_all: {time_escape_total_all_samples / time_speech_total_all_samples:0.3f}, "
  393. # f"time_speech_all: {time_speech_total_all_samples: 0.3f}, "
  394. # f"time_escape_all: {time_escape_total_all_samples:0.3f}")
  395. return results_ret_list