| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- import os
- import torch
- import torch.nn as nn
- from funasr.modules.attention import MultiHeadedAttention
- from funasr.export.models.modules.decoder_layer import DecoderLayer as OnnxDecoderLayer
- from funasr.export.models.language_models.embed import Embedding
- from funasr.export.models.modules.multihead_att import \
- OnnxMultiHeadedAttention
- from funasr.export.utils.torch_function import MakePadMask, subsequent_mask
- class XformerDecoder(nn.Module):
- def __init__(self,
- model,
- max_seq_len = 512,
- model_name = 'decoder',
- onnx: bool = True,):
- super().__init__()
- self.embed = Embedding(model.embed, max_seq_len)
- self.model = model
- if onnx:
- self.make_pad_mask = MakePadMask(max_seq_len, flip=False)
- else:
- self.make_pad_mask = subsequent_mask(max_seq_len, flip=False)
- if isinstance(self.model.decoders[0].self_attn, MultiHeadedAttention):
- self.num_heads = self.model.decoders[0].self_attn.h
- self.hidden_size = self.model.decoders[0].self_attn.linear_out.out_features
- # replace multi-head attention module into customized module.
- for i, d in enumerate(self.model.decoders):
- # d is DecoderLayer
- if isinstance(d.self_attn, MultiHeadedAttention):
- d.self_attn = OnnxMultiHeadedAttention(d.self_attn)
- if isinstance(d.src_attn, MultiHeadedAttention):
- d.src_attn = OnnxMultiHeadedAttention(d.src_attn)
- self.model.decoders[i] = OnnxDecoderLayer(d)
- self.model_name = model_name
- def prepare_mask(self, mask):
- mask_3d_btd = mask[:, :, None]
- if len(mask.shape) == 2:
- mask_4d_bhlt = 1 - mask[:, None, None, :]
- elif len(mask.shape) == 3:
- mask_4d_bhlt = 1 - mask[:, None, :]
- mask_4d_bhlt = mask_4d_bhlt * -10000.0
- return mask_3d_btd, mask_4d_bhlt
- def forward(self,
- tgt,
- memory,
- cache):
- mask = subsequent_mask(tgt.size(-1)).unsqueeze(0) # (B, T)
- x = self.embed(tgt)
- mask = self.prepare_mask(mask)
- new_cache = []
- for c, decoder in zip(cache, self.model.decoders):
- x, mask = decoder(x, mask, memory, None, c)
- new_cache.append(x)
- x = x[:, 1:, :]
- if self.model.normalize_before:
- y = self.model.after_norm(x[:, -1])
- else:
- y = x[:, -1]
- if self.model.output_layer is not None:
- y = torch.log_softmax(self.model.output_layer(y), dim=-1)
- return y, new_cache
- def get_dummy_inputs(self, enc_size):
- tgt = torch.LongTensor([0]).unsqueeze(0)
- memory = torch.randn(1, 100, enc_size)
- cache_num = len(self.model.decoders)
- cache = [
- torch.zeros((1, 1, self.model.decoders[0].size))
- for _ in range(cache_num)
- ]
- return (tgt, memory, cache)
- def is_optimizable(self):
- return True
- def get_input_names(self):
- cache_num = len(self.model.decoders)
- return ["tgt", "memory"] + [
- "cache_%d" % i for i in range(cache_num)
- ]
- def get_output_names(self):
- cache_num = len(self.model.decoders)
- return ["y"] + ["out_cache_%d" % i for i in range(cache_num)]
- def get_dynamic_axes(self):
- ret = {
- "tgt": {0: "tgt_batch", 1: "tgt_length"},
- "memory": {0: "memory_batch", 1: "memory_length"},
- }
- cache_num = len(self.model.decoders)
- ret.update(
- {
- "cache_%d" % d: {0: "cache_%d_batch" % d, 2: "cache_%d_length" % d}
- for d in range(cache_num)
- }
- )
- return ret
- def get_model_config(self, path):
- return {
- "dec_type": "XformerDecoder",
- "model_path": os.path.join(path, f"{self.model_name}.onnx"),
- "n_layers": len(self.model.decoders),
- "odim": self.model.decoders[0].size,
- }
|