sanm_encoder.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import torch
  2. import torch.nn as nn
  3. from funasr.export.utils.torch_function import MakePadMask
  4. from funasr.export.utils.torch_function import sequence_mask
  5. from funasr.modules.attention import MultiHeadedAttentionSANM
  6. from funasr.export.models.modules.multihead_att import MultiHeadedAttentionSANM as MultiHeadedAttentionSANM_export
  7. from funasr.export.models.modules.encoder_layer import EncoderLayerSANM as EncoderLayerSANM_export
  8. from funasr.modules.positionwise_feed_forward import PositionwiseFeedForward
  9. from funasr.export.models.modules.feedforward import PositionwiseFeedForward as PositionwiseFeedForward_export
  10. class SANMEncoder(nn.Module):
  11. def __init__(
  12. self,
  13. model,
  14. max_seq_len=512,
  15. feats_dim=560,
  16. model_name='encoder',
  17. onnx: bool = True,
  18. ):
  19. super().__init__()
  20. self.embed = model.embed
  21. self.model = model
  22. self.feats_dim = feats_dim
  23. self._output_size = model._output_size
  24. if onnx:
  25. self.make_pad_mask = MakePadMask(max_seq_len, flip=False)
  26. else:
  27. self.make_pad_mask = sequence_mask(max_seq_len, flip=False)
  28. if hasattr(model, 'encoders0'):
  29. for i, d in enumerate(self.model.encoders0):
  30. if isinstance(d.self_attn, MultiHeadedAttentionSANM):
  31. d.self_attn = MultiHeadedAttentionSANM_export(d.self_attn)
  32. if isinstance(d.feed_forward, PositionwiseFeedForward):
  33. d.feed_forward = PositionwiseFeedForward_export(d.feed_forward)
  34. self.model.encoders0[i] = EncoderLayerSANM_export(d)
  35. for i, d in enumerate(self.model.encoders):
  36. if isinstance(d.self_attn, MultiHeadedAttentionSANM):
  37. d.self_attn = MultiHeadedAttentionSANM_export(d.self_attn)
  38. if isinstance(d.feed_forward, PositionwiseFeedForward):
  39. d.feed_forward = PositionwiseFeedForward_export(d.feed_forward)
  40. self.model.encoders[i] = EncoderLayerSANM_export(d)
  41. self.model_name = model_name
  42. self.num_heads = model.encoders[0].self_attn.h
  43. self.hidden_size = model.encoders[0].self_attn.linear_out.out_features
  44. def prepare_mask(self, mask):
  45. mask_3d_btd = mask[:, :, None]
  46. if len(mask.shape) == 2:
  47. mask_4d_bhlt = 1 - mask[:, None, None, :]
  48. elif len(mask.shape) == 3:
  49. mask_4d_bhlt = 1 - mask[:, None, :]
  50. mask_4d_bhlt = mask_4d_bhlt * -10000.0
  51. return mask_3d_btd, mask_4d_bhlt
  52. def forward(self,
  53. speech: torch.Tensor,
  54. speech_lengths: torch.Tensor,
  55. ):
  56. speech = speech * self._output_size ** 0.5
  57. mask = self.make_pad_mask(speech_lengths)
  58. mask = self.prepare_mask(mask)
  59. if self.embed is None:
  60. xs_pad = speech
  61. else:
  62. xs_pad = self.embed(speech)
  63. # xs_pad = xs_pad / scale
  64. encoder_outs = self.model.encoders0(xs_pad, mask)
  65. xs_pad, masks = encoder_outs[0], encoder_outs[1]
  66. encoder_outs = self.model.encoders(xs_pad, mask)
  67. xs_pad, masks = encoder_outs[0], encoder_outs[1]
  68. xs_pad = self.model.after_norm(xs_pad)
  69. return xs_pad, speech_lengths
  70. def get_output_size(self):
  71. return self.model.encoders[0].size
  72. def get_dummy_inputs(self):
  73. feats = torch.randn(1, 100, self.feats_dim)
  74. return (feats)
  75. def get_input_names(self):
  76. return ['feats']
  77. def get_output_names(self):
  78. return ['encoder_out', 'encoder_out_lens', 'predictor_weight']
  79. def get_dynamic_axes(self):
  80. return {
  81. 'feats': {
  82. 1: 'feats_length'
  83. },
  84. 'encoder_out': {
  85. 1: 'enc_out_length'
  86. },
  87. 'predictor_weight':{
  88. 1: 'pre_out_length'
  89. }
  90. }
  91. class SANMVadEncoder(nn.Module):
  92. def __init__(
  93. self,
  94. model,
  95. max_seq_len=512,
  96. feats_dim=560,
  97. model_name='encoder',
  98. onnx: bool = True,
  99. ):
  100. super().__init__()
  101. self.embed = model.embed
  102. self.model = model
  103. self.feats_dim = feats_dim
  104. self._output_size = model._output_size
  105. if onnx:
  106. self.make_pad_mask = MakePadMask(max_seq_len, flip=False)
  107. else:
  108. self.make_pad_mask = sequence_mask(max_seq_len, flip=False)
  109. if hasattr(model, 'encoders0'):
  110. for i, d in enumerate(self.model.encoders0):
  111. if isinstance(d.self_attn, MultiHeadedAttentionSANM):
  112. d.self_attn = MultiHeadedAttentionSANM_export(d.self_attn)
  113. if isinstance(d.feed_forward, PositionwiseFeedForward):
  114. d.feed_forward = PositionwiseFeedForward_export(d.feed_forward)
  115. self.model.encoders0[i] = EncoderLayerSANM_export(d)
  116. for i, d in enumerate(self.model.encoders):
  117. if isinstance(d.self_attn, MultiHeadedAttentionSANM):
  118. d.self_attn = MultiHeadedAttentionSANM_export(d.self_attn)
  119. if isinstance(d.feed_forward, PositionwiseFeedForward):
  120. d.feed_forward = PositionwiseFeedForward_export(d.feed_forward)
  121. self.model.encoders[i] = EncoderLayerSANM_export(d)
  122. self.model_name = model_name
  123. self.num_heads = model.encoders[0].self_attn.h
  124. self.hidden_size = model.encoders[0].self_attn.linear_out.out_features
  125. def prepare_mask(self, mask, sub_masks):
  126. mask_3d_btd = mask[:, :, None]
  127. mask_4d_bhlt = (1 - sub_masks) * -10000.0
  128. return mask_3d_btd, mask_4d_bhlt
  129. def forward(self,
  130. speech: torch.Tensor,
  131. speech_lengths: torch.Tensor,
  132. vad_masks: torch.Tensor,
  133. sub_masks: torch.Tensor,
  134. ):
  135. speech = speech * self._output_size ** 0.5
  136. mask = self.make_pad_mask(speech_lengths)
  137. vad_masks = self.prepare_mask(mask, vad_masks)
  138. mask = self.prepare_mask(mask, sub_masks)
  139. if self.embed is None:
  140. xs_pad = speech
  141. else:
  142. xs_pad = self.embed(speech)
  143. encoder_outs = self.model.encoders0(xs_pad, mask)
  144. xs_pad, masks = encoder_outs[0], encoder_outs[1]
  145. # encoder_outs = self.model.encoders(xs_pad, mask)
  146. for layer_idx, encoder_layer in enumerate(self.model.encoders):
  147. if layer_idx == len(self.model.encoders) - 1:
  148. mask = vad_masks
  149. encoder_outs = encoder_layer(xs_pad, mask)
  150. xs_pad, masks = encoder_outs[0], encoder_outs[1]
  151. xs_pad = self.model.after_norm(xs_pad)
  152. return xs_pad, speech_lengths
  153. def get_output_size(self):
  154. return self.model.encoders[0].size
  155. # def get_dummy_inputs(self):
  156. # feats = torch.randn(1, 100, self.feats_dim)
  157. # return (feats)
  158. #
  159. # def get_input_names(self):
  160. # return ['feats']
  161. #
  162. # def get_output_names(self):
  163. # return ['encoder_out', 'encoder_out_lens', 'predictor_weight']
  164. #
  165. # def get_dynamic_axes(self):
  166. # return {
  167. # 'feats': {
  168. # 1: 'feats_length'
  169. # },
  170. # 'encoder_out': {
  171. # 1: 'enc_out_length'
  172. # },
  173. # 'predictor_weight': {
  174. # 1: 'pre_out_length'
  175. # }
  176. #
  177. # }