|
|
@@ -10,29 +10,13 @@ import time
|
|
|
import logging
|
|
|
from torch.nn.utils.rnn import pad_sequence
|
|
|
try:
|
|
|
- from urllib.parse import urlparse
|
|
|
- from funasr.download.file import HTTPStorage
|
|
|
- import tempfile
|
|
|
+ from funasr.download.file import download_from_url
|
|
|
except:
|
|
|
print("urllib is not installed, if you infer from url, please install it first.")
|
|
|
-# def load_audio(data_or_path_or_list, fs: int=16000, audio_fs: int=16000):
|
|
|
-#
|
|
|
-# if isinstance(data_or_path_or_list, (list, tuple)):
|
|
|
-# return [load_audio(audio, fs=fs, audio_fs=audio_fs) for audio in data_or_path_or_list]
|
|
|
-#
|
|
|
-# if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list):
|
|
|
-# data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
|
|
|
-# data_or_path_or_list = data_or_path_or_list[0, :]
|
|
|
-# elif isinstance(data_or_path_or_list, np.ndarray): # audio sample point
|
|
|
-# data_or_path_or_list = np.squeeze(data_or_path_or_list) #[n_samples,]
|
|
|
-#
|
|
|
-# if audio_fs != fs:
|
|
|
-# resampler = torchaudio.transforms.Resample(audio_fs, fs)
|
|
|
-# data_or_path_or_list = resampler(data_or_path_or_list[None, :])[0, :]
|
|
|
-# return data_or_path_or_list
|
|
|
|
|
|
|
|
|
-def load_audio_text_image_video(data_or_path_or_list, fs: int = 16000, audio_fs: int = 16000, data_type=None, tokenizer=None):
|
|
|
+
|
|
|
+def load_audio_text_image_video(data_or_path_or_list, fs: int = 16000, audio_fs: int = 16000, data_type="sound", tokenizer=None):
|
|
|
if isinstance(data_or_path_or_list, (list, tuple)):
|
|
|
if data_type is not None and isinstance(data_type, (list, tuple)):
|
|
|
|
|
|
@@ -47,16 +31,22 @@ def load_audio_text_image_video(data_or_path_or_list, fs: int = 16000, audio_fs:
|
|
|
|
|
|
return data_or_path_or_list_ret
|
|
|
else:
|
|
|
- return [load_audio_text_image_video(audio, fs=fs, audio_fs=audio_fs) for audio in data_or_path_or_list]
|
|
|
+ return [load_audio_text_image_video(audio, fs=fs, audio_fs=audio_fs, data_type=data_type) for audio in data_or_path_or_list]
|
|
|
if isinstance(data_or_path_or_list, str) and data_or_path_or_list.startswith('http'):
|
|
|
data_or_path_or_list = download_from_url(data_or_path_or_list)
|
|
|
if isinstance(data_or_path_or_list, str) and os.path.exists(data_or_path_or_list):
|
|
|
- data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
|
|
|
- data_or_path_or_list = data_or_path_or_list[0, :]
|
|
|
+ if data_type is None or data_type == "sound":
|
|
|
+ data_or_path_or_list, audio_fs = torchaudio.load(data_or_path_or_list)
|
|
|
+ data_or_path_or_list = data_or_path_or_list[0, :]
|
|
|
+ # elif data_type == "text" and tokenizer is not None:
|
|
|
+ # data_or_path_or_list = tokenizer.encode(data_or_path_or_list)
|
|
|
+ elif isinstance(data_or_path_or_list, str) and data_type == "text" and tokenizer is not None:
|
|
|
+ data_or_path_or_list = tokenizer.encode(data_or_path_or_list)
|
|
|
elif isinstance(data_or_path_or_list, np.ndarray): # audio sample point
|
|
|
data_or_path_or_list = np.squeeze(data_or_path_or_list) # [n_samples,]
|
|
|
- elif isinstance(data_or_path_or_list, str) and data_type is not None and data_type == "text" and tokenizer is not None:
|
|
|
- data_or_path_or_list = tokenizer.encode(data_or_path_or_list)
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+ # print(f"unsupport data type: {data_or_path_or_list}, return raw data")
|
|
|
|
|
|
if audio_fs != fs and data_type != "text":
|
|
|
resampler = torchaudio.transforms.Resample(audio_fs, fs)
|
|
|
@@ -107,19 +97,3 @@ def extract_fbank(data, data_len = None, data_type: str="sound", frontend=None):
|
|
|
data_len = torch.tensor([data_len])
|
|
|
return data.to(torch.float32), data_len.to(torch.int32)
|
|
|
|
|
|
-def download_from_url(url):
|
|
|
-
|
|
|
- result = urlparse(url)
|
|
|
- file_path = None
|
|
|
- if result.scheme is not None and len(result.scheme) > 0:
|
|
|
- storage = HTTPStorage()
|
|
|
- # bytes
|
|
|
- data = storage.read(url)
|
|
|
- work_dir = tempfile.TemporaryDirectory().name
|
|
|
- if not os.path.exists(work_dir):
|
|
|
- os.makedirs(work_dir)
|
|
|
- file_path = os.path.join(work_dir, os.path.basename(url))
|
|
|
- with open(file_path, 'wb') as fb:
|
|
|
- fb.write(data)
|
|
|
- assert file_path is not None, f"failed to download: {url}"
|
|
|
- return file_path
|