chunk_iter_factory.py

import logging
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List
from typing import Sequence
from typing import Tuple
from typing import Union

import numpy as np
import torch
from typeguard import check_argument_types

from funasr.iterators.abs_iter_factory import AbsIterFactory
from funasr.iterators.sequence_iter_factory import SequenceIterFactory
from funasr.samplers.abs_sampler import AbsSampler


class ChunkIterFactory(AbsIterFactory):
    """Creates chunks from a sequence.

    Examples:
        >>> batches = [["id1"], ["id2"], ...]
        >>> batch_size = 128
        >>> chunk_length = 1000
        >>> iter_factory = ChunkIterFactory(dataset, batches, batch_size, chunk_length)
        >>> it = iter_factory.build_iter(epoch)
        >>> for ids, batch in it:
        ...     ...

    - The number of mini-batches varies from epoch to epoch and cannot be
      known in advance, because the iterator factory is not given any
      length information.
    - For the same reason, "num_iters_per_epoch" cannot be implemented for
      this iterator; "num_samples_per_epoch" is provided instead.
    """
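
    # Overall flow: each utterance from the per-sample loader is cut into
    # chunks of a randomly chosen candidate length W (overlapping when
    # chunk_shift_ratio < 1); chunks that share the same W are cached and
    # stacked into mini-batches of batch_size.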

    def __init__(
        self,
        dataset,
        batch_size: int,
        batches: Union[AbsSampler, Sequence[Sequence[Any]]],
        chunk_length: Union[int, str],
        chunk_shift_ratio: float = 0.5,
        num_cache_chunks: int = 1024,
        num_samples_per_epoch: int = None,
        seed: int = 0,
        shuffle: bool = False,
        num_workers: int = 0,
        collate_fn=None,
        pin_memory: bool = False,
    ):
        assert check_argument_types()
        assert all(len(x) == 1 for x in batches), "batch-size must be 1"

        self.per_sample_iter_factory = SequenceIterFactory(
            dataset=dataset,
            batches=batches,
            num_iters_per_epoch=num_samples_per_epoch,
            seed=seed,
            shuffle=shuffle,
            num_workers=num_workers,
            collate_fn=collate_fn,
            pin_memory=pin_memory,
        )

        self.num_cache_chunks = max(num_cache_chunks, batch_size)
        if isinstance(chunk_length, str):
            if len(chunk_length) == 0:
                raise ValueError("e.g. 5,8 or 3-5: but got empty string")
            self.chunk_lengths = []
            for x in chunk_length.split(","):
                try:
                    sps = list(map(int, x.split("-")))
                except ValueError:
                    raise ValueError(f"e.g. 5,8 or 3-5: but got {chunk_length}")
                if len(sps) > 2:
                    raise ValueError(f"e.g. 5,8 or 3-5: but got {chunk_length}")
                elif len(sps) == 2:
                    # Append every length within the range to the candidates
                    self.chunk_lengths += list(range(sps[0], sps[1] + 1))
                else:
                    self.chunk_lengths += [sps[0]]
        else:
            # Single candidate: fixed chunk length
            self.chunk_lengths = [chunk_length]
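
        # For example, chunk_length="100,200-202" expands to
        # self.chunk_lengths == [100, 200, 201, 202], while a plain int such
        # as chunk_length=1000 gives [1000].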

        self.chunk_shift_ratio = chunk_shift_ratio
        self.batch_size = batch_size
        self.seed = seed
        self.shuffle = shuffle

    def build_iter(
        self,
        epoch: int,
        shuffle: bool = None,
    ) -> Iterator[Tuple[List[str], Dict[str, torch.Tensor]]]:
        per_sample_loader = self.per_sample_iter_factory.build_iter(epoch, shuffle)

        if shuffle is None:
            shuffle = self.shuffle
        state = np.random.RandomState(epoch + self.seed)

        # NOTE(kamo):
        # This iterator supports multiple chunk lengths; chunks are kept here
        # per chunk length until the specified number has been collected.
        cache_chunks_dict = {}
        cache_id_list_dict = {}
        for ids, batch in per_sample_loader:
            # Must be per-sample-loader
            assert len(ids) == 1, f"Must be per-sample-loader: {len(ids)}"
            assert all(len(x) == 1 for x in batch.values())

            # Get keys of sequence data
            sequence_keys = []
            for key in batch:
                if key + "_lengths" in batch:
                    sequence_keys.append(key)
            # Remove lengths data and get the first sample
            batch = {k: v[0] for k, v in batch.items() if not k.endswith("_lengths")}
            id_ = ids[0]

            for key in sequence_keys:
                if len(batch[key]) != len(batch[sequence_keys[0]]):
                    raise RuntimeError(
                        f"All sequences must have the same length: "
                        f"{len(batch[key])} != {len(batch[sequence_keys[0]])}"
                    )
            L = len(batch[sequence_keys[0]])
            # Select a chunk length
            chunk_lengths = [lg for lg in self.chunk_lengths if lg < L]
            if len(chunk_lengths) == 0:
                logging.warning(
                    f"The length of '{id_}' is {L}, but it is shorter than "
                    f"any of the chunk-length candidates: {self.chunk_lengths}"
                )
                continue

            W = int(state.choice(chunk_lengths, 1))
            cache_id_list = cache_id_list_dict.setdefault(W, [])
            cache_chunks = cache_chunks_dict.setdefault(W, {})

            # Shift width to the next chunk
            S = int(W * self.chunk_shift_ratio)
            # Number of chunks
            N = (L - W) // S + 1
            if shuffle:
                Z = state.randint(0, (L - W) % S + 1)
            else:
                Z = 0
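
            # For example, with L=1000, W=300 and chunk_shift_ratio=0.5:
            # S = 150, N = (1000 - 300) // 150 + 1 = 5 chunks, and when
            # shuffling the leftover (L - W) % S = 100 frames allow a random
            # start offset Z in [0, 100].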

            # Split a sequence into chunks.
            # Note that the leftover frames that do not fill a full chunk are discarded
            for k, v in batch.items():
                if k not in cache_chunks:
                    cache_chunks[k] = []
                if k in sequence_keys:
                    # Shift chunks with overlapping length for data augmentation
                    cache_chunks[k] += [v[Z + i * S : Z + i * S + W] for i in range(N)]
                else:
                    # If not a sequence, use the whole data instead of a chunk
                    cache_chunks[k] += [v for _ in range(N)]
            cache_id_list += [id_ for _ in range(N)]
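
            # Chunks accumulate separately for each chunk length W; once more
            # than num_cache_chunks of them are cached, mini-batches are
            # emitted and any remainder is carried over.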
            if len(cache_id_list) > self.num_cache_chunks:
                cache_id_list, cache_chunks = yield from self._generate_mini_batches(
                    cache_id_list,
                    cache_chunks,
                    shuffle,
                    state,
                )
            cache_id_list_dict[W] = cache_id_list
            cache_chunks_dict[W] = cache_chunks

        else:
            # The loader is exhausted (for-else): flush the chunks still
            # cached for each chunk length W.
            for W in cache_id_list_dict:
                cache_id_list = cache_id_list_dict.setdefault(W, [])
                cache_chunks = cache_chunks_dict.setdefault(W, {})

                yield from self._generate_mini_batches(
                    cache_id_list,
                    cache_chunks,
                    shuffle,
                    state,
                )

    def _generate_mini_batches(
        self,
        id_list: List[str],
        batches: Dict[str, List[torch.Tensor]],
        shuffle: bool,
        state: np.random.RandomState,
    ):
        if shuffle:
            indices = np.arange(0, len(id_list))
            state.shuffle(indices)
            batches = {k: [v[i] for i in indices] for k, v in batches.items()}
            id_list = [id_list[i] for i in indices]

        bs = self.batch_size
        while len(id_list) >= bs:
            # Make mini-batch and yield
            yield (
                id_list[:bs],
                {k: torch.stack(v[:bs], 0) for k, v in batches.items()},
            )
            id_list = id_list[bs:]
            batches = {k: v[bs:] for k, v in batches.items()}

        return id_list, batches
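

# Illustrative usage sketch (not part of the original module): the dataset,
# collate function, and utterance ids below are hypothetical placeholders,
# shown only to indicate how the factory is typically wired up.
#
#   iter_factory = ChunkIterFactory(
#       dataset=my_dataset,                # hypothetical funasr dataset
#       batch_size=8,
#       batches=[["utt1"], ["utt2"]],      # one utterance id per element
#       chunk_length="300-400",            # candidate chunk lengths in frames
#       collate_fn=my_collate_fn,          # hypothetical collate function
#   )
#   for ids, tensors in iter_factory.build_iter(epoch=0):
#       ...  # sequence tensors have shape (batch_size, W, ...)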