dynamic_conv.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. """Dynamic Convolution module."""
  2. import numpy
  3. import torch
  4. from torch import nn
  5. import torch.nn.functional as F
  6. MIN_VALUE = float(numpy.finfo(numpy.float32).min)
  7. class DynamicConvolution(nn.Module):
  8. """Dynamic Convolution layer.
  9. This implementation is based on
  10. https://github.com/pytorch/fairseq/tree/master/fairseq
  11. Args:
  12. wshare (int): the number of kernel of convolution
  13. n_feat (int): the number of features
  14. dropout_rate (float): dropout_rate
  15. kernel_size (int): kernel size (length)
  16. use_kernel_mask (bool): Use causal mask or not for convolution kernel
  17. use_bias (bool): Use bias term or not.
  18. """
  19. def __init__(
  20. self,
  21. wshare,
  22. n_feat,
  23. dropout_rate,
  24. kernel_size,
  25. use_kernel_mask=False,
  26. use_bias=False,
  27. ):
  28. """Construct Dynamic Convolution layer."""
  29. super(DynamicConvolution, self).__init__()
  30. assert n_feat % wshare == 0
  31. self.wshare = wshare
  32. self.use_kernel_mask = use_kernel_mask
  33. self.dropout_rate = dropout_rate
  34. self.kernel_size = kernel_size
  35. self.attn = None
  36. # linear -> GLU -- -> lightconv -> linear
  37. # \ /
  38. # Linear
  39. self.linear1 = nn.Linear(n_feat, n_feat * 2)
  40. self.linear2 = nn.Linear(n_feat, n_feat)
  41. self.linear_weight = nn.Linear(n_feat, self.wshare * 1 * kernel_size)
  42. nn.init.xavier_uniform(self.linear_weight.weight)
  43. self.act = nn.GLU()
  44. # dynamic conv related
  45. self.use_bias = use_bias
  46. if self.use_bias:
  47. self.bias = nn.Parameter(torch.Tensor(n_feat))
  48. def forward(self, query, key, value, mask):
  49. """Forward of 'Dynamic Convolution'.
  50. This function takes query, key and value but uses only quert.
  51. This is just for compatibility with self-attention layer (attention.py)
  52. Args:
  53. query (torch.Tensor): (batch, time1, d_model) input tensor
  54. key (torch.Tensor): (batch, time2, d_model) NOT USED
  55. value (torch.Tensor): (batch, time2, d_model) NOT USED
  56. mask (torch.Tensor): (batch, time1, time2) mask
  57. Return:
  58. x (torch.Tensor): (batch, time1, d_model) output
  59. """
  60. # linear -> GLU -- -> lightconv -> linear
  61. # \ /
  62. # Linear
  63. x = query
  64. B, T, C = x.size()
  65. H = self.wshare
  66. k = self.kernel_size
  67. # first liner layer
  68. x = self.linear1(x)
  69. # GLU activation
  70. x = self.act(x)
  71. # get kernel of convolution
  72. weight = self.linear_weight(x) # B x T x kH
  73. weight = F.dropout(weight, self.dropout_rate, training=self.training)
  74. weight = weight.view(B, T, H, k).transpose(1, 2).contiguous() # B x H x T x k
  75. weight_new = torch.zeros(B * H * T * (T + k - 1), dtype=weight.dtype)
  76. weight_new = weight_new.view(B, H, T, T + k - 1).fill_(float("-inf"))
  77. weight_new = weight_new.to(x.device) # B x H x T x T+k-1
  78. weight_new.as_strided(
  79. (B, H, T, k), ((T + k - 1) * T * H, (T + k - 1) * T, T + k, 1)
  80. ).copy_(weight)
  81. weight_new = weight_new.narrow(-1, int((k - 1) / 2), T) # B x H x T x T(k)
  82. if self.use_kernel_mask:
  83. kernel_mask = torch.tril(torch.ones(T, T, device=x.device)).unsqueeze(0)
  84. weight_new = weight_new.masked_fill(kernel_mask == 0.0, float("-inf"))
  85. weight_new = F.softmax(weight_new, dim=-1)
  86. self.attn = weight_new
  87. weight_new = weight_new.view(B * H, T, T)
  88. # convolution
  89. x = x.transpose(1, 2).contiguous() # B x C x T
  90. x = x.view(B * H, int(C / H), T).transpose(1, 2)
  91. x = torch.bmm(weight_new, x) # BH x T x C/H
  92. x = x.transpose(1, 2).contiguous().view(B, C, T)
  93. if self.use_bias:
  94. x = x + self.bias.view(1, -1, 1)
  95. x = x.transpose(1, 2) # B x T x C
  96. if mask is not None and not self.use_kernel_mask:
  97. mask = mask.transpose(-1, -2)
  98. x = x.masked_fill(mask == 0, 0.0)
  99. # second linear layer
  100. x = self.linear2(x)
  101. return x