- # -*- coding: utf-8 -*-
- """Network related utility tools."""
- import logging
- from typing import Dict, List, Tuple
- import numpy as np
- import torch
def to_device(m, x):
    """Send tensor into the device of the module.

    Args:
        m (torch.nn.Module or torch.Tensor): Torch module or tensor whose
            device determines the destination.
        x (Tensor): Torch tensor to move.

    Returns:
        Tensor: Torch tensor located in the same place as ``m``.

    Raises:
        TypeError: If ``m`` is neither a torch.nn.Module nor a torch.Tensor.
    """
    if isinstance(m, torch.nn.Module):
        # A module has no single device attribute; use its first parameter's.
        device = next(m.parameters()).device
    elif isinstance(m, torch.Tensor):
        device = m.device
    else:
        # Fix: the original message contained a typo ("bot got").
        raise TypeError(
            "Expected torch.nn.Module or torch.tensor, " f"but got: {type(m)}"
        )
    return x.to(device)
def pad_list(xs, pad_value):
    """Pad a list of variable-length tensors into one batched tensor.

    Args:
        xs (List): List of Tensors [(T_1, `*`), (T_2, `*`), ..., (T_B, `*`)].
        pad_value (float): Value used to fill the padded positions.

    Returns:
        Tensor: Padded tensor (B, Tmax, `*`).

    Examples:
        >>> x = [torch.ones(4), torch.ones(2), torch.ones(1)]
        >>> pad_list(x, 0)
        tensor([[1., 1., 1., 1.],
                [1., 1., 0., 0.],
                [1., 0., 0., 0.]])
    """
    longest = max(x.size(0) for x in xs)
    # Allocate the output on the same device/dtype as the inputs, pre-filled
    # with the pad value, then copy each sequence into its own row.
    padded = xs[0].new(len(xs), longest, *xs[0].size()[1:]).fill_(pad_value)
    for row, x in enumerate(xs):
        padded[row, : x.size(0)] = x
    return padded
def make_pad_mask(lengths, xs=None, length_dim=-1, maxlen=None):
    """Make mask tensor containing indices of padded part.

    Args:
        lengths (LongTensor or List): Batch of lengths (B,).
        xs (Tensor, optional): The reference tensor.
            If set, masks will be the same shape as this tensor.
        length_dim (int, optional): Dimension indicator of the above tensor.
            See the example.
        maxlen (int, optional): Explicit maximum mask length. Mutually
            exclusive with ``xs`` and must be >= max(lengths).
            (Fix: this parameter was undocumented.)

    Returns:
        Tensor: Mask tensor containing indices of padded part.
            dtype=torch.uint8 in PyTorch 1.2-
            dtype=torch.bool in PyTorch 1.2+ (including 1.2)

    Raises:
        ValueError: If ``length_dim`` is 0 (the batch dimension).

    Examples:
        With only lengths.

        >>> lengths = [5, 3, 2]
        >>> make_pad_mask(lengths)
        masks = [[0, 0, 0, 0 ,0],
                 [0, 0, 0, 1, 1],
                 [0, 0, 1, 1, 1]]

        With the reference tensor (mask is broadcast over non-length dims).

        >>> xs = torch.zeros((3, 2, 4))
        >>> make_pad_mask(lengths, xs)
        tensor([[[0, 0, 0, 0],
                 [0, 0, 0, 0]],
                [[0, 0, 0, 1],
                 [0, 0, 0, 1]],
                [[0, 0, 1, 1],
                 [0, 0, 1, 1]]], dtype=torch.uint8)

        With the reference tensor and dimension indicator, the mask is laid
        out along ``length_dim`` instead of the last dimension, e.g.
        ``make_pad_mask(lengths, torch.zeros((3, 6, 6)), 1)`` masks whole
        trailing rows of each (6, 6) slice.
    """
    if length_dim == 0:
        raise ValueError("length_dim cannot be 0: {}".format(length_dim))

    if not isinstance(lengths, list):
        lengths = lengths.tolist()
    bs = int(len(lengths))
    if maxlen is None:
        if xs is None:
            maxlen = int(max(lengths))
        else:
            maxlen = xs.size(length_dim)
    else:
        # An explicit maxlen only makes sense without a reference tensor.
        assert xs is None
        assert maxlen >= int(max(lengths))

    # Positions [0, maxlen) compared against each sequence length:
    # a position is padding iff position >= length.
    seq_range = torch.arange(0, maxlen, dtype=torch.int64)
    seq_range_expand = seq_range.unsqueeze(0).expand(bs, maxlen)
    # Fix: use `new_tensor` instead of the legacy `Tensor.new(list)` ctor.
    seq_length_expand = seq_range_expand.new_tensor(lengths).unsqueeze(-1)
    mask = seq_range_expand >= seq_length_expand

    if xs is not None:
        assert xs.size(0) == bs, (xs.size(0), bs)

        if length_dim < 0:
            length_dim = xs.dim() + length_dim
        # ind = (:, None, ..., None, :, None, ..., None)
        # i.e. keep batch and length dims, insert broadcast axes elsewhere.
        ind = tuple(
            slice(None) if i in (0, length_dim) else None for i in range(xs.dim())
        )
        mask = mask[ind].expand_as(xs).to(xs.device)
    return mask
def make_non_pad_mask(lengths, xs=None, length_dim=-1):
    """Make mask tensor containing indices of the non-padded part.

    This is simply the logical inverse of :func:`make_pad_mask`.

    Args:
        lengths (LongTensor or List): Batch of lengths (B,).
        xs (Tensor, optional): The reference tensor.
            If set, masks will be the same shape as this tensor.
        length_dim (int, optional): Dimension indicator of the above tensor.
            See :func:`make_pad_mask` for details.

    Returns:
        ByteTensor: Mask tensor where non-padded positions are 1 (True).
            dtype=torch.uint8 in PyTorch 1.2-
            dtype=torch.bool in PyTorch 1.2+ (including 1.2)

    Examples:
        >>> make_non_pad_mask([5, 3, 2])
        masks = [[1, 1, 1, 1, 1],
                 [1, 1, 1, 0, 0],
                 [1, 1, 0, 0, 0]]

        >>> xs = torch.zeros((3, 2, 4))
        >>> make_non_pad_mask([5, 3, 2], xs)
        tensor([[[1, 1, 1, 1],
                 [1, 1, 1, 1]],
                [[1, 1, 1, 0],
                 [1, 1, 1, 0]],
                [[1, 1, 0, 0],
                 [1, 1, 0, 0]]], dtype=torch.uint8)
    """
    pad_mask = make_pad_mask(lengths, xs, length_dim)
    return ~pad_mask
def mask_by_length(xs, lengths, fill=0):
    """Mask tensor according to length.

    Args:
        xs (Tensor): Batch of input tensor (B, `*`).
        lengths (LongTensor or List): Batch of lengths (B,).
        fill (int or float): Value to fill masked part.

    Returns:
        Tensor: Batch of masked input tensor (B, `*`).

    Examples:
        >>> x = torch.arange(5).repeat(3, 1) + 1
        >>> x
        tensor([[1, 2, 3, 4, 5],
                [1, 2, 3, 4, 5],
                [1, 2, 3, 4, 5]])
        >>> lengths = [5, 3, 2]
        >>> mask_by_length(x, lengths)
        tensor([[1, 2, 3, 4, 5],
                [1, 2, 3, 0, 0],
                [1, 2, 0, 0, 0]])
    """
    assert xs.size(0) == len(lengths)
    # Fix: avoid the deprecated `.data` attribute and the legacy `new()`
    # constructor; `new_full` allocates on the same device/dtype directly.
    ret = xs.new_full(xs.size(), fill)
    for i, l in enumerate(lengths):
        ret[i, :l] = xs[i, :l]
    return ret
def th_accuracy(pad_outputs, pad_targets, ignore_label):
    """Calculate accuracy over positions whose target is not ignored.

    Args:
        pad_outputs (Tensor): Prediction tensors (B * Lmax, D).
        pad_targets (LongTensor): Target label tensors (B, Lmax, D).
        ignore_label (int): Label id excluded from the statistics.

    Returns:
        float: Accuracy value (0.0 - 1.0).
    """
    batch, length = pad_targets.size(0), pad_targets.size(1)
    # Reshape the flat logits back to (B, Lmax, D) and pick the argmax class.
    predictions = pad_outputs.view(batch, length, pad_outputs.size(1)).argmax(2)
    valid = pad_targets != ignore_label
    correct = torch.sum(
        predictions.masked_select(valid) == pad_targets.masked_select(valid)
    )
    total = torch.sum(valid)
    return float(correct) / float(total)
def to_torch_tensor(x):
    """Change to torch.Tensor or ComplexTensor from numpy.ndarray.

    Args:
        x: Inputs. It should be one of numpy.ndarray, Tensor, ComplexTensor, and dict.

    Returns:
        Tensor or ComplexTensor: Type converted inputs.

    Examples:
        >>> xs = np.ones(3, dtype=np.float32)
        >>> to_torch_tensor(xs)
        tensor([1., 1., 1.])
        >>> xs = torch.ones(3, 4, 5)
        >>> assert to_torch_tensor(xs) is xs
        >>> to_torch_tensor({'real': xs, 'imag': xs})
        ComplexTensor(...)
    """
    # numpy arrays: complex dtypes become ComplexTensor, the rest map directly.
    if isinstance(x, np.ndarray):
        if x.dtype.kind == "c":
            # Dynamically importing because torch_complex requires python3
            from torch_complex.tensor import ComplexTensor

            return ComplexTensor(x)
        return torch.from_numpy(x)

    # {'real': ..., 'imag': ...} dicts become ComplexTensor.
    if isinstance(x, dict):
        # Dynamically importing because torch_complex requires python3
        from torch_complex.tensor import ComplexTensor

        if "real" not in x or "imag" not in x:
            raise ValueError("has 'real' and 'imag' keys: {}".format(list(x)))
        return ComplexTensor(x["real"], x["imag"])

    # torch.Tensor passes through unchanged.
    if isinstance(x, torch.Tensor):
        return x

    error = (
        "x must be numpy.ndarray, torch.Tensor or a dict like "
        "{{'real': torch.Tensor, 'imag': torch.Tensor}}, "
        "but got {}".format(type(x))
    )
    try:
        from torch_complex.tensor import ComplexTensor
    except Exception:
        # If PY2
        raise ValueError(error)
    else:
        # If PY3: an already-constructed ComplexTensor is also accepted as-is.
        if isinstance(x, ComplexTensor):
            return x
        raise ValueError(error)
def get_subsample(train_args, mode, arch):
    """Parse the subsampling factors from the args for the specified `mode` and `arch`.

    Args:
        train_args: argument Namespace containing options.
        mode: one of ('asr', 'mt', 'st')
        arch: one of ('rnn', 'rnn-t', 'rnn_mix', 'rnn_mulenc', 'transformer')

    Returns:
        np.ndarray / List[np.ndarray]: subsampling factors.

    Raises:
        ValueError: If the (mode, arch) combination is not supported.
    """
    if arch == "transformer":
        # Transformer encoders subsample internally; a single factor of 1.
        return np.array([1])
    elif mode == "mt" and arch == "rnn":
        # +1 means input (+1) and layers outputs (train_args.elayer)
        # Fix: `np.int` was removed in NumPy 1.24; plain `int` is equivalent.
        subsample = np.ones(train_args.elayers + 1, dtype=int)
        logging.warning("Subsampling is not performed for machine translation.")
        logging.info("subsample: " + " ".join([str(x) for x in subsample]))
        return subsample
    elif (mode == "asr" and arch in ("rnn", "rnn-t")) or (
        mode == "st" and arch == "rnn"
    ):
        # NOTE: the original also tested (mode == "mt" and arch == "rnn")
        # here, but that case is unreachable — it is handled by the branch
        # above — so the condition was dropped.
        subsample = np.ones(train_args.elayers + 1, dtype=int)
        if train_args.etype.endswith("p") and not train_args.etype.startswith("vgg"):
            ss = train_args.subsample.split("_")
            for j in range(min(train_args.elayers + 1, len(ss))):
                subsample[j] = int(ss[j])
        else:
            logging.warning(
                "Subsampling is not performed for vgg*. "
                "It is performed in max pooling layers at CNN."
            )
        logging.info("subsample: " + " ".join([str(x) for x in subsample]))
        return subsample
    elif mode == "asr" and arch == "rnn_mix":
        # Speaker-differentiating layers (elayers_sd) come before the shared ones.
        subsample = np.ones(train_args.elayers_sd + train_args.elayers + 1, dtype=int)
        if train_args.etype.endswith("p") and not train_args.etype.startswith("vgg"):
            ss = train_args.subsample.split("_")
            for j in range(
                min(train_args.elayers_sd + train_args.elayers + 1, len(ss))
            ):
                subsample[j] = int(ss[j])
        else:
            logging.warning(
                "Subsampling is not performed for vgg*. "
                "It is performed in max pooling layers at CNN."
            )
        logging.info("subsample: " + " ".join([str(x) for x in subsample]))
        return subsample
    elif mode == "asr" and arch == "rnn_mulenc":
        # One subsampling vector per encoder; per-encoder options are lists.
        subsample_list = []
        for idx in range(train_args.num_encs):
            subsample = np.ones(train_args.elayers[idx] + 1, dtype=int)
            if train_args.etype[idx].endswith("p") and not train_args.etype[
                idx
            ].startswith("vgg"):
                ss = train_args.subsample[idx].split("_")
                for j in range(min(train_args.elayers[idx] + 1, len(ss))):
                    subsample[j] = int(ss[j])
            else:
                logging.warning(
                    "Encoder %d: Subsampling is not performed for vgg*. "
                    "It is performed in max pooling layers at CNN.",
                    idx + 1,
                )
            logging.info("subsample: " + " ".join([str(x) for x in subsample]))
            subsample_list.append(subsample)
        return subsample_list
    else:
        raise ValueError("Invalid options: mode={}, arch={}".format(mode, arch))
def rename_state_dict(
    old_prefix: str, new_prefix: str, state_dict: Dict[str, torch.Tensor]
):
    """Replace keys of old prefix with new prefix in state dict (in place).

    Args:
        old_prefix: Key prefix to be replaced.
        new_prefix: Prefix that takes its place.
        state_dict: Mapping of parameter names to tensors; mutated in place.
    """
    # need this list not to break the dict iterator
    old_keys = [k for k in state_dict if k.startswith(old_prefix)]
    if len(old_keys) > 0:
        logging.warning(f"Rename: {old_prefix} -> {new_prefix}")
    for k in old_keys:
        v = state_dict.pop(k)
        # Fix: `k.replace(old_prefix, new_prefix)` substitutes *every*
        # occurrence of the prefix string, corrupting keys that happen to
        # contain it again later; only the leading occurrence must change.
        new_k = new_prefix + k[len(old_prefix):]
        state_dict[new_k] = v
class Swish(torch.nn.Module):
    """Swish activation definition.

    Swish(x) = (beta * x) * sigmoid(x)
    where beta = 1 defines standard Swish activation.

    References:
        https://arxiv.org/abs/2108.12943 / https://arxiv.org/abs/1710.05941v1.
        E-swish variant: https://arxiv.org/abs/1801.07145.

    Args:
        beta: Beta parameter for E-Swish.
            (beta >= 1. If beta < 1, use standard Swish).
        use_builtin: Whether to use PyTorch function if available.
    """

    def __init__(self, beta: float = 1.0, use_builtin: bool = False) -> None:
        super().__init__()
        self.beta = beta
        if beta > 1:
            # E-Swish: scale the input by beta before gating with sigmoid.
            self.swish = lambda x: (self.beta * x) * torch.sigmoid(x)
        elif use_builtin:
            # torch.nn.SiLU computes x * sigmoid(x) natively.
            self.swish = torch.nn.SiLU()
        else:
            self.swish = lambda x: x * torch.sigmoid(x)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the configured Swish variant to ``x``."""
        return self.swish(x)
def get_activation(act):
    """Return an instance of the activation function named ``act``.

    Args:
        act: One of "hardtanh", "tanh", "relu", "selu", "swish".

    Returns:
        torch.nn.Module: Freshly constructed activation module.

    Raises:
        KeyError: If ``act`` is not a known activation name.
    """
    table = {
        "hardtanh": torch.nn.Hardtanh,
        "tanh": torch.nn.Tanh,
        "relu": torch.nn.ReLU,
        "selu": torch.nn.SELU,
        "swish": Swish,
    }
    constructor = table[act]
    return constructor()
class TooShortUttError(Exception):
    """Raised when the utt is too short for subsampling.

    Args:
        message: Error message to display.
        actual_size: The size that cannot pass the subsampling.
        limit: The size limit for subsampling.
    """

    def __init__(self, message: str, actual_size: int, limit: int) -> None:
        """Construct a TooShortUttError module."""
        super().__init__(message)
        # Keep both sizes so callers can report the offending value and
        # the threshold it failed to meet.
        self.actual_size = actual_size
        self.limit = limit
def check_short_utt(sub_factor: int, size: int) -> Tuple[bool, int]:
    """Check if the input is too short for subsampling.

    Args:
        sub_factor: Subsampling factor for Conv2DSubsampling.
        size: Input size.

    Returns:
        : Whether an error should be sent.
        : Size limit for the specified subsampling factor (-1 if no error).
    """
    # Per subsampling factor: (minimum acceptable size, reported limit).
    thresholds = {2: (3, 7), 4: (7, 7), 6: (11, 11)}
    if sub_factor in thresholds:
        minimum, limit = thresholds[sub_factor]
        if size < minimum:
            return True, limit
    return False, -1
def sub_factor_to_params(sub_factor: int, input_size: int) -> Tuple[int, int, int]:
    """Get conv2D second layer parameters for given subsampling factor.

    Args:
        sub_factor: Subsampling factor (1/X).
        input_size: Input size.

    Returns:
        : Kernel size for second convolution.
        : Stride for second convolution.
        : Conv2DSubsampling output size.

    Raises:
        ValueError: If ``sub_factor`` is not 2, 4 or 6.
    """
    # Size after the first (kernel 3, stride 2) convolution.
    after_first = (input_size - 1) // 2
    if sub_factor == 2:
        return 3, 1, after_first - 2
    if sub_factor == 4:
        return 3, 2, (after_first - 1) // 2
    if sub_factor == 6:
        return 5, 3, (after_first - 2) // 3
    raise ValueError(
        "subsampling_factor parameter should be set to either 2, 4 or 6."
    )
def make_chunk_mask(
    size: int,
    chunk_size: int,
    left_chunk_size: int = 0,
    device: torch.device = None,
) -> torch.Tensor:
    """Create chunk mask for the subsequent steps (size, size).

    Reference: https://github.com/k2-fsa/icefall/blob/master/icefall/utils.py

    Args:
        size: Size of the source mask.
        chunk_size: Number of frames in chunk.
        left_chunk_size: Size of the left context in chunks (0 means full context).
        device: Device for the mask tensor.

    Returns:
        mask: Chunk mask. (size, size), True marks positions that are masked.
    """
    visible = torch.zeros(size, size, device=device, dtype=torch.bool)
    for row in range(size):
        chunk_idx = row // chunk_size
        # Everything up to the end of the current chunk is visible...
        stop = min((chunk_idx + 1) * chunk_size, size)
        # ...plus `left_chunk_size` chunks of left context (negative = all).
        if left_chunk_size < 0:
            begin = 0
        else:
            begin = max((chunk_idx - left_chunk_size) * chunk_size, 0)
        visible[row, begin:stop] = True
    # The returned mask flags the *invisible* positions.
    return ~visible
def make_source_mask(lengths: torch.Tensor) -> torch.Tensor:
    """Create source mask for given lengths.

    Reference: https://github.com/k2-fsa/icefall/blob/master/icefall/utils.py

    Args:
        lengths: Sequence lengths. (B,)

    Returns:
        : Mask for the sequence lengths. (B, max_len); True marks padding.
    """
    num_seqs = lengths.size(0)
    longest = lengths.max()
    # Positions [0, max_len) broadcast against per-sequence lengths:
    # a position at or beyond a sequence's length is padding.
    positions = torch.arange(longest).expand(num_seqs, longest).to(lengths)
    return positions >= lengths.unsqueeze(1)
def get_transducer_task_io(
    labels: torch.Tensor,
    encoder_out_lens: torch.Tensor,
    ignore_id: int = -1,
    blank_id: int = 0,
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
    """Get Transducer loss I/O.

    Args:
        labels: Label ID sequences. (B, L)
        encoder_out_lens: Encoder output lengths. (B,)
        ignore_id: Padding symbol ID.
        blank_id: Blank symbol ID.

    Returns:
        decoder_in: Decoder inputs. (B, U)
        target: Target label ID sequences. (B, U)
        t_len: Time lengths. (B,)
        u_len: Label lengths. (B,)
    """

    def _pad_batch(seqs: List[torch.Tensor], padding_value: int = 0):
        """Stack variable-length label sequences into one padded batch."""
        longest = max(s.size(0) for s in seqs)
        out = (
            seqs[0]
            .new(len(seqs), longest, *seqs[0].size()[1:])
            .fill_(padding_value)
        )
        for row, s in enumerate(seqs):
            out[row, : s.size(0)] = s
        return out

    device = labels.device
    # Strip padding symbols from every label sequence.
    labels_unpad = [seq[seq != ignore_id] for seq in labels]
    blank = labels[0].new([blank_id])

    # Decoder input is each sequence prefixed with the blank symbol.
    decoder_in = _pad_batch(
        [torch.cat([blank, seq], dim=0) for seq in labels_unpad], blank_id
    ).to(device)
    target = _pad_batch(labels_unpad, blank_id).type(torch.int32).to(device)
    t_len = torch.IntTensor(list(map(int, encoder_out_lens))).to(device)
    u_len = torch.IntTensor([seq.size(0) for seq in labels_unpad]).to(device)

    return decoder_in, target, t_len, u_len
def pad_to_len(t: torch.Tensor, pad_len: int, dim: int):
    """Pad the tensor `t` at `dim` to the length `pad_len` with right padding zeros."""
    current = t.size(dim)
    if current == pad_len:
        # Already the requested length; return the input unchanged.
        return t
    # Build a zero block covering the missing length along `dim`, then append.
    tail_shape = list(t.shape)
    tail_shape[dim] = pad_len - current
    tail = torch.zeros(*tail_shape, dtype=t.dtype, device=t.device)
    return torch.cat([t, tail], dim=dim)
|