| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- import torch
- import copy
- from funasr.register import tables
- from funasr.utils.load_utils import extract_fbank, load_audio_text_image_video
- @tables.register("dataset_classes", "AudioLLMDataset")
- class AudioLLMDataset(torch.utils.data.Dataset):
- """
- AudioLLMDataset
- """
- def __init__(self,
- path,
- index_ds: str = None,
- frontend=None,
- tokenizer=None,
- int_pad_value: int = -1,
- float_pad_value: float = 0.0,
- **kwargs):
- super().__init__()
- index_ds_class = tables.index_ds_classes.get(index_ds)
- self.index_ds = index_ds_class(path, **kwargs)
- preprocessor_speech = kwargs.get("preprocessor_speech", None)
- if preprocessor_speech:
- preprocessor_speech_class = tables.preprocessor_classes.get(preprocessor_speech)
- preprocessor_speech = preprocessor_speech_class(**kwargs.get("preprocessor_speech_conf", {}))
- self.preprocessor_speech = preprocessor_speech
- preprocessor_text = kwargs.get("preprocessor_text", None)
- if preprocessor_text:
- preprocessor_text_class = tables.preprocessor_classes.get(preprocessor_text)
- preprocessor_text = preprocessor_text_class(**kwargs.get("preprocessor_text_conf", {}))
- self.preprocessor_text = preprocessor_text
-
- self.frontend = frontend
- self.fs = 16000 if frontend is None else frontend.fs
- self.data_type = "sound"
- self.tokenizer = tokenizer
- self.float_pad_value = float_pad_value
- self.prompt = kwargs.get("prompt", "Transcribe speech to text.")
- self.prompt_pre = "USER: \nINSTRUCTION: {}\nINPUT: ".format(
- self.prompt) # "USER: \nINSTRUCTION: {}\nnINPUT: {}\nASSISTANT: "
- self.prompt_af = ""
- self.IGNORE_INDEX = kwargs.get("IGNORE_INDEX", -100)
- self.int_pad_value = self.IGNORE_INDEX
-
- def get_source_len(self, index):
- item = self.index_ds[index]
- return self.index_ds.get_source_len(item)
-
- def get_target_len(self, index):
- item = self.index_ds[index]
- return self.index_ds.get_target_len(item)
-
- def __len__(self):
- return len(self.index_ds)
-
- def __getitem__(self, index):
- item = self.index_ds[index]
- # import pdb;
- # pdb.set_trace()
- source = item["source"]
- data_src = load_audio_text_image_video(source, fs=self.fs)
- if self.preprocessor_speech:
- data_src = self.preprocessor_speech(data_src, fs=self.fs)
- speech, speech_lengths = extract_fbank(data_src, data_type=self.data_type, frontend=self.frontend, is_final=True) # speech: [b, T, d]
- speech = speech.squeeze(0)
- target = item["target"]
- if self.preprocessor_text:
- target = self.preprocessor_text(target)
-
-
- prompt_ids_pre = self.tokenizer.encode(self.prompt_pre) # [bos,prompt]
- prompt_pre_length = len(prompt_ids_pre)
-
- prompt_input = "{}{}".format(self.prompt_pre, target)
- prompt_input_ids = self.tokenizer.encode(prompt_input)
- audio_length = len(prompt_input_ids) - prompt_pre_length
- input_ids = prompt_input_ids + [self.tokenizer.pad_token_id]
- input_ids = torch.tensor(input_ids, dtype=torch.int64) #[bos, prompt, input, pad]
- input_ids[prompt_pre_length:] = -1 # [bos, prompt,-1,-1]
- attention_mask = input_ids.ge(-1) # [true, true, true, true], length mask
- prompt_answer = "{}{}".format(self.prompt_pre, target)
- prompt_answer_ids = self.tokenizer.encode(prompt_answer)
- answer_length = len(prompt_answer_ids) - prompt_pre_length
- labels_ids = copy.deepcopy(prompt_input_ids) + [self.tokenizer.eos_token_id]
- labels_ids = torch.tensor(labels_ids, dtype=torch.int64) # [bos, prompt, input, eos]
- labels_ids[:prompt_pre_length] = -1 # [-1, -1, input, eos]
- label_mask = labels_ids.ge(0) # [False,False,True,True]
- labels_ids[~label_mask] = self.IGNORE_INDEX # [-100,-100,input,eos]
-
- audio_mask = [0] * prompt_pre_length + [1] * audio_length + [0]
- audio_mask = torch.tensor(audio_mask, dtype=torch.float32)
-
- ids = self.tokenizer.encode(target) # token ids is different from labels_ids
- text = torch.tensor(ids, dtype=torch.int64)
- text_lengths = torch.tensor([len(ids)], dtype=torch.int32)
-
- return {"speech": speech,
- "speech_lengths": speech_lengths,
- "text": text,
- "text_lengths": text_lengths,
- "input_ids": input_ids,
- "attention_mask": attention_mask,
- "labels_ids": labels_ids,
- "label_mask": label_mask,
- "audio_mask": audio_mask,
- }
-
-
- def collator(self, samples: list=None):
- outputs = {}
- for sample in samples:
- for key in sample.keys():
- if key not in outputs:
- outputs[key] = []
- outputs[key].append(sample[key])
- for key, data_list in outputs.items():
- if isinstance(data_list[0], torch.Tensor):
- if data_list[0].dtype == torch.int64:
-
- pad_value = self.int_pad_value
- else:
- pad_value = self.float_pad_value
-
- outputs[key] = torch.nn.utils.rnn.pad_sequence(data_list, batch_first=True, padding_value=pad_value)
- return outputs
- @tables.register("dataset_classes", "AudioLLMARDataset")
- class AudioLLMARDataset(torch.utils.data.Dataset):
- """
- AudioLLMDataset
- """
-
- def __init__(self,
- path,
- index_ds: str = None,
- frontend=None,
- tokenizer=None,
- int_pad_value: int = -1,
- float_pad_value: float = 0.0,
- **kwargs):
- super().__init__()
- index_ds_class = tables.index_ds_classes.get(index_ds)
- self.index_ds = index_ds_class(path, **kwargs)
- preprocessor_speech = kwargs.get("preprocessor_speech", None)
- if preprocessor_speech:
- preprocessor_speech_class = tables.preprocessor_classes.get(preprocessor_speech)
- preprocessor_speech = preprocessor_speech_class(**kwargs.get("preprocessor_speech_conf", {}))
- self.preprocessor_speech = preprocessor_speech
- preprocessor_text = kwargs.get("preprocessor_text", None)
- if preprocessor_text:
- preprocessor_text_class = tables.preprocessor_classes.get(preprocessor_text)
- preprocessor_text = preprocessor_text_class(**kwargs.get("preprocessor_text_conf", {}))
- self.preprocessor_text = preprocessor_text
-
- self.frontend = frontend
- self.fs = 16000 if frontend is None else frontend.fs
- self.data_type = "sound"
- self.tokenizer = tokenizer
-
- self.float_pad_value = float_pad_value
- self.prompt = kwargs.get("prompt", "Transcribe speech to text.")
- self.prompt_pre = "USER: \nINSTRUCTION: {}\nINPUT: ".format(
- self.prompt) # "USER: \nINSTRUCTION: {}\nnINPUT: {}\nASSISTANT: "
- self.prompt_af = ""
- self.IGNORE_INDEX = kwargs.get("IGNORE_INDEX", -100)
- self.int_pad_value = self.IGNORE_INDEX
-
- def get_source_len(self, index):
- item = self.index_ds[index]
- return self.index_ds.get_source_len(item)
-
- def get_target_len(self, index):
- item = self.index_ds[index]
- return self.index_ds.get_target_len(item)
-
- def __len__(self):
- return len(self.index_ds)
-
- def __getitem__(self, index):
- item = self.index_ds[index]
- # import pdb;
- # pdb.set_trace()
- source = item["source"]
- data_src = load_audio_text_image_video(source, fs=self.fs)
- if self.preprocessor_speech:
- data_src = self.preprocessor_speech(data_src, fs=self.fs)
- speech, speech_lengths = extract_fbank(data_src, data_type=self.data_type, frontend=self.frontend,
- is_final=True) # speech: [b, T, d]
- speech = speech.squeeze(0)
-
- target = item["target"]
- if self.preprocessor_text:
- target = self.preprocessor_text(target)
-
- prompt_ids_pre = self.tokenizer.encode(self.prompt_pre) # [bos,prompt]
- prompt_pre_length = len(prompt_ids_pre)
-
- prompt_input = "{}{}".format(self.prompt_pre, target)
- prompt_input_ids = self.tokenizer.encode(prompt_input)
- audio_length = len(prompt_input_ids) - prompt_pre_length
- input_ids = prompt_input_ids + [self.tokenizer.pad_token_id]
- input_ids = torch.tensor(input_ids, dtype=torch.int64) # [bos, prompt, input, pad]
- input_ids[prompt_pre_length:] = -1 # [bos, prompt,-1,-1]
- attention_mask = input_ids.ge(-1) # [true, true, true, true], length mask
-
- prompt_answer = "{}{}".format(self.prompt_pre, target)
- prompt_answer_ids = self.tokenizer.encode(prompt_answer)
- answer_length = len(prompt_answer_ids) - prompt_pre_length
- labels_ids = copy.deepcopy(prompt_input_ids) + [self.tokenizer.eos_token_id]
- labels_ids = torch.tensor(labels_ids, dtype=torch.int64) # [bos, prompt, input, eos]
- labels_ids[:prompt_pre_length] = -1 # [-1, -1, input, eos]
- label_mask = labels_ids.ge(0) # [False,False,True,True]
- labels_ids[~label_mask] = self.IGNORE_INDEX # [-100,-100,input,eos]
-
- audio_mask = [0] * prompt_pre_length + [1] * audio_length + [0]
- audio_mask = torch.tensor(audio_mask, dtype=torch.float32)
-
- ids = self.tokenizer.encode(target) # token ids is different from labels_ids
- text = torch.tensor(ids, dtype=torch.int64)
- text_lengths = torch.tensor([len(ids)], dtype=torch.int32)
-
- return {"speech": speech,
- "speech_lengths": speech_lengths,
- "text": text,
- "text_lengths": text_lengths,
- "input_ids": input_ids,
- "attention_mask": attention_mask,
- "labels_ids": labels_ids,
- "label_mask": label_mask,
- "audio_mask": audio_mask,
- }
-
- def collator(self, samples: list = None):
- outputs = {}
- for sample in samples:
- for key in sample.keys():
- if key not in outputs:
- outputs[key] = []
- outputs[key].append(sample[key])
-
- for key, data_list in outputs.items():
- if isinstance(data_list[0], torch.Tensor):
- if data_list[0].dtype == torch.int64:
-
- pad_value = self.int_pad_value
- else:
- pad_value = self.float_pad_value
-
- outputs[key] = torch.nn.utils.rnn.pad_sequence(data_list, batch_first=True, padding_value=pad_value)
- return outputs
|