auto_model.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. #!/usr/bin/env python3
  2. # -*- encoding: utf-8 -*-
  3. # Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights Reserved.
  4. # MIT License (https://opensource.org/licenses/MIT)
  5. import json
  6. import time
  7. import copy
  8. import torch
  9. import random
  10. import string
  11. import logging
  12. import os.path
  13. import numpy as np
  14. from tqdm import tqdm
  15. from funasr.register import tables
  16. from funasr.utils.load_utils import load_bytes
  17. from funasr.download.file import download_from_url
  18. from funasr.utils.timestamp_tools import timestamp_sentence
  19. from funasr.download.download_from_hub import download_model
  20. from funasr.utils.vad_utils import slice_padding_audio_samples
  21. from funasr.utils.load_utils import load_audio_text_image_video
  22. from funasr.train_utils.set_all_random_seed import set_all_random_seed
  23. from funasr.train_utils.load_pretrained_model import load_pretrained_model
  24. from funasr.models.campplus.utils import sv_chunk, postprocess, distribute_spk
  25. try:
  26. from funasr.models.campplus.cluster_backend import ClusterBackend
  27. except:
  28. print("If you want to use the speaker diarization, please `pip install hdbscan`")
  29. import pdb
  30. def prepare_data_iterator(data_in, input_len=None, data_type=None, key=None):
  31. """
  32. :param input:
  33. :param input_len:
  34. :param data_type:
  35. :param frontend:
  36. :return:
  37. """
  38. data_list = []
  39. key_list = []
  40. filelist = [".scp", ".txt", ".json", ".jsonl"]
  41. chars = string.ascii_letters + string.digits
  42. if isinstance(data_in, str) and data_in.startswith('http'): # url
  43. data_in = download_from_url(data_in)
  44. if isinstance(data_in, str) and os.path.exists(data_in): # wav_path; filelist: wav.scp, file.jsonl;text.txt;
  45. _, file_extension = os.path.splitext(data_in)
  46. file_extension = file_extension.lower()
  47. if file_extension in filelist: #filelist: wav.scp, file.jsonl;text.txt;
  48. with open(data_in, encoding='utf-8') as fin:
  49. for line in fin:
  50. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  51. if data_in.endswith(".jsonl"): #file.jsonl: json.dumps({"source": data})
  52. lines = json.loads(line.strip())
  53. data = lines["source"]
  54. key = data["key"] if "key" in data else key
  55. else: # filelist, wav.scp, text.txt: id \t data or data
  56. lines = line.strip().split(maxsplit=1)
  57. data = lines[1] if len(lines)>1 else lines[0]
  58. key = lines[0] if len(lines)>1 else key
  59. data_list.append(data)
  60. key_list.append(key)
  61. else:
  62. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  63. data_list = [data_in]
  64. key_list = [key]
  65. elif isinstance(data_in, (list, tuple)):
  66. if data_type is not None and isinstance(data_type, (list, tuple)): # mutiple inputs
  67. data_list_tmp = []
  68. for data_in_i, data_type_i in zip(data_in, data_type):
  69. key_list, data_list_i = prepare_data_iterator(data_in=data_in_i, data_type=data_type_i)
  70. data_list_tmp.append(data_list_i)
  71. data_list = []
  72. for item in zip(*data_list_tmp):
  73. data_list.append(item)
  74. else:
  75. # [audio sample point, fbank, text]
  76. data_list = data_in
  77. key_list = ["rand_key_" + ''.join(random.choice(chars) for _ in range(13)) for _ in range(len(data_in))]
  78. else: # raw text; audio sample point, fbank; bytes
  79. if isinstance(data_in, bytes): # audio bytes
  80. data_in = load_bytes(data_in)
  81. if key is None:
  82. key = "rand_key_" + ''.join(random.choice(chars) for _ in range(13))
  83. data_list = [data_in]
  84. key_list = [key]
  85. return key_list, data_list
  86. class AutoModel:
  87. def __init__(self, **kwargs):
  88. if not kwargs.get("disable_log", True):
  89. tables.print()
  90. model, kwargs = self.build_model(**kwargs)
  91. # if vad_model is not None, build vad model else None
  92. vad_model = kwargs.get("vad_model", None)
  93. vad_kwargs = kwargs.get("vad_model_revision", None)
  94. if vad_model is not None:
  95. logging.info("Building VAD model.")
  96. vad_kwargs = {"model": vad_model, "model_revision": vad_kwargs, "device": kwargs["device"]}
  97. vad_model, vad_kwargs = self.build_model(**vad_kwargs)
  98. # if punc_model is not None, build punc model else None
  99. punc_model = kwargs.get("punc_model", None)
  100. punc_kwargs = kwargs.get("punc_model_revision", None)
  101. if punc_model is not None:
  102. logging.info("Building punc model.")
  103. punc_kwargs = {"model": punc_model, "model_revision": punc_kwargs, "device": kwargs["device"]}
  104. punc_model, punc_kwargs = self.build_model(**punc_kwargs)
  105. # if spk_model is not None, build spk model else None
  106. spk_model = kwargs.get("spk_model", None)
  107. spk_kwargs = kwargs.get("spk_model_revision", None)
  108. if spk_model is not None:
  109. logging.info("Building SPK model.")
  110. spk_kwargs = {"model": spk_model, "model_revision": spk_kwargs, "device": kwargs["device"]}
  111. spk_model, spk_kwargs = self.build_model(**spk_kwargs)
  112. self.cb_model = ClusterBackend().to(kwargs["device"])
  113. spk_mode = kwargs.get("spk_mode", 'punc_segment')
  114. if spk_mode not in ["default", "vad_segment", "punc_segment"]:
  115. logging.error("spk_mode should be one of default, vad_segment and punc_segment.")
  116. self.spk_mode = spk_mode
  117. self.kwargs = kwargs
  118. self.model = model
  119. self.vad_model = vad_model
  120. self.vad_kwargs = vad_kwargs
  121. self.punc_model = punc_model
  122. self.punc_kwargs = punc_kwargs
  123. self.spk_model = spk_model
  124. self.spk_kwargs = spk_kwargs
  125. self.model_path = kwargs.get("model_path")
  126. def build_model(self, **kwargs):
  127. assert "model" in kwargs
  128. if "model_conf" not in kwargs:
  129. logging.info("download models from model hub: {}".format(kwargs.get("hub", "ms")))
  130. kwargs = download_model(**kwargs)
  131. set_all_random_seed(kwargs.get("seed", 0))
  132. device = kwargs.get("device", "cuda")
  133. if not torch.cuda.is_available() or kwargs.get("ngpu", 1) == 0:
  134. device = "cpu"
  135. kwargs["batch_size"] = 1
  136. kwargs["device"] = device
  137. if kwargs.get("ncpu", None):
  138. torch.set_num_threads(kwargs.get("ncpu"))
  139. # build tokenizer
  140. tokenizer = kwargs.get("tokenizer", None)
  141. if tokenizer is not None:
  142. tokenizer_class = tables.tokenizer_classes.get(tokenizer)
  143. tokenizer = tokenizer_class(**kwargs["tokenizer_conf"])
  144. kwargs["tokenizer"] = tokenizer
  145. kwargs["token_list"] = tokenizer.token_list if hasattr(tokenizer, "token_list") else None
  146. kwargs["token_list"] = tokenizer.get_vocab() if hasattr(tokenizer, "get_vocab") else kwargs["token_list"]
  147. vocab_size = len(kwargs["token_list"]) if kwargs["token_list"] is not None else -1
  148. else:
  149. vocab_size = -1
  150. # build frontend
  151. frontend = kwargs.get("frontend", None)
  152. kwargs["input_size"] = None
  153. if frontend is not None:
  154. frontend_class = tables.frontend_classes.get(frontend)
  155. frontend = frontend_class(**kwargs["frontend_conf"])
  156. kwargs["frontend"] = frontend
  157. kwargs["input_size"] = frontend.output_size() if hasattr(frontend, "output_size") else None
  158. # build model
  159. model_class = tables.model_classes.get(kwargs["model"])
  160. model = model_class(**kwargs, **kwargs.get("model_conf", {}), vocab_size=vocab_size)
  161. model.to(device)
  162. # init_param
  163. init_param = kwargs.get("init_param", None)
  164. if init_param is not None:
  165. if os.path.exists(init_param):
  166. logging.info(f"Loading pretrained params from {init_param}")
  167. load_pretrained_model(
  168. model=model,
  169. path=init_param,
  170. ignore_init_mismatch=kwargs.get("ignore_init_mismatch", False),
  171. oss_bucket=kwargs.get("oss_bucket", None),
  172. scope_map=kwargs.get("scope_map", []),
  173. excludes=kwargs.get("excludes", None),
  174. )
  175. else:
  176. print(f"error, init_param does not exist!: {init_param}")
  177. return model, kwargs
  178. def __call__(self, *args, **cfg):
  179. kwargs = self.kwargs
  180. kwargs.update(cfg)
  181. res = self.model(*args, kwargs)
  182. return res
  183. def generate(self, input, input_len=None, **cfg):
  184. if self.vad_model is None:
  185. return self.inference(input, input_len=input_len, **cfg)
  186. else:
  187. return self.inference_with_vad(input, input_len=input_len, **cfg)
  188. def inference(self, input, input_len=None, model=None, kwargs=None, key=None, **cfg):
  189. kwargs = self.kwargs if kwargs is None else kwargs
  190. kwargs.update(cfg)
  191. model = self.model if model is None else model
  192. model.eval()
  193. batch_size = kwargs.get("batch_size", 1)
  194. # if kwargs.get("device", "cpu") == "cpu":
  195. # batch_size = 1
  196. key_list, data_list = prepare_data_iterator(input, input_len=input_len, data_type=kwargs.get("data_type", None), key=key)
  197. speed_stats = {}
  198. asr_result_list = []
  199. num_samples = len(data_list)
  200. disable_pbar = self.kwargs.get("disable_pbar", False)
  201. pbar = tqdm(colour="blue", total=num_samples, dynamic_ncols=True) if not disable_pbar else None
  202. time_speech_total = 0.0
  203. time_escape_total = 0.0
  204. for beg_idx in range(0, num_samples, batch_size):
  205. end_idx = min(num_samples, beg_idx + batch_size)
  206. data_batch = data_list[beg_idx:end_idx]
  207. key_batch = key_list[beg_idx:end_idx]
  208. batch = {"data_in": data_batch, "key": key_batch}
  209. if (end_idx - beg_idx) == 1 and kwargs.get("data_type", None) == "fbank": # fbank
  210. batch["data_in"] = data_batch[0]
  211. batch["data_lengths"] = input_len
  212. time1 = time.perf_counter()
  213. with torch.no_grad():
  214. results, meta_data = model.inference(**batch, **kwargs)
  215. time2 = time.perf_counter()
  216. asr_result_list.extend(results)
  217. # batch_data_time = time_per_frame_s * data_batch_i["speech_lengths"].sum().item()
  218. batch_data_time = meta_data.get("batch_data_time", -1)
  219. time_escape = time2 - time1
  220. speed_stats["load_data"] = meta_data.get("load_data", 0.0)
  221. speed_stats["extract_feat"] = meta_data.get("extract_feat", 0.0)
  222. speed_stats["forward"] = f"{time_escape:0.3f}"
  223. speed_stats["batch_size"] = f"{len(results)}"
  224. speed_stats["rtf"] = f"{(time_escape) / batch_data_time:0.3f}"
  225. description = (
  226. f"{speed_stats}, "
  227. )
  228. if pbar:
  229. pbar.update(1)
  230. pbar.set_description(description)
  231. time_speech_total += batch_data_time
  232. time_escape_total += time_escape
  233. if pbar:
  234. # pbar.update(1)
  235. pbar.set_description(f"rtf_avg: {time_escape_total/time_speech_total:0.3f}")
  236. torch.cuda.empty_cache()
  237. return asr_result_list
  238. def inference_with_vad(self, input, input_len=None, **cfg):
  239. kwargs = self.kwargs
  240. # step.1: compute the vad model
  241. self.vad_kwargs.update(cfg)
  242. beg_vad = time.time()
  243. res = self.inference(input, input_len=input_len, model=self.vad_model, kwargs=self.vad_kwargs, **cfg)
  244. end_vad = time.time()
  245. # step.2 compute asr model
  246. model = self.model
  247. kwargs.update(cfg)
  248. batch_size = int(kwargs.get("batch_size_s", 300))*1000
  249. batch_size_threshold_ms = int(kwargs.get("batch_size_threshold_s", 60))*1000
  250. kwargs["batch_size"] = batch_size
  251. key_list, data_list = prepare_data_iterator(input, input_len=input_len, data_type=kwargs.get("data_type", None))
  252. results_ret_list = []
  253. time_speech_total_all_samples = 1e-6
  254. beg_total = time.time()
  255. pbar_total = tqdm(colour="red", total=len(res), dynamic_ncols=True) if not kwargs.get("disable_pbar", False) else None
  256. for i in range(len(res)):
  257. key = res[i]["key"]
  258. vadsegments = res[i]["value"]
  259. input_i = data_list[i]
  260. speech = load_audio_text_image_video(input_i, fs=kwargs["frontend"].fs, audio_fs=kwargs.get("fs", 16000))
  261. speech_lengths = len(speech)
  262. n = len(vadsegments)
  263. data_with_index = [(vadsegments[i], i) for i in range(n)]
  264. sorted_data = sorted(data_with_index, key=lambda x: x[0][1] - x[0][0])
  265. results_sorted = []
  266. if not len(sorted_data):
  267. logging.info("decoding, utt: {}, empty speech".format(key))
  268. continue
  269. if len(sorted_data) > 0 and len(sorted_data[0]) > 0:
  270. batch_size = max(batch_size, sorted_data[0][0][1] - sorted_data[0][0][0])
  271. batch_size_ms_cum = 0
  272. beg_idx = 0
  273. beg_asr_total = time.time()
  274. time_speech_total_per_sample = speech_lengths/16000
  275. time_speech_total_all_samples += time_speech_total_per_sample
  276. # pbar_sample = tqdm(colour="blue", total=n, dynamic_ncols=True)
  277. all_segments = []
  278. for j, _ in enumerate(range(0, n)):
  279. # pbar_sample.update(1)
  280. batch_size_ms_cum += (sorted_data[j][0][1] - sorted_data[j][0][0])
  281. if j < n - 1 and (
  282. batch_size_ms_cum + sorted_data[j + 1][0][1] - sorted_data[j + 1][0][0]) < batch_size and (
  283. sorted_data[j + 1][0][1] - sorted_data[j + 1][0][0]) < batch_size_threshold_ms:
  284. continue
  285. batch_size_ms_cum = 0
  286. end_idx = j + 1
  287. speech_j, speech_lengths_j = slice_padding_audio_samples(speech, speech_lengths, sorted_data[beg_idx:end_idx])
  288. results = self.inference(speech_j, input_len=None, model=model, kwargs=kwargs, **cfg)
  289. if self.spk_model is not None:
  290. # compose vad segments: [[start_time_sec, end_time_sec, speech], [...]]
  291. for _b in range(len(speech_j)):
  292. vad_segments = [[sorted_data[beg_idx:end_idx][_b][0][0]/1000.0,
  293. sorted_data[beg_idx:end_idx][_b][0][1]/1000.0,
  294. np.array(speech_j[_b])]]
  295. segments = sv_chunk(vad_segments)
  296. all_segments.extend(segments)
  297. speech_b = [i[2] for i in segments]
  298. spk_res = self.inference(speech_b, input_len=None, model=self.spk_model, kwargs=kwargs, **cfg)
  299. results[_b]['spk_embedding'] = spk_res[0]['spk_embedding']
  300. beg_idx = end_idx
  301. if len(results) < 1:
  302. continue
  303. results_sorted.extend(results)
  304. # end_asr_total = time.time()
  305. # time_escape_total_per_sample = end_asr_total - beg_asr_total
  306. # pbar_sample.update(1)
  307. # pbar_sample.set_description(f"rtf_avg_per_sample: {time_escape_total_per_sample / time_speech_total_per_sample:0.3f}, "
  308. # f"time_speech_total_per_sample: {time_speech_total_per_sample: 0.3f}, "
  309. # f"time_escape_total_per_sample: {time_escape_total_per_sample:0.3f}")
  310. restored_data = [0] * n
  311. for j in range(n):
  312. index = sorted_data[j][1]
  313. restored_data[index] = results_sorted[j]
  314. result = {}
  315. # results combine for texts, timestamps, speaker embeddings and others
  316. # TODO: rewrite for clean code
  317. for j in range(n):
  318. for k, v in restored_data[j].items():
  319. if k.startswith("timestamp"):
  320. if k not in result:
  321. result[k] = []
  322. for t in restored_data[j][k]:
  323. t[0] += vadsegments[j][0]
  324. t[1] += vadsegments[j][0]
  325. result[k].extend(restored_data[j][k])
  326. elif k == 'spk_embedding':
  327. if k not in result:
  328. result[k] = restored_data[j][k]
  329. else:
  330. result[k] = torch.cat([result[k], restored_data[j][k]], dim=0)
  331. elif 'text' in k:
  332. if k not in result:
  333. result[k] = restored_data[j][k]
  334. else:
  335. result[k] += " " + restored_data[j][k]
  336. else:
  337. if k not in result:
  338. result[k] = restored_data[j][k]
  339. else:
  340. result[k] += restored_data[j][k]
  341. return_raw_text = kwargs.get('return_raw_text', False)
  342. # step.3 compute punc model
  343. if self.punc_model is not None:
  344. if not len(result["text"]):
  345. if return_raw_text:
  346. result['raw_text'] = ''
  347. else:
  348. self.punc_kwargs.update(cfg)
  349. punc_res = self.inference(result["text"], model=self.punc_model, kwargs=self.punc_kwargs, **cfg)
  350. raw_text = copy.copy(result["text"])
  351. if return_raw_text: result['raw_text'] = raw_text
  352. result["text"] = punc_res[0]["text"]
  353. else:
  354. raw_text = None
  355. # speaker embedding cluster after resorted
  356. if self.spk_model is not None and kwargs.get('return_spk_res', True):
  357. if raw_text is None:
  358. logging.error("Missing punc_model, which is required by spk_model.")
  359. all_segments = sorted(all_segments, key=lambda x: x[0])
  360. spk_embedding = result['spk_embedding']
  361. labels = self.cb_model(spk_embedding.cpu(), oracle_num=kwargs.get('preset_spk_num', None))
  362. # del result['spk_embedding']
  363. sv_output = postprocess(all_segments, None, labels, spk_embedding.cpu())
  364. if self.spk_mode == 'vad_segment': # recover sentence_list
  365. sentence_list = []
  366. for res, vadsegment in zip(restored_data, vadsegments):
  367. if 'timestamp' not in res:
  368. logging.error("Only 'iic/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch' \
  369. and 'iic/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch'\
  370. can predict timestamp, and speaker diarization relies on timestamps.")
  371. sentence_list.append({"start": vadsegment[0],
  372. "end": vadsegment[1],
  373. "sentence": res['text'],
  374. "timestamp": res['timestamp']})
  375. elif self.spk_mode == 'punc_segment':
  376. if 'timestamp' not in result:
  377. logging.error("Only 'iic/speech_paraformer-large-vad-punc_asr_nat-zh-cn-16k-common-vocab8404-pytorch' \
  378. and 'iic/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch'\
  379. can predict timestamp, and speaker diarization relies on timestamps.")
  380. sentence_list = timestamp_sentence(punc_res[0]['punc_array'],
  381. result['timestamp'],
  382. raw_text,
  383. return_raw_text=return_raw_text)
  384. distribute_spk(sentence_list, sv_output)
  385. result['sentence_info'] = sentence_list
  386. elif kwargs.get("sentence_timestamp", False):
  387. if not len(result['text']):
  388. sentence_list = []
  389. else:
  390. sentence_list = timestamp_sentence(punc_res[0]['punc_array'],
  391. result['timestamp'],
  392. raw_text,
  393. return_raw_text=return_raw_text)
  394. result['sentence_info'] = sentence_list
  395. if "spk_embedding" in result: del result['spk_embedding']
  396. result["key"] = key
  397. results_ret_list.append(result)
  398. end_asr_total = time.time()
  399. time_escape_total_per_sample = end_asr_total - beg_asr_total
  400. if pbar_total:
  401. pbar_total.update(1)
  402. pbar_total.set_description(f"rtf_avg: {time_escape_total_per_sample / time_speech_total_per_sample:0.3f}, "
  403. f"time_speech: {time_speech_total_per_sample: 0.3f}, "
  404. f"time_escape: {time_escape_total_per_sample:0.3f}")
  405. # end_total = time.time()
  406. # time_escape_total_all_samples = end_total - beg_total
  407. # print(f"rtf_avg_all: {time_escape_total_all_samples / time_speech_total_all_samples:0.3f}, "
  408. # f"time_speech_all: {time_speech_total_all_samples: 0.3f}, "
  409. # f"time_escape_all: {time_escape_total_all_samples:0.3f}")
  410. return results_ret_list