@@ -0,0 +1,420 @@
+# Copyright 3D-Speaker (https://github.com/alibaba-damo-academy/3D-Speaker). All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
+
+""" Res2Net implementation is adapted from https://github.com/wenet-e2e/wespeaker.
|
|
|
+ ERes2Net incorporates both local and global feature fusion techniques to improve the performance.
|
|
|
+ The local feature fusion (LFF) fuses the features within one single residual block to extract the local signal.
|
|
|
+ The global feature fusion (GFF) takes acoustic features of different scales as input to aggregate global signal.
|
|
|
+ ERes2Net-Large is an upgraded version of ERes2Net that uses a larger number of parameters to achieve better
|
|
|
+ recognition performance. Parameters expansion, baseWidth, and scale can be modified to obtain optimal performance.
|
|
|
+"""
|
|
|
+
+import math
+
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+
+import funasr.models.pooling.pooling_layers as pooling_layers
+from funasr.modules.cnn.fusion import AFF
+
+
+class ReLU(nn.Hardtanh):
+    """ReLU clipped at 20, i.e. min(max(x, 0), 20), implemented via Hardtanh."""
+
+    def __init__(self, inplace=False):
+        super(ReLU, self).__init__(0, 20, inplace)
+
+    def __repr__(self):
+        inplace_str = 'inplace' if self.inplace else ''
+        return self.__class__.__name__ + ' (' \
+            + inplace_str + ')'
+
+
+def conv1x1(in_planes, out_planes, stride=1):
+    "1x1 convolution without padding"
+    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride,
+                     padding=0, bias=False)
+
+
+def conv3x3(in_planes, out_planes, stride=1):
+    "3x3 convolution with padding"
+    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
+                     padding=1, bias=False)
+
+
+class BasicBlockERes2Net(nn.Module):
+    expansion = 2
+
+    def __init__(self, in_planes, planes, stride=1, baseWidth=32, scale=2):
+        super(BasicBlockERes2Net, self).__init__()
+        width = int(math.floor(planes * (baseWidth / 64.0)))
+        self.conv1 = conv1x1(in_planes, width * scale, stride)
+        self.bn1 = nn.BatchNorm2d(width * scale)
+        # Unlike the plain Res2Net block below, all `scale` splits are convolved.
+        self.nums = scale
+
+        convs = []
+        bns = []
+        for i in range(self.nums):
+            convs.append(conv3x3(width, width))
+            bns.append(nn.BatchNorm2d(width))
+        self.convs = nn.ModuleList(convs)
+        self.bns = nn.ModuleList(bns)
+        self.relu = ReLU(inplace=True)
+
+        self.conv3 = conv1x1(width * scale, planes * self.expansion)
+        self.bn3 = nn.BatchNorm2d(planes * self.expansion)
+        self.shortcut = nn.Sequential()
+        if stride != 1 or in_planes != self.expansion * planes:
+            self.shortcut = nn.Sequential(
+                nn.Conv2d(in_planes,
+                          self.expansion * planes,
+                          kernel_size=1,
+                          stride=stride,
+                          bias=False),
+                nn.BatchNorm2d(self.expansion * planes))
+        self.stride = stride
+        self.width = width
+        self.scale = scale
+
+    def forward(self, x):
+        residual = x
+
+        out = self.conv1(x)
+        out = self.bn1(out)
+        out = self.relu(out)
+        # Split channels into `scale` groups and process them hierarchically:
+        # each group is summed with the previous group's output before its conv.
+        spx = torch.split(out, self.width, 1)
+        for i in range(self.nums):
+            if i == 0:
+                sp = spx[i]
+            else:
+                sp = sp + spx[i]
+            sp = self.convs[i](sp)
+            sp = self.relu(self.bns[i](sp))
+            if i == 0:
+                out = sp
+            else:
+                out = torch.cat((out, sp), 1)
+
+        out = self.conv3(out)
+        out = self.bn3(out)
+
+        residual = self.shortcut(x)
+        out += residual
+        out = self.relu(out)
+
+        return out
+
+
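+# Hedged shape sketch for BasicBlockERes2Net (illustration only): with the
+# defaults baseWidth=32 and scale=2, the block maps (B, in_planes, F, T) to
+# (B, planes * 2, F // stride, T // stride); e.g. BasicBlockERes2Net(32, 32)
+# maps (2, 32, 80, 200) -> (2, 64, 80, 200).
+
+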
+class BasicBlockERes2Net_diff_AFF(nn.Module):
+    expansion = 2
+
+    def __init__(self, in_planes, planes, stride=1, baseWidth=32, scale=2):
+        super(BasicBlockERes2Net_diff_AFF, self).__init__()
+        width = int(math.floor(planes * (baseWidth / 64.0)))
+        self.conv1 = conv1x1(in_planes, width * scale, stride)
+        self.bn1 = nn.BatchNorm2d(width * scale)
+        self.nums = scale
+
+        convs = []
+        fuse_models = []
+        bns = []
+        for i in range(self.nums):
+            convs.append(conv3x3(width, width))
+            bns.append(nn.BatchNorm2d(width))
+        for j in range(self.nums - 1):
+            fuse_models.append(AFF(channels=width))
+
+        self.convs = nn.ModuleList(convs)
+        self.bns = nn.ModuleList(bns)
+        self.fuse_models = nn.ModuleList(fuse_models)
+        self.relu = ReLU(inplace=True)
+
+        self.conv3 = conv1x1(width * scale, planes * self.expansion)
+        self.bn3 = nn.BatchNorm2d(planes * self.expansion)
+        self.shortcut = nn.Sequential()
+        if stride != 1 or in_planes != self.expansion * planes:
+            self.shortcut = nn.Sequential(
+                nn.Conv2d(in_planes,
+                          self.expansion * planes,
+                          kernel_size=1,
+                          stride=stride,
+                          bias=False),
+                nn.BatchNorm2d(self.expansion * planes))
+        self.stride = stride
+        self.width = width
+        self.scale = scale
+
+    def forward(self, x):
+        residual = x
+
+        out = self.conv1(x)
+        out = self.bn1(out)
+        out = self.relu(out)
+        spx = torch.split(out, self.width, 1)
+        for i in range(self.nums):
+            if i == 0:
+                sp = spx[i]
+            else:
+                # Local feature fusion: adjacent splits are merged by an
+                # attentional AFF module instead of plain addition.
+                sp = self.fuse_models[i - 1](sp, spx[i])
+
+            sp = self.convs[i](sp)
+            sp = self.relu(self.bns[i](sp))
+            if i == 0:
+                out = sp
+            else:
+                out = torch.cat((out, sp), 1)
+
+        out = self.conv3(out)
+        out = self.bn3(out)
+
+        residual = self.shortcut(x)
+        out += residual
+        out = self.relu(out)
+
+        return out
+
+
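+# Note: from its usage above and in ERes2Net below, AFF is assumed to be a
+# two-input attentional feature fusion module where AFF(channels=c)(a, b)
+# returns a tensor with the same shape as its inputs (both with c channels).
+
+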
+class ERes2Net(nn.Module):
+    def __init__(self,
+                 block=BasicBlockERes2Net,
+                 block_fuse=BasicBlockERes2Net_diff_AFF,
+                 num_blocks=[3, 4, 6, 3],
+                 m_channels=32,
+                 feat_dim=80,
+                 embedding_size=192,
+                 pooling_func='TSTP',
+                 two_emb_layer=False):
+        super(ERes2Net, self).__init__()
+        self.in_planes = m_channels
+        self.feat_dim = feat_dim
+        self.embedding_size = embedding_size
+        # The frequency axis is reduced 8x by the three stride-2 stages below.
+        self.stats_dim = int(feat_dim / 8) * m_channels * 8
+        self.two_emb_layer = two_emb_layer
+
+        self.conv1 = nn.Conv2d(1,
+                               m_channels,
+                               kernel_size=3,
+                               stride=1,
+                               padding=1,
+                               bias=False)
+        self.bn1 = nn.BatchNorm2d(m_channels)
+        self.layer1 = self._make_layer(block,
+                                       m_channels,
+                                       num_blocks[0],
+                                       stride=1)
+        self.layer2 = self._make_layer(block,
+                                       m_channels * 2,
+                                       num_blocks[1],
+                                       stride=2)
+        self.layer3 = self._make_layer(block_fuse,
+                                       m_channels * 4,
+                                       num_blocks[2],
+                                       stride=2)
+        self.layer4 = self._make_layer(block_fuse,
+                                       m_channels * 8,
+                                       num_blocks[3],
+                                       stride=2)
+
+        # Downsampling module for each layer
+        self.layer1_downsample = nn.Conv2d(m_channels * 2, m_channels * 4, kernel_size=3, stride=2, padding=1,
+                                           bias=False)
+        self.layer2_downsample = nn.Conv2d(m_channels * 4, m_channels * 8, kernel_size=3, stride=2, padding=1,
+                                           bias=False)
+        self.layer3_downsample = nn.Conv2d(m_channels * 8, m_channels * 16, kernel_size=3, stride=2, padding=1,
+                                           bias=False)
+
+        # Bottom-up fusion module
+        self.fuse_mode12 = AFF(channels=m_channels * 4)
+        self.fuse_mode123 = AFF(channels=m_channels * 8)
+        self.fuse_mode1234 = AFF(channels=m_channels * 16)
+
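+        # Hedged shape sketch with the defaults (m_channels=32, feat_dim=80), input (B, 1, 80, T):
+        #   out1: (B,  64, 80, T)      out2: (B, 128, 40, T/2)
+        #   out3: (B, 256, 20, T/4)    out4: (B, 512, 10, T/8)
+        # fuse_out1234 is (B, 512, 10, T/8); 512 * 10 = stats_dim * block.expansion = 5120,
+        # the per-frame input dimension of the pooling layer below.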
+        self.n_stats = 1 if pooling_func == 'TAP' or pooling_func == "TSDP" else 2
+        self.pool = getattr(pooling_layers, pooling_func)(
+            in_dim=self.stats_dim * block.expansion)
+        self.seg_1 = nn.Linear(self.stats_dim * block.expansion * self.n_stats,
+                               embedding_size)
+        if self.two_emb_layer:
+            self.seg_bn_1 = nn.BatchNorm1d(embedding_size, affine=False)
+            self.seg_2 = nn.Linear(embedding_size, embedding_size)
+        else:
+            self.seg_bn_1 = nn.Identity()
+            self.seg_2 = nn.Identity()
+
+    def _make_layer(self, block, planes, num_blocks, stride):
+        strides = [stride] + [1] * (num_blocks - 1)
+        layers = []
+        for stride in strides:
+            layers.append(block(self.in_planes, planes, stride))
+            self.in_planes = planes * block.expansion
+        return nn.Sequential(*layers)
+
+    def forward(self, x):
+        x = x.permute(0, 2, 1)  # (B,T,F) => (B,F,T)
+        x = x.unsqueeze(1)
+        out = F.relu(self.bn1(self.conv1(x)))
+        out1 = self.layer1(out)
+        out2 = self.layer2(out1)
+        # Global feature fusion: each stage's features are downsampled and
+        # fused bottom-up with the next stage's output via AFF.
+        out1_downsample = self.layer1_downsample(out1)
+        fuse_out12 = self.fuse_mode12(out2, out1_downsample)
+        out3 = self.layer3(out2)
+        fuse_out12_downsample = self.layer2_downsample(fuse_out12)
+        fuse_out123 = self.fuse_mode123(out3, fuse_out12_downsample)
+        out4 = self.layer4(out3)
+        fuse_out123_downsample = self.layer3_downsample(fuse_out123)
+        fuse_out1234 = self.fuse_mode1234(out4, fuse_out123_downsample)
+        stats = self.pool(fuse_out1234)
+
+        embed_a = self.seg_1(stats)
+        if self.two_emb_layer:
+            out = F.relu(embed_a)
+            out = self.seg_bn_1(out)
+            embed_b = self.seg_2(out)
+            return embed_b
+        else:
+            return embed_a
+
+
+class BasicBlockRes2Net(nn.Module):
+    expansion = 2
+
+    def __init__(self, in_planes, planes, stride=1, baseWidth=32, scale=2):
+        super(BasicBlockRes2Net, self).__init__()
+        width = int(math.floor(planes * (baseWidth / 64.0)))
+        self.conv1 = conv1x1(in_planes, width * scale, stride)
+        self.bn1 = nn.BatchNorm2d(width * scale)
+        # Standard Res2Net: scale - 1 splits are convolved; the last split is
+        # passed through unchanged in forward().
+        self.nums = scale - 1
+        convs = []
+        bns = []
+        for i in range(self.nums):
+            convs.append(conv3x3(width, width))
+            bns.append(nn.BatchNorm2d(width))
+        self.convs = nn.ModuleList(convs)
+        self.bns = nn.ModuleList(bns)
+        self.relu = ReLU(inplace=True)
+
+        self.conv3 = conv1x1(width * scale, planes * self.expansion)
+        self.bn3 = nn.BatchNorm2d(planes * self.expansion)
+        self.shortcut = nn.Sequential()
+        if stride != 1 or in_planes != self.expansion * planes:
+            self.shortcut = nn.Sequential(
+                nn.Conv2d(in_planes,
+                          self.expansion * planes,
+                          kernel_size=1,
+                          stride=stride,
+                          bias=False),
+                nn.BatchNorm2d(self.expansion * planes))
+        self.stride = stride
+        self.width = width
+        self.scale = scale
+
+    def forward(self, x):
+        residual = x
+
+        out = self.conv1(x)
+        out = self.bn1(out)
+        out = self.relu(out)
+        spx = torch.split(out, self.width, 1)
+        for i in range(self.nums):
+            if i == 0:
+                sp = spx[i]
+            else:
+                sp = sp + spx[i]
+            sp = self.convs[i](sp)
+            sp = self.relu(self.bns[i](sp))
+            if i == 0:
+                out = sp
+            else:
+                out = torch.cat((out, sp), 1)
+
+        # Re-attach the last, unprocessed split.
+        out = torch.cat((out, spx[self.nums]), 1)
+
+        out = self.conv3(out)
+        out = self.bn3(out)
+
+        residual = self.shortcut(x)
+        out += residual
+        out = self.relu(out)
+
+        return out
+
+
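+# Plain Res2Net baseline: same stem, stages, and pooling as ERes2Net above, but
+# without the AFF-based local fusion or the bottom-up global fusion pathway.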
+class Res2Net(nn.Module):
+    def __init__(self,
+                 block=BasicBlockRes2Net,
+                 num_blocks=[3, 4, 6, 3],
+                 m_channels=32,
+                 feat_dim=80,
+                 embedding_size=192,
+                 pooling_func='TSTP',
+                 two_emb_layer=False):
+        super(Res2Net, self).__init__()
+        self.in_planes = m_channels
+        self.feat_dim = feat_dim
+        self.embedding_size = embedding_size
+        self.stats_dim = int(feat_dim / 8) * m_channels * 8
+        self.two_emb_layer = two_emb_layer
+
+        self.conv1 = nn.Conv2d(1,
+                               m_channels,
+                               kernel_size=3,
+                               stride=1,
+                               padding=1,
+                               bias=False)
+        self.bn1 = nn.BatchNorm2d(m_channels)
+        self.layer1 = self._make_layer(block,
+                                       m_channels,
+                                       num_blocks[0],
+                                       stride=1)
+        self.layer2 = self._make_layer(block,
+                                       m_channels * 2,
+                                       num_blocks[1],
+                                       stride=2)
+        self.layer3 = self._make_layer(block,
+                                       m_channels * 4,
+                                       num_blocks[2],
+                                       stride=2)
+        self.layer4 = self._make_layer(block,
+                                       m_channels * 8,
+                                       num_blocks[3],
+                                       stride=2)
+
+        self.n_stats = 1 if pooling_func == 'TAP' or pooling_func == "TSDP" else 2
+        self.pool = getattr(pooling_layers, pooling_func)(
+            in_dim=self.stats_dim * block.expansion)
+        self.seg_1 = nn.Linear(self.stats_dim * block.expansion * self.n_stats,
+                               embedding_size)
+        if self.two_emb_layer:
+            self.seg_bn_1 = nn.BatchNorm1d(embedding_size, affine=False)
+            self.seg_2 = nn.Linear(embedding_size, embedding_size)
+        else:
+            self.seg_bn_1 = nn.Identity()
+            self.seg_2 = nn.Identity()
+
+    def _make_layer(self, block, planes, num_blocks, stride):
+        strides = [stride] + [1] * (num_blocks - 1)
+        layers = []
+        for stride in strides:
+            layers.append(block(self.in_planes, planes, stride))
+            self.in_planes = planes * block.expansion
+        return nn.Sequential(*layers)
+
+    def forward(self, x):
+        x = x.permute(0, 2, 1)  # (B,T,F) => (B,F,T)
+
+        x = x.unsqueeze(1)
+        out = F.relu(self.bn1(self.conv1(x)))
+        out = self.layer1(out)
+        out = self.layer2(out)
+        out = self.layer3(out)
+        out = self.layer4(out)
+
+        stats = self.pool(out)
+
+        embed_a = self.seg_1(stats)
+        if self.two_emb_layer:
+            out = F.relu(embed_a)
+            out = self.seg_bn_1(out)
+            embed_b = self.seg_2(out)
+            return embed_b
+        else:
+            return embed_a
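+
+
+if __name__ == '__main__':
+    # Hedged smoke test (illustration only, not part of the original recipe),
+    # assuming funasr's pooling_layers and AFF modules are importable.
+    feats = torch.randn(2, 200, 80)  # (B, T, F) fbank-like features
+    for model in (ERes2Net(), Res2Net()):
+        model.eval()
+        with torch.no_grad():
+            print(model(feats).shape)  # expected: torch.Size([2, 192])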