from typing import List, Optional, Tuple

import torch
import torch.nn as nn
from typeguard import check_argument_types

from funasr.models.decoder.transformer_decoder import BaseTransformerDecoder
from funasr.modules.attention import MultiHeadedAttentionSANMDecoder, MultiHeadedAttentionCrossAtt
from funasr.modules.embedding import PositionalEncoding
from funasr.modules.layer_norm import LayerNorm
from funasr.modules.positionwise_feed_forward import PositionwiseFeedForwardDecoderSANM
from funasr.modules.repeat import repeat
from funasr.modules.streaming_utils import utils as myutils


class DecoderLayerSANM(nn.Module):
    """Single decoder layer module.

    Args:
        size (int): Input dimension.
        self_attn (torch.nn.Module): Self-attention module instance.
            A `MultiHeadedAttentionSANMDecoder` instance can be used as the argument.
        src_attn (torch.nn.Module): Source-attention (cross-attention) module instance.
            A `MultiHeadedAttentionCrossAtt` instance can be used as the argument.
        feed_forward (torch.nn.Module): Feed-forward module instance.
            A `PositionwiseFeedForward`, `MultiLayeredConv1d`, or `Conv1dLinear` instance
            can be used as the argument.
        dropout_rate (float): Dropout rate.
        normalize_before (bool): Whether to apply layer norm before the first block.
        concat_after (bool): Whether to concatenate the attention layer's input and output.
            If True, an additional linear layer is applied,
            i.e. x -> x + linear(concat(x, att(x))).
            If False, no additional linear layer is applied, i.e. x -> x + att(x).
    """
    def __init__(
        self,
        size,
        self_attn,
        src_attn,
        feed_forward,
        dropout_rate,
        normalize_before=True,
        concat_after=False,
    ):
        """Construct a DecoderLayerSANM object."""
        super(DecoderLayerSANM, self).__init__()
        self.size = size
        self.self_attn = self_attn
        self.src_attn = src_attn
        self.feed_forward = feed_forward
        self.norm1 = LayerNorm(size)
        # norm2/norm3 are only created when the corresponding attention module exists.
        if self_attn is not None:
            self.norm2 = LayerNorm(size)
        if src_attn is not None:
            self.norm3 = LayerNorm(size)
        self.dropout = nn.Dropout(dropout_rate)
        self.normalize_before = normalize_before
        self.concat_after = concat_after
        if self.concat_after:
            self.concat_linear1 = nn.Linear(size + size, size)
            self.concat_linear2 = nn.Linear(size + size, size)
    def forward(self, tgt, tgt_mask, memory, memory_mask=None, cache=None):
        """Compute decoded features.

        Args:
            tgt (torch.Tensor): Input tensor (#batch, maxlen_out, size).
            tgt_mask (torch.Tensor): Mask for input tensor (#batch, maxlen_out).
            memory (torch.Tensor): Encoded memory, float32 (#batch, maxlen_in, size).
            memory_mask (torch.Tensor): Encoded memory mask (#batch, maxlen_in).
            cache (List[torch.Tensor]): List of cached tensors.
                Each tensor shape should be (#batch, maxlen_out - 1, size).

        Returns:
            torch.Tensor: Output tensor (#batch, maxlen_out, size).
            torch.Tensor: Mask for output tensor (#batch, maxlen_out).
            torch.Tensor: Encoded memory (#batch, maxlen_in, size).
            torch.Tensor: Encoded memory mask (#batch, maxlen_in).
            torch.Tensor: Updated self-attention cache.
        """
        # Unlike the vanilla Transformer decoder layer, the feed-forward block
        # runs first, followed by the optional FSMN self-attention and the
        # optional cross-attention over the encoder memory.
        residual = tgt
        if self.normalize_before:
            tgt = self.norm1(tgt)
        tgt = self.feed_forward(tgt)

        x = tgt
        if self.self_attn:
            if self.normalize_before:
                tgt = self.norm2(tgt)
            if self.training:
                # The cache is only meaningful for incremental decoding.
                cache = None
            x, cache = self.self_attn(tgt, tgt_mask, cache=cache)
            x = residual + self.dropout(x)

        if self.src_attn is not None:
            residual = x
            if self.normalize_before:
                x = self.norm3(x)
            x = residual + self.dropout(self.src_attn(x, memory, memory_mask))

        return x, tgt_mask, memory, memory_mask, cache
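
# A minimal usage sketch for DecoderLayerSANM (illustrative only; the sizes,
# kernel width, and dropout values below are assumptions, not values mandated
# by this module — the attention/FFN constructors are called exactly as the
# decoder classes below call them):
#
#     layer = DecoderLayerSANM(
#         size=256,
#         self_attn=MultiHeadedAttentionSANMDecoder(256, 0.1, 11, sanm_shfit=0),
#         src_attn=MultiHeadedAttentionCrossAtt(4, 256, 0.1),
#         feed_forward=PositionwiseFeedForwardDecoderSANM(256, 2048, 0.1),
#         dropout_rate=0.1,
#     )
#     tgt = torch.randn(2, 10, 256)        # (batch, maxlen_out, size)
#     tgt_mask = torch.ones(2, 10, 1)      # mask shaped as in forward() below
#     memory = torch.randn(2, 50, 256)     # (batch, maxlen_in, size)
#     memory_mask = torch.ones(2, 1, 50)   # (batch, 1, maxlen_in)
#     x, tgt_mask, memory, memory_mask, cache = layer(tgt, tgt_mask, memory, memory_mask)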


class FsmnDecoderSCAMAOpt(BaseTransformerDecoder):
    """
    Author: Speech Lab, Alibaba Group, China

    SCAMA: Streaming chunk-aware multihead attention for online end-to-end speech recognition
    https://arxiv.org/abs/2006.01712
    """
    def __init__(
        self,
        vocab_size: int,
        encoder_output_size: int,
        attention_heads: int = 4,
        linear_units: int = 2048,
        num_blocks: int = 6,
        dropout_rate: float = 0.1,
        positional_dropout_rate: float = 0.1,
        self_attention_dropout_rate: float = 0.0,
        src_attention_dropout_rate: float = 0.0,
        input_layer: str = "embed",
        use_output_layer: bool = True,
        pos_enc_class=PositionalEncoding,
        normalize_before: bool = True,
        concat_after: bool = False,
        att_layer_num: int = 6,
        kernel_size: int = 21,
        sanm_shfit: Optional[int] = None,  # (sic) spelling follows the FunASR attention module's keyword
        concat_embeds: bool = False,
        attention_dim: Optional[int] = None,
    ):
        assert check_argument_types()
        super().__init__(
            vocab_size=vocab_size,
            encoder_output_size=encoder_output_size,
            dropout_rate=dropout_rate,
            positional_dropout_rate=positional_dropout_rate,
            input_layer=input_layer,
            use_output_layer=use_output_layer,
            pos_enc_class=pos_enc_class,
            normalize_before=normalize_before,
        )
        if attention_dim is None:
            attention_dim = encoder_output_size

        if input_layer == "embed":
            self.embed = torch.nn.Sequential(
                torch.nn.Embedding(vocab_size, attention_dim),
            )
        elif input_layer == "linear":
            self.embed = torch.nn.Sequential(
                torch.nn.Linear(vocab_size, attention_dim),
                torch.nn.LayerNorm(attention_dim),
                torch.nn.Dropout(dropout_rate),
                torch.nn.ReLU(),
                pos_enc_class(attention_dim, positional_dropout_rate),
            )
        else:
            raise ValueError(f"only 'embed' or 'linear' is supported: {input_layer}")

        self.normalize_before = normalize_before
        if self.normalize_before:
            self.after_norm = LayerNorm(attention_dim)
        if use_output_layer:
            self.output_layer = torch.nn.Linear(attention_dim, vocab_size)
        else:
            self.output_layer = None

        self.att_layer_num = att_layer_num
        self.num_blocks = num_blocks
        if sanm_shfit is None:
            sanm_shfit = (kernel_size - 1) // 2
        # The first `att_layer_num` blocks use both FSMN self-attention and
        # cross-attention over the encoder memory.
        self.decoders = repeat(
            att_layer_num,
            lambda lnum: DecoderLayerSANM(
                attention_dim,
                MultiHeadedAttentionSANMDecoder(
                    attention_dim, self_attention_dropout_rate, kernel_size, sanm_shfit=sanm_shfit
                ),
                MultiHeadedAttentionCrossAtt(
                    attention_heads,
                    attention_dim,
                    src_attention_dropout_rate,
                    encoder_output_size=encoder_output_size,
                ),
                PositionwiseFeedForwardDecoderSANM(attention_dim, linear_units, dropout_rate),
                dropout_rate,
                normalize_before,
                concat_after,
            ),
        )
        # Any remaining blocks drop the cross-attention.
        if num_blocks - att_layer_num <= 0:
            self.decoders2 = None
        else:
            self.decoders2 = repeat(
                num_blocks - att_layer_num,
                lambda lnum: DecoderLayerSANM(
                    attention_dim,
                    MultiHeadedAttentionSANMDecoder(
                        attention_dim, self_attention_dropout_rate, kernel_size, sanm_shfit=sanm_shfit
                    ),
                    None,
                    PositionwiseFeedForwardDecoderSANM(attention_dim, linear_units, dropout_rate),
                    dropout_rate,
                    normalize_before,
                    concat_after,
                ),
            )
        # A final feed-forward-only block.
        self.decoders3 = repeat(
            1,
            lambda lnum: DecoderLayerSANM(
                attention_dim,
                None,
                None,
                PositionwiseFeedForwardDecoderSANM(attention_dim, linear_units, dropout_rate),
                dropout_rate,
                normalize_before,
                concat_after,
            ),
        )
        if concat_embeds:
            # Optional FFN that fuses the token embeddings with the predictor's
            # acoustic embeddings (see `pre_acoustic_embeds` in forward()).
            self.embed_concat_ffn = repeat(
                1,
                lambda lnum: DecoderLayerSANM(
                    attention_dim + encoder_output_size,
                    None,
                    None,
                    PositionwiseFeedForwardDecoderSANM(
                        attention_dim + encoder_output_size, linear_units, dropout_rate, adim=attention_dim
                    ),
                    dropout_rate,
                    normalize_before,
                    concat_after,
                ),
            )
        else:
            self.embed_concat_ffn = None
        self.concat_embeds = concat_embeds
    def forward(
        self,
        hs_pad: torch.Tensor,
        hlens: torch.Tensor,
        ys_in_pad: torch.Tensor,
        ys_in_lens: torch.Tensor,
        chunk_mask: Optional[torch.Tensor] = None,
        pre_acoustic_embeds: Optional[torch.Tensor] = None,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Forward decoder.

        Args:
            hs_pad: encoded memory, float32 (batch, maxlen_in, feat)
            hlens: (batch,)
            ys_in_pad: input token ids, int64 (batch, maxlen_out)
                if input_layer == "embed";
                input tensor (batch, maxlen_out, #mels) in the other cases
            ys_in_lens: (batch,)
            chunk_mask: optional streaming chunk mask, multiplied into the
                memory mask
            pre_acoustic_embeds: optional acoustic embeddings concatenated with
                the token embeddings when `concat_embeds` is True

        Returns:
            (tuple): tuple containing:
                x: decoded token scores before softmax (batch, maxlen_out, vocab_size)
                    if use_output_layer is True
                olens: (batch,)
        """
        tgt = ys_in_pad
        tgt_mask = myutils.sequence_mask(ys_in_lens, device=tgt.device)[:, :, None]

        memory = hs_pad
        memory_mask = myutils.sequence_mask(hlens, device=memory.device)[:, None, :]
        if chunk_mask is not None:
            memory_mask = memory_mask * chunk_mask
            if tgt_mask.size(1) != memory_mask.size(1):
                # Pad the chunked memory mask by repeating its last-but-one row
                # so its length matches the target mask.
                memory_mask = torch.cat((memory_mask, memory_mask[:, -2:-1, :]), dim=1)

        x = self.embed(tgt)
        if pre_acoustic_embeds is not None and self.concat_embeds:
            x = torch.cat((x, pre_acoustic_embeds), dim=-1)
            x, _, _, _, _ = self.embed_concat_ffn(x, None, None, None, None)

        x, tgt_mask, memory, memory_mask, _ = self.decoders(x, tgt_mask, memory, memory_mask)
        if self.decoders2 is not None:
            x, tgt_mask, memory, memory_mask, _ = self.decoders2(x, tgt_mask, memory, memory_mask)
        x, tgt_mask, memory, memory_mask, _ = self.decoders3(x, tgt_mask, memory, memory_mask)

        if self.normalize_before:
            x = self.after_norm(x)
        if self.output_layer is not None:
            x = self.output_layer(x)

        olens = tgt_mask.sum(1)
        return x, olens
    def score(self, ys, state, x, x_mask=None, pre_acoustic_embeds: Optional[torch.Tensor] = None):
        """Score."""
        ys_mask = myutils.sequence_mask(
            torch.tensor([len(ys)], dtype=torch.int32), device=x.device
        )[:, :, None]
        logp, state = self.forward_one_step(
            ys.unsqueeze(0),
            ys_mask,
            x.unsqueeze(0),
            memory_mask=x_mask,
            pre_acoustic_embeds=pre_acoustic_embeds,
            cache=state,
        )
        return logp.squeeze(0), state
    def forward_one_step(
        self,
        tgt: torch.Tensor,
        tgt_mask: torch.Tensor,
        memory: torch.Tensor,
        memory_mask: Optional[torch.Tensor] = None,
        pre_acoustic_embeds: Optional[torch.Tensor] = None,
        cache: Optional[List[torch.Tensor]] = None,
    ) -> Tuple[torch.Tensor, List[torch.Tensor]]:
        """Forward one step.

        Args:
            tgt: input token ids, int64 (batch, maxlen_out)
            tgt_mask: input token mask (batch, maxlen_out),
                dtype=torch.uint8 before PyTorch 1.2, dtype=torch.bool from 1.2 on
            memory: encoded memory, float32 (batch, maxlen_in, feat)
            cache: cached output list of (batch, max_time_out - 1, size)

        Returns:
            y, cache: NN output value and cache per `self.decoders`.
                `y.shape` is (batch, vocab_size)
        """
        # Only the most recent token is fed through the network; earlier
        # positions are covered by the per-layer caches, so no target mask
        # is needed.
        x = tgt[:, -1:]
        tgt_mask = None
        x = self.embed(x)
        if pre_acoustic_embeds is not None and self.concat_embeds:
            x = torch.cat((x, pre_acoustic_embeds), dim=-1)
            x, _, _, _, _ = self.embed_concat_ffn(x, None, None, None, None)

        if cache is None:
            cache_layer_num = len(self.decoders)
            if self.decoders2 is not None:
                cache_layer_num += len(self.decoders2)
            cache = [None] * cache_layer_num
        new_cache = []

        for i in range(self.att_layer_num):
            decoder = self.decoders[i]
            c = cache[i]
            x, tgt_mask, memory, memory_mask, c_ret = decoder(
                x, tgt_mask, memory, memory_mask, cache=c
            )
            new_cache.append(c_ret)

        if self.num_blocks - self.att_layer_num >= 1:
            for i in range(self.num_blocks - self.att_layer_num):
                j = i + self.att_layer_num
                decoder = self.decoders2[i]
                c = cache[j]
                x, tgt_mask, memory, memory_mask, c_ret = decoder(
                    x, tgt_mask, memory, memory_mask, cache=c
                )
                new_cache.append(c_ret)

        for decoder in self.decoders3:
            x, tgt_mask, memory, memory_mask, _ = decoder(x, tgt_mask, memory, None, cache=None)

        if self.normalize_before:
            y = self.after_norm(x[:, -1])
        else:
            y = x[:, -1]
        if self.output_layer is not None:
            y = self.output_layer(y)
            y = torch.log_softmax(y, dim=-1)
        return y, new_cache
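
# Incremental decoding sketch for FsmnDecoderSCAMAOpt (illustrative only;
# assumes a trained model, an encoder output `enc` of shape (maxlen_in, feat),
# and placeholder `sos_id`/`eos_id`/`max_output_len` values — `score()` adds
# the batch dimension itself and threads the per-layer cache through `state`):
#
#     state = None
#     ys = torch.tensor([sos_id], dtype=torch.long)        # hypothesis prefix
#     for _ in range(max_output_len):
#         logp, state = decoder.score(ys, state, enc)      # (vocab_size,), updated cache
#         next_token = logp.argmax(-1, keepdim=True)
#         ys = torch.cat([ys, next_token])
#         if next_token.item() == eos_id:
#             break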


class ParaformerSANMDecoder(BaseTransformerDecoder):
    """
    Author: Speech Lab, Alibaba Group, China

    Paraformer: Fast and Accurate Parallel Transformer for Non-autoregressive End-to-End Speech Recognition
    https://arxiv.org/abs/2206.08317
    """
    def __init__(
        self,
        vocab_size: int,
        encoder_output_size: int,
        attention_heads: int = 4,
        linear_units: int = 2048,
        num_blocks: int = 6,
        dropout_rate: float = 0.1,
        positional_dropout_rate: float = 0.1,
        self_attention_dropout_rate: float = 0.0,
        src_attention_dropout_rate: float = 0.0,
        input_layer: str = "embed",
        use_output_layer: bool = True,
        pos_enc_class=PositionalEncoding,
        normalize_before: bool = True,
        concat_after: bool = False,
        att_layer_num: int = 6,
        kernel_size: int = 21,
        sanm_shfit: Optional[int] = 0,
    ):
        assert check_argument_types()
        super().__init__(
            vocab_size=vocab_size,
            encoder_output_size=encoder_output_size,
            dropout_rate=dropout_rate,
            positional_dropout_rate=positional_dropout_rate,
            input_layer=input_layer,
            use_output_layer=use_output_layer,
            pos_enc_class=pos_enc_class,
            normalize_before=normalize_before,
        )
        attention_dim = encoder_output_size

        if input_layer == "embed":
            self.embed = torch.nn.Sequential(
                torch.nn.Embedding(vocab_size, attention_dim),
                # pos_enc_class(attention_dim, positional_dropout_rate),
            )
        elif input_layer == "linear":
            self.embed = torch.nn.Sequential(
                torch.nn.Linear(vocab_size, attention_dim),
                torch.nn.LayerNorm(attention_dim),
                torch.nn.Dropout(dropout_rate),
                torch.nn.ReLU(),
                pos_enc_class(attention_dim, positional_dropout_rate),
            )
        else:
            raise ValueError(f"only 'embed' or 'linear' is supported: {input_layer}")

        self.normalize_before = normalize_before
        if self.normalize_before:
            self.after_norm = LayerNorm(attention_dim)
        if use_output_layer:
            self.output_layer = torch.nn.Linear(attention_dim, vocab_size)
        else:
            self.output_layer = None

        self.att_layer_num = att_layer_num
        self.num_blocks = num_blocks
        if sanm_shfit is None:
            sanm_shfit = (kernel_size - 1) // 2
        self.decoders = repeat(
            att_layer_num,
            lambda lnum: DecoderLayerSANM(
                attention_dim,
                MultiHeadedAttentionSANMDecoder(
                    attention_dim, self_attention_dropout_rate, kernel_size, sanm_shfit=sanm_shfit
                ),
                MultiHeadedAttentionCrossAtt(
                    attention_heads, attention_dim, src_attention_dropout_rate
                ),
                PositionwiseFeedForwardDecoderSANM(attention_dim, linear_units, dropout_rate),
                dropout_rate,
                normalize_before,
                concat_after,
            ),
        )
        if num_blocks - att_layer_num <= 0:
            self.decoders2 = None
        else:
            self.decoders2 = repeat(
                num_blocks - att_layer_num,
                lambda lnum: DecoderLayerSANM(
                    attention_dim,
                    MultiHeadedAttentionSANMDecoder(
                        attention_dim, self_attention_dropout_rate, kernel_size, sanm_shfit=0
                    ),
                    None,
                    PositionwiseFeedForwardDecoderSANM(attention_dim, linear_units, dropout_rate),
                    dropout_rate,
                    normalize_before,
                    concat_after,
                ),
            )
        self.decoders3 = repeat(
            1,
            lambda lnum: DecoderLayerSANM(
                attention_dim,
                None,
                None,
                PositionwiseFeedForwardDecoderSANM(attention_dim, linear_units, dropout_rate),
                dropout_rate,
                normalize_before,
                concat_after,
            ),
        )
    def forward(
        self,
        hs_pad: torch.Tensor,
        hlens: torch.Tensor,
        ys_in_pad: torch.Tensor,
        ys_in_lens: torch.Tensor,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Forward decoder.

        Args:
            hs_pad: encoded memory, float32 (batch, maxlen_in, feat)
            hlens: (batch,)
            ys_in_pad: embedding-level target features, float32
                (batch, maxlen_out, attention_dim); this decoder consumes them
                directly and does not apply `self.embed`
            ys_in_lens: (batch,)

        Returns:
            (tuple): tuple containing:
                x: decoded token scores before softmax (batch, maxlen_out, vocab_size)
                    if use_output_layer is True
                olens: (batch,)
        """
        tgt = ys_in_pad
        tgt_mask = myutils.sequence_mask(ys_in_lens, device=tgt.device)[:, :, None]

        memory = hs_pad
        memory_mask = myutils.sequence_mask(hlens, device=memory.device)[:, None, :]

        # The inputs are already embedding-level features (e.g. the predictor's
        # acoustic embeddings), so no token embedding is applied here.
        x = tgt
        x, tgt_mask, memory, memory_mask, _ = self.decoders(x, tgt_mask, memory, memory_mask)
        if self.decoders2 is not None:
            x, tgt_mask, memory, memory_mask, _ = self.decoders2(x, tgt_mask, memory, memory_mask)
        x, tgt_mask, memory, memory_mask, _ = self.decoders3(x, tgt_mask, memory, memory_mask)

        if self.normalize_before:
            x = self.after_norm(x)
        if self.output_layer is not None:
            x = self.output_layer(x)

        olens = tgt_mask.sum(1)
        return x, olens
    def score(self, ys, state, x):
        """Score."""
        ys_mask = myutils.sequence_mask(
            torch.tensor([len(ys)], dtype=torch.int32), device=x.device
        )[:, :, None]
        logp, state = self.forward_one_step(
            ys.unsqueeze(0), ys_mask, x.unsqueeze(0), cache=state
        )
        return logp.squeeze(0), state
    def forward_one_step(
        self,
        tgt: torch.Tensor,
        tgt_mask: torch.Tensor,
        memory: torch.Tensor,
        cache: Optional[List[torch.Tensor]] = None,
    ) -> Tuple[torch.Tensor, List[torch.Tensor]]:
        """Forward one step.

        Args:
            tgt: input token ids, int64 (batch, maxlen_out)
            tgt_mask: input token mask (batch, maxlen_out),
                dtype=torch.uint8 before PyTorch 1.2, dtype=torch.bool from 1.2 on
            memory: encoded memory, float32 (batch, maxlen_in, feat)
            cache: cached output list of (batch, max_time_out - 1, size)

        Returns:
            y, cache: NN output value and cache per `self.decoders`.
                `y.shape` is (batch, vocab_size)
        """
        x = self.embed(tgt)
        if cache is None:
            cache_layer_num = len(self.decoders)
            if self.decoders2 is not None:
                cache_layer_num += len(self.decoders2)
            cache = [None] * cache_layer_num
        new_cache = []

        for i in range(self.att_layer_num):
            decoder = self.decoders[i]
            c = cache[i]
            x, tgt_mask, memory, memory_mask, c_ret = decoder(
                x, tgt_mask, memory, None, cache=c
            )
            new_cache.append(c_ret)

        # Run the remaining FSMN-only blocks, if any.
        if self.num_blocks - self.att_layer_num >= 1:
            for i in range(self.num_blocks - self.att_layer_num):
                j = i + self.att_layer_num
                decoder = self.decoders2[i]
                c = cache[j]
                x, tgt_mask, memory, memory_mask, c_ret = decoder(
                    x, tgt_mask, memory, None, cache=c
                )
                new_cache.append(c_ret)

        for decoder in self.decoders3:
            x, tgt_mask, memory, memory_mask, _ = decoder(x, tgt_mask, memory, None, cache=None)

        if self.normalize_before:
            y = self.after_norm(x[:, -1])
        else:
            y = x[:, -1]
        if self.output_layer is not None:
            y = torch.log_softmax(self.output_layer(y), dim=-1)
        return y, new_cache
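

if __name__ == "__main__":
    # Smoke-test sketch, not part of the FunASR API: builds a small
    # ParaformerSANMDecoder and runs random embedding-level targets through
    # forward(). All sizes below are arbitrary assumptions, and running this
    # requires a working FunASR installation for the modules imported above.
    decoder = ParaformerSANMDecoder(
        vocab_size=100,
        encoder_output_size=64,
        attention_heads=4,
        linear_units=128,
        num_blocks=3,
        att_layer_num=2,
        kernel_size=11,
    )
    hs_pad = torch.randn(2, 25, 64)                    # encoder memory
    hlens = torch.tensor([25, 20], dtype=torch.int32)  # memory lengths
    ys_in_pad = torch.randn(2, 7, 64)                  # embedding-level targets
    ys_in_lens = torch.tensor([7, 5], dtype=torch.int32)
    scores, olens = decoder(hs_pad, hlens, ys_in_pad, ys_in_lens)
    print(scores.shape, olens)                         # expect torch.Size([2, 7, 100])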