| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- # Copyright 3D-Speaker (https://github.com/alibaba-damo-academy/3D-Speaker). All Rights Reserved.
- # Licensed under the Apache License, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
- """ Res2Net implementation is adapted from https://github.com/wenet-e2e/wespeaker.
- ERes2Net incorporates both local and global feature fusion techniques to improve the performance.
- The local feature fusion (LFF) fuses the features within one single residual block to extract the local signal.
- The global feature fusion (GFF) takes acoustic features of different scales as input to aggregate global signal.
- ERes2Net-Large is an upgraded version of ERes2Net that uses a larger number of parameters to achieve better
- recognition performance. Parameters expansion, baseWidth, and scale can be modified to obtain optimal performance.
- """
- import torch
- import math
- import torch.nn as nn
- import torch.nn.functional as F
- import funasr.models.whisper_lid.eres2net.pooling_layers as pooling_layers
- from funasr.models.whisper_lid.eres2net.fusion import AFF
- class ReLU(nn.Hardtanh):
- def __init__(self, inplace=False):
- super(ReLU, self).__init__(0, 20, inplace)
- def __repr__(self):
- inplace_str = 'inplace' if self.inplace else ''
- return self.__class__.__name__ + ' (' \
- + inplace_str + ')'
- def conv1x1(in_planes, out_planes, stride=1):
- "1x1 convolution without padding"
- return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride,
- padding=0, bias=False)
- def conv3x3(in_planes, out_planes, stride=1):
- "3x3 convolution with padding"
- return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride,
- padding=1, bias=False)
- class BasicBlockERes2Net(nn.Module):
- expansion = 2
- def __init__(self, in_planes, planes, stride=1, baseWidth=32, scale=2):
- super(BasicBlockERes2Net, self).__init__()
- width = int(math.floor(planes * (baseWidth / 64.0)))
- self.conv1 = conv1x1(in_planes, width * scale, stride)
- self.bn1 = nn.BatchNorm2d(width * scale)
- self.nums = scale
- convs = []
- bns = []
- for i in range(self.nums):
- convs.append(conv3x3(width, width))
- bns.append(nn.BatchNorm2d(width))
- self.convs = nn.ModuleList(convs)
- self.bns = nn.ModuleList(bns)
- self.relu = ReLU(inplace=True)
- self.conv3 = conv1x1(width * scale, planes * self.expansion)
- self.bn3 = nn.BatchNorm2d(planes * self.expansion)
- self.shortcut = nn.Sequential()
- if stride != 1 or in_planes != self.expansion * planes:
- self.shortcut = nn.Sequential(
- nn.Conv2d(in_planes,
- self.expansion * planes,
- kernel_size=1,
- stride=stride,
- bias=False),
- nn.BatchNorm2d(self.expansion * planes))
- self.stride = stride
- self.width = width
- self.scale = scale
- def forward(self, x):
- residual = x
- out = self.conv1(x)
- out = self.bn1(out)
- out = self.relu(out)
- spx = torch.split(out, self.width, 1)
- for i in range(self.nums):
- if i == 0:
- sp = spx[i]
- else:
- sp = sp + spx[i]
- sp = self.convs[i](sp)
- sp = self.relu(self.bns[i](sp))
- if i == 0:
- out = sp
- else:
- out = torch.cat((out, sp), 1)
- out = self.conv3(out)
- out = self.bn3(out)
- residual = self.shortcut(x)
- out += residual
- out = self.relu(out)
- return out
- class BasicBlockERes2Net_diff_AFF(nn.Module):
- expansion = 2
- def __init__(self, in_planes, planes, stride=1, baseWidth=32, scale=2):
- super(BasicBlockERes2Net_diff_AFF, self).__init__()
- width = int(math.floor(planes * (baseWidth / 64.0)))
- self.conv1 = conv1x1(in_planes, width * scale, stride)
- self.bn1 = nn.BatchNorm2d(width * scale)
- self.nums = scale
- convs = []
- fuse_models = []
- bns = []
- for i in range(self.nums):
- convs.append(conv3x3(width, width))
- bns.append(nn.BatchNorm2d(width))
- for j in range(self.nums - 1):
- fuse_models.append(AFF(channels=width))
- self.convs = nn.ModuleList(convs)
- self.bns = nn.ModuleList(bns)
- self.fuse_models = nn.ModuleList(fuse_models)
- self.relu = ReLU(inplace=True)
- self.conv3 = conv1x1(width * scale, planes * self.expansion)
- self.bn3 = nn.BatchNorm2d(planes * self.expansion)
- self.shortcut = nn.Sequential()
- if stride != 1 or in_planes != self.expansion * planes:
- self.shortcut = nn.Sequential(
- nn.Conv2d(in_planes,
- self.expansion * planes,
- kernel_size=1,
- stride=stride,
- bias=False),
- nn.BatchNorm2d(self.expansion * planes))
- self.stride = stride
- self.width = width
- self.scale = scale
- def forward(self, x):
- residual = x
- out = self.conv1(x)
- out = self.bn1(out)
- out = self.relu(out)
- spx = torch.split(out, self.width, 1)
- for i in range(self.nums):
- if i == 0:
- sp = spx[i]
- else:
- sp = self.fuse_models[i - 1](sp, spx[i])
- sp = self.convs[i](sp)
- sp = self.relu(self.bns[i](sp))
- if i == 0:
- out = sp
- else:
- out = torch.cat((out, sp), 1)
- out = self.conv3(out)
- out = self.bn3(out)
- residual = self.shortcut(x)
- out += residual
- out = self.relu(out)
- return out
- class ERes2Net(nn.Module):
- def __init__(self,
- block=BasicBlockERes2Net,
- block_fuse=BasicBlockERes2Net_diff_AFF,
- num_blocks=[3, 4, 6, 3],
- m_channels=32,
- feat_dim=80,
- embedding_size=192,
- pooling_func='TSTP',
- two_emb_layer=False):
- super(ERes2Net, self).__init__()
- self.in_planes = m_channels
- self.feat_dim = feat_dim
- self.embedding_size = embedding_size
- self.stats_dim = int(feat_dim / 8) * m_channels * 8
- self.two_emb_layer = two_emb_layer
- self._output_size = embedding_size
- self.conv1 = nn.Conv2d(1,
- m_channels,
- kernel_size=3,
- stride=1,
- padding=1,
- bias=False)
- self.bn1 = nn.BatchNorm2d(m_channels)
- self.layer1 = self._make_layer(block,
- m_channels,
- num_blocks[0],
- stride=1)
- self.layer2 = self._make_layer(block,
- m_channels * 2,
- num_blocks[1],
- stride=2)
- self.layer3 = self._make_layer(block_fuse,
- m_channels * 4,
- num_blocks[2],
- stride=2)
- self.layer4 = self._make_layer(block_fuse,
- m_channels * 8,
- num_blocks[3],
- stride=2)
- # Downsampling module for each layer
- self.layer1_downsample = nn.Conv2d(m_channels * 2, m_channels * 4, kernel_size=3, stride=2, padding=1,
- bias=False)
- self.layer2_downsample = nn.Conv2d(m_channels * 4, m_channels * 8, kernel_size=3, padding=1, stride=2,
- bias=False)
- self.layer3_downsample = nn.Conv2d(m_channels * 8, m_channels * 16, kernel_size=3, padding=1, stride=2,
- bias=False)
- # Bottom-up fusion module
- self.fuse_mode12 = AFF(channels=m_channels * 4)
- self.fuse_mode123 = AFF(channels=m_channels * 8)
- self.fuse_mode1234 = AFF(channels=m_channels * 16)
- self.n_stats = 1 if pooling_func == 'TAP' or pooling_func == "TSDP" else 2
- self.pool = getattr(pooling_layers, pooling_func)(
- in_dim=self.stats_dim * block.expansion)
- self.seg_1 = nn.Linear(self.stats_dim * block.expansion * self.n_stats,
- embedding_size)
- if self.two_emb_layer:
- self.seg_bn_1 = nn.BatchNorm1d(embedding_size, affine=False)
- self.seg_2 = nn.Linear(embedding_size, embedding_size)
- else:
- self.seg_bn_1 = nn.Identity()
- self.seg_2 = nn.Identity()
- def _make_layer(self, block, planes, num_blocks, stride):
- strides = [stride] + [1] * (num_blocks - 1)
- layers = []
- for stride in strides:
- layers.append(block(self.in_planes, planes, stride))
- self.in_planes = planes * block.expansion
- return nn.Sequential(*layers)
- def output_size(self) -> int:
- return self._output_size
- def forward(self, x, ilens):
- # assert x.shape[1] == ilens.max()
- x = x.permute(0, 2, 1) # (B,T,F) => (B,F,T)
- x = x.unsqueeze_(1)
- out = F.relu(self.bn1(self.conv1(x)))
- out1 = self.layer1(out)
- out2 = self.layer2(out1)
- out1_downsample = self.layer1_downsample(out1)
- fuse_out12 = self.fuse_mode12(out2, out1_downsample)
- out3 = self.layer3(out2)
- fuse_out12_downsample = self.layer2_downsample(fuse_out12)
- fuse_out123 = self.fuse_mode123(out3, fuse_out12_downsample)
- out4 = self.layer4(out3)
- fuse_out123_downsample = self.layer3_downsample(fuse_out123)
- fuse_out1234 = self.fuse_mode1234(out4, fuse_out123_downsample)
- olens = (((((ilens - 1) // 2 + 1) - 1) // 2 + 1) - 1) // 2 + 1
- stats = self.pool(fuse_out1234, olens)
- embed_a = self.seg_1(stats)
- if self.two_emb_layer:
- out = F.relu(embed_a)
- out = self.seg_bn_1(out)
- embed_b = self.seg_2(out)
- return embed_b
- else:
- return embed_a
- class BasicBlockRes2Net(nn.Module):
- expansion = 2
- def __init__(self, in_planes, planes, stride=1, baseWidth=32, scale=2):
- super(BasicBlockRes2Net, self).__init__()
- width = int(math.floor(planes * (baseWidth / 64.0)))
- self.conv1 = conv1x1(in_planes, width * scale, stride)
- self.bn1 = nn.BatchNorm2d(width * scale)
- self.nums = scale - 1
- convs = []
- bns = []
- for i in range(self.nums):
- convs.append(conv3x3(width, width))
- bns.append(nn.BatchNorm2d(width))
- self.convs = nn.ModuleList(convs)
- self.bns = nn.ModuleList(bns)
- self.relu = ReLU(inplace=True)
- self.conv3 = conv1x1(width * scale, planes * self.expansion)
- self.bn3 = nn.BatchNorm2d(planes * self.expansion)
- self.shortcut = nn.Sequential()
- if stride != 1 or in_planes != self.expansion * planes:
- self.shortcut = nn.Sequential(
- nn.Conv2d(in_planes,
- self.expansion * planes,
- kernel_size=1,
- stride=stride,
- bias=False),
- nn.BatchNorm2d(self.expansion * planes))
- self.stride = stride
- self.width = width
- self.scale = scale
- def forward(self, x):
- residual = x
- out = self.conv1(x)
- out = self.bn1(out)
- out = self.relu(out)
- spx = torch.split(out, self.width, 1)
- for i in range(self.nums):
- if i == 0:
- sp = spx[i]
- else:
- sp = sp + spx[i]
- sp = self.convs[i](sp)
- sp = self.relu(self.bns[i](sp))
- if i == 0:
- out = sp
- else:
- out = torch.cat((out, sp), 1)
- out = torch.cat((out, spx[self.nums]), 1)
- out = self.conv3(out)
- out = self.bn3(out)
- residual = self.shortcut(x)
- out += residual
- out = self.relu(out)
- return out
- class Res2Net(nn.Module):
- def __init__(self,
- block=BasicBlockRes2Net,
- num_blocks=[3, 4, 6, 3],
- m_channels=32,
- feat_dim=80,
- embedding_size=192,
- pooling_func='TSTP',
- two_emb_layer=False):
- super(Res2Net, self).__init__()
- self.in_planes = m_channels
- self.feat_dim = feat_dim
- self.embedding_size = embedding_size
- self.stats_dim = int(feat_dim / 8) * m_channels * 8
- self.two_emb_layer = two_emb_layer
- self.conv1 = nn.Conv2d(1,
- m_channels,
- kernel_size=3,
- stride=1,
- padding=1,
- bias=False)
- self.bn1 = nn.BatchNorm2d(m_channels)
- self.layer1 = self._make_layer(block,
- m_channels,
- num_blocks[0],
- stride=1)
- self.layer2 = self._make_layer(block,
- m_channels * 2,
- num_blocks[1],
- stride=2)
- self.layer3 = self._make_layer(block,
- m_channels * 4,
- num_blocks[2],
- stride=2)
- self.layer4 = self._make_layer(block,
- m_channels * 8,
- num_blocks[3],
- stride=2)
- self.n_stats = 1 if pooling_func == 'TAP' or pooling_func == "TSDP" else 2
- self.pool = getattr(pooling_layers, pooling_func)(
- in_dim=self.stats_dim * block.expansion)
- self.seg_1 = nn.Linear(self.stats_dim * block.expansion * self.n_stats,
- embedding_size)
- if self.two_emb_layer:
- self.seg_bn_1 = nn.BatchNorm1d(embedding_size, affine=False)
- self.seg_2 = nn.Linear(embedding_size, embedding_size)
- else:
- self.seg_bn_1 = nn.Identity()
- self.seg_2 = nn.Identity()
- def _make_layer(self, block, planes, num_blocks, stride):
- strides = [stride] + [1] * (num_blocks - 1)
- layers = []
- for stride in strides:
- layers.append(block(self.in_planes, planes, stride))
- self.in_planes = planes * block.expansion
- return nn.Sequential(*layers)
- def forward(self, x):
- x = x.permute(0, 2, 1) # (B,T,F) => (B,F,T)
- x = x.unsqueeze_(1)
- out = F.relu(self.bn1(self.conv1(x)))
- out = self.layer1(out)
- out = self.layer2(out)
- out = self.layer3(out)
- out = self.layer4(out)
- stats = self.pool(out)
- embed_a = self.seg_1(stats)
- if self.two_emb_layer:
- out = F.relu(embed_a)
- out = self.seg_bn_1(out)
- embed_b = self.seg_2(out)
- return embed_b
- else:
- return embed_a
|