rwkv_subsampling.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. # Copyright 2019 Shigeki Karita
  4. # Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
  5. """Subsampling layer definition."""
  6. import numpy as np
  7. import torch
  8. import torch.nn.functional as F
  9. from funasr.modules.embedding import PositionalEncoding
  10. import logging
  11. from funasr.modules.streaming_utils.utils import sequence_mask
  12. from funasr.modules.nets_utils import sub_factor_to_params, pad_to_len
  13. from typing import Optional, Tuple, Union
  14. import math
  15. class TooShortUttError(Exception):
  16. """Raised when the utt is too short for subsampling.
  17. Args:
  18. message (str): Message for error catch
  19. actual_size (int): the short size that cannot pass the subsampling
  20. limit (int): the limit size for subsampling
  21. """
  22. def __init__(self, message, actual_size, limit):
  23. """Construct a TooShortUttError for error handler."""
  24. super().__init__(message)
  25. self.actual_size = actual_size
  26. self.limit = limit
  27. def check_short_utt(ins, size):
  28. """Check if the utterance is too short for subsampling."""
  29. if isinstance(ins, Conv2dSubsampling2) and size < 3:
  30. return True, 3
  31. if isinstance(ins, Conv2dSubsampling) and size < 7:
  32. return True, 7
  33. if isinstance(ins, Conv2dSubsampling6) and size < 11:
  34. return True, 11
  35. if isinstance(ins, Conv2dSubsampling8) and size < 15:
  36. return True, 15
  37. return False, -1
  38. class RWKVConvInput(torch.nn.Module):
  39. """Streaming ConvInput module definition.
  40. Args:
  41. input_size: Input size.
  42. conv_size: Convolution size.
  43. subsampling_factor: Subsampling factor.
  44. output_size: Block output dimension.
  45. """
  46. def __init__(
  47. self,
  48. input_size: int,
  49. conv_size: Union[int, Tuple],
  50. subsampling_factor: int = 4,
  51. conv_kernel_size: int = 3,
  52. output_size: Optional[int] = None,
  53. ) -> None:
  54. """Construct a ConvInput object."""
  55. super().__init__()
  56. if subsampling_factor == 1:
  57. conv_size1, conv_size2, conv_size3 = conv_size
  58. self.conv = torch.nn.Sequential(
  59. torch.nn.Conv2d(1, conv_size1, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
  60. torch.nn.ReLU(),
  61. torch.nn.Conv2d(conv_size1, conv_size1, conv_kernel_size, stride=[1, 2], padding=(conv_kernel_size-1)//2),
  62. torch.nn.ReLU(),
  63. torch.nn.Conv2d(conv_size1, conv_size2, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
  64. torch.nn.ReLU(),
  65. torch.nn.Conv2d(conv_size2, conv_size2, conv_kernel_size, stride=[1, 2], padding=(conv_kernel_size-1)//2),
  66. torch.nn.ReLU(),
  67. torch.nn.Conv2d(conv_size2, conv_size3, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
  68. torch.nn.ReLU(),
  69. torch.nn.Conv2d(conv_size3, conv_size3, conv_kernel_size, stride=[1, 2], padding=(conv_kernel_size-1)//2),
  70. torch.nn.ReLU(),
  71. )
  72. output_proj = conv_size3 * ((input_size // 2) // 2)
  73. self.subsampling_factor = 1
  74. self.stride_1 = 1
  75. self.create_new_mask = self.create_new_vgg_mask
  76. else:
  77. conv_size1, conv_size2, conv_size3 = conv_size
  78. kernel_1 = int(subsampling_factor / 2)
  79. self.conv = torch.nn.Sequential(
  80. torch.nn.Conv2d(1, conv_size1, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
  81. torch.nn.ReLU(),
  82. torch.nn.Conv2d(conv_size1, conv_size1, conv_kernel_size, stride=[kernel_1, 2], padding=(conv_kernel_size-1)//2),
  83. torch.nn.ReLU(),
  84. torch.nn.Conv2d(conv_size1, conv_size2, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
  85. torch.nn.ReLU(),
  86. torch.nn.Conv2d(conv_size2, conv_size2, conv_kernel_size, stride=[2, 2], padding=(conv_kernel_size-1)//2),
  87. torch.nn.ReLU(),
  88. torch.nn.Conv2d(conv_size2, conv_size3, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
  89. torch.nn.ReLU(),
  90. torch.nn.Conv2d(conv_size3, conv_size3, conv_kernel_size, stride=1, padding=(conv_kernel_size-1)//2),
  91. torch.nn.ReLU(),
  92. )
  93. output_proj = conv_size3 * ((input_size // 2) // 2)
  94. self.subsampling_factor = subsampling_factor
  95. self.create_new_mask = self.create_new_vgg_mask
  96. self.stride_1 = kernel_1
  97. self.min_frame_length = 7
  98. if output_size is not None:
  99. self.output = torch.nn.Linear(output_proj, output_size)
  100. self.output_size = output_size
  101. else:
  102. self.output = None
  103. self.output_size = output_proj
  104. def forward(
  105. self, x: torch.Tensor, mask: Optional[torch.Tensor], chunk_size: Optional[torch.Tensor]
  106. ) -> Tuple[torch.Tensor, torch.Tensor]:
  107. """Encode input sequences.
  108. Args:
  109. x: ConvInput input sequences. (B, T, D_feats)
  110. mask: Mask of input sequences. (B, 1, T)
  111. Returns:
  112. x: ConvInput output sequences. (B, sub(T), D_out)
  113. mask: Mask of output sequences. (B, 1, sub(T))
  114. """
  115. if mask is not None:
  116. mask = self.create_new_mask(mask)
  117. olens = max(mask.eq(0).sum(1))
  118. b, t, f = x.size()
  119. x = x.unsqueeze(1) # (b. 1. t. f)
  120. if chunk_size is not None:
  121. max_input_length = int(
  122. chunk_size * self.subsampling_factor * (math.ceil(float(t) / (chunk_size * self.subsampling_factor) ))
  123. )
  124. x = map(lambda inputs: pad_to_len(inputs, max_input_length, 1), x)
  125. x = list(x)
  126. x = torch.stack(x, dim=0)
  127. N_chunks = max_input_length // ( chunk_size * self.subsampling_factor)
  128. x = x.view(b * N_chunks, 1, chunk_size * self.subsampling_factor, f)
  129. x = self.conv(x)
  130. _, c, _, f = x.size()
  131. if chunk_size is not None:
  132. x = x.transpose(1, 2).contiguous().view(b, -1, c * f)[:,:olens,:]
  133. else:
  134. x = x.transpose(1, 2).contiguous().view(b, -1, c * f)
  135. if self.output is not None:
  136. x = self.output(x)
  137. return x, mask[:,:olens][:,:x.size(1)]
  138. def create_new_vgg_mask(self, mask: torch.Tensor) -> torch.Tensor:
  139. """Create a new mask for VGG output sequences.
  140. Args:
  141. mask: Mask of input sequences. (B, T)
  142. Returns:
  143. mask: Mask of output sequences. (B, sub(T))
  144. """
  145. if self.subsampling_factor > 1:
  146. return mask[:, ::2][:, ::self.stride_1]
  147. else:
  148. return mask
  149. def get_size_before_subsampling(self, size: int) -> int:
  150. """Return the original size before subsampling for a given size.
  151. Args:
  152. size: Number of frames after subsampling.
  153. Returns:
  154. : Number of frames before subsampling.
  155. """
  156. return size * self.subsampling_factor