import logging

import torch
import torch.nn.functional as F
from typeguard import check_argument_types


class CTC(torch.nn.Module):
    """CTC module.

    Args:
        odim: dimension of outputs
        encoder_output_size: number of encoder projection units
        dropout_rate: dropout rate (0.0 ~ 1.0)
        ctc_type: CTC loss implementation, one of "builtin", "warpctc",
            or "gtnctc"
        reduce: reduce the CTC loss into a scalar
        ignore_nan_grad: ignore samples whose CTC gradient is NaN
            (supported only for the builtin backend)
    """

    def __init__(
        self,
        odim: int,
        encoder_output_size: int,
        dropout_rate: float = 0.0,
        ctc_type: str = "builtin",
        reduce: bool = True,
        ignore_nan_grad: bool = True,
    ):
        assert check_argument_types()
        super().__init__()
        eprojs = encoder_output_size
        self.dropout_rate = dropout_rate
        self.ctc_lo = torch.nn.Linear(eprojs, odim)
        self.ctc_type = ctc_type
        self.ignore_nan_grad = ignore_nan_grad

        if self.ctc_type == "builtin":
            self.ctc_loss = torch.nn.CTCLoss(reduction="none")
        elif self.ctc_type == "warpctc":
            import warpctc_pytorch as warp_ctc

            if ignore_nan_grad:
                logging.warning("ignore_nan_grad option is not supported for warp_ctc")
            self.ctc_loss = warp_ctc.CTCLoss(size_average=True, reduce=reduce)
        elif self.ctc_type == "gtnctc":
            from espnet.nets.pytorch_backend.gtn_ctc import GTNCTCLossFunction

            self.ctc_loss = GTNCTCLossFunction.apply
        else:
            raise ValueError(
                f'ctc_type must be "builtin", "warpctc", or "gtnctc": {self.ctc_type}'
            )

        self.reduce = reduce
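
    # Note on the three backends configured above: "builtin" keeps per-sample
    # losses (reduction="none") and reduces them manually in loss_fn(), which
    # is what makes the NaN-gradient filtering below possible; "warpctc"
    # reduces internally via its size_average/reduce flags; "gtnctc" is an
    # autograd Function, so its .apply is stored instead of a Module.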

    def loss_fn(self, th_pred, th_target, th_ilen, th_olen) -> torch.Tensor:
        if self.ctc_type == "builtin":
            th_pred = th_pred.log_softmax(2)
            loss = self.ctc_loss(th_pred, th_target, th_ilen, th_olen)

            if loss.requires_grad and self.ignore_nan_grad:
                # ctc_grad: (L, B, O)
                ctc_grad = loss.grad_fn(torch.ones_like(loss))
                ctc_grad = ctc_grad.sum([0, 2])
                indices = torch.isfinite(ctc_grad)
                size = indices.long().sum()
                if size == 0:
                    # Return the loss as is
                    logging.warning(
                        "All samples in this mini-batch got nan grad."
                        " Returning nan value instead of CTC loss"
                    )
                elif size != th_pred.size(1):
                    logging.warning(
                        f"{th_pred.size(1) - size}/{th_pred.size(1)}"
                        " samples got nan grad."
                        " These were ignored for CTC loss."
                    )

                    # Create a mask that drops the targets of the nan-grad samples
                    target_mask = torch.full(
                        [th_target.size(0)],
                        1,
                        dtype=torch.bool,
                        device=th_target.device,
                    )
                    s = 0
                    for ind, le in enumerate(th_olen):
                        if not indices[ind]:
                            target_mask[s : s + le] = 0
                        s += le

                    # Recalculate the loss using only the masked data
                    loss = self.ctc_loss(
                        th_pred[:, indices, :],
                        th_target[target_mask],
                        th_ilen[indices],
                        th_olen[indices],
                    )
            else:
                size = th_pred.size(1)

            if self.reduce:
                # Batch-size average
                loss = loss.sum() / size
            else:
                loss = loss / size
            return loss
        elif self.ctc_type == "warpctc":
            # warpctc only supports float32
            th_pred = th_pred.to(dtype=torch.float32)

            th_target = th_target.cpu().int()
            th_ilen = th_ilen.cpu().int()
            th_olen = th_olen.cpu().int()
            loss = self.ctc_loss(th_pred, th_target, th_ilen, th_olen)
            if self.reduce:
                # NOTE: sum() is needed here for consistency, since warpctc
                # returns a tensor with shape (1,) whereas the builtin loss
                # returns a dimensionless scalar tensor.
                loss = loss.sum()
            return loss
        elif self.ctc_type == "gtnctc":
            log_probs = torch.nn.functional.log_softmax(th_pred, dim=2)
            return self.ctc_loss(log_probs, th_target, th_ilen, 0, "none")
        else:
            raise NotImplementedError
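
    # Shape sketch for the builtin path of loss_fn(), with illustrative values:
    #   th_pred:   (Tmax, B, odim) raw logits, e.g. (100, 4, 50)
    #   th_target: 1-D concatenation of all label sequences, (sum(th_olen),)
    #   th_ilen:   input lengths per utterance, e.g. tensor([100, 98, 75, 60])
    #   th_olen:   label lengths per utterance, e.g. tensor([10, 8, 7, 5])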

    def forward(self, hs_pad, hlens, ys_pad, ys_lens):
        """Calculate CTC loss.

        Args:
            hs_pad: batch of padded hidden state sequences (B, Tmax, D)
            hlens: batch of lengths of hidden state sequences (B)
            ys_pad: batch of padded character id sequence tensor (B, Lmax)
            ys_lens: batch of lengths of character sequence (B)
        """
        # hs_pad: (B, L, NProj) -> ys_hat: (B, L, Nvocab)
        ys_hat = self.ctc_lo(F.dropout(hs_pad, p=self.dropout_rate))

        if self.ctc_type == "gtnctc":
            # gtn expects list form for ys
            ys_true = [y[y != -1] for y in ys_pad]  # parse padded ys
        else:
            # ys_hat: (B, L, D) -> (L, B, D)
            ys_hat = ys_hat.transpose(0, 1)
            # (B, L) -> (BxL,)
            ys_true = torch.cat([ys_pad[i, :l] for i, l in enumerate(ys_lens)])

        loss = self.loss_fn(ys_hat, ys_true, hlens, ys_lens).to(
            device=hs_pad.device, dtype=hs_pad.dtype
        )
        return loss
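
    # Example of the target handling above, with hypothetical values: given
    # ys_pad = [[3, 5, 2, -1], [7, 1, -1, -1]] (padded with -1) and
    # ys_lens = [3, 2], the builtin/warpctc path flattens the targets to
    # ys_true = [3, 5, 2, 7, 1], while the gtnctc path keeps a list of
    # tensors: [[3, 5, 2], [7, 1]].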

    def softmax(self, hs_pad):
        """Softmax of frame activations.

        Args:
            torch.Tensor hs_pad: 3d tensor (B, Tmax, eprojs)
        Returns:
            torch.Tensor: softmax applied 3d tensor (B, Tmax, odim)
        """
        return F.softmax(self.ctc_lo(hs_pad), dim=2)

    def log_softmax(self, hs_pad):
        """Log_softmax of frame activations.

        Args:
            torch.Tensor hs_pad: 3d tensor (B, Tmax, eprojs)
        Returns:
            torch.Tensor: log softmax applied 3d tensor (B, Tmax, odim)
        """
        return F.log_softmax(self.ctc_lo(hs_pad), dim=2)

    def argmax(self, hs_pad):
        """Argmax of frame activations.

        Args:
            torch.Tensor hs_pad: 3d tensor (B, Tmax, eprojs)
        Returns:
            torch.Tensor: argmax applied 2d tensor (B, Tmax)
        """
        return torch.argmax(self.ctc_lo(hs_pad), dim=2)
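

# --- Usage sketch (illustrative only, not part of the original module) ---
# A minimal smoke test with random tensors, assuming ctc_type="builtin" so no
# extra dependency (warpctc_pytorch, gtn) is needed. All sizes are made up;
# ignore_nan_grad is disabled here to skip the gradient-introspection path.
if __name__ == "__main__":
    torch.manual_seed(0)
    B, Tmax, D, odim = 4, 50, 32, 10  # hypothetical batch/feature sizes
    ctc = CTC(odim=odim, encoder_output_size=D, ignore_nan_grad=False)

    hs_pad = torch.randn(B, Tmax, D)  # padded encoder outputs (B, Tmax, D)
    hlens = torch.tensor([50, 45, 40, 35])  # valid frames per utterance
    ys_pad = torch.randint(1, odim, (B, 12))  # padded label ids (0 is blank)
    ys_lens = torch.tensor([12, 10, 8, 6])  # valid labels per utterance

    loss = ctc(hs_pad, hlens, ys_pad, ys_lens)
    print(loss)  # batch-averaged scalar CTC loss

    hyp = ctc.argmax(hs_pad)  # greedy frame-level label ids, (B, Tmax)
    print(hyp.shape)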