Passed
Branch master (bddbc4), created by Jeremy at 01:43

vit_model.VisionTransformer.__init__()   C

Complexity:   Conditions 9

Size:         Total Lines 56, Code Lines 41

Duplication:  Lines 0, Ratio 0 %

Importance:   Changes 0
Metric   Value
cc       9
eloc     41
nop      19
dl       0
loc      56
rs       6.5626
c        0
b        0
f        0

How to fix: Long Method, Many Parameters

Long Method

Small methods make your code easier to understand, especially when combined with a good name. And when a method is small, finding that good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign that the commented part should be extracted into a new method; the comment then gives you a starting point for naming it.

Commonly applied refactorings include Extract Method: move a cohesive chunk of the method body into a new, well-named method and call it from the original one, as in the sketch below.
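For instance, the flagged VisionTransformer.__init__() (listed in full further below) could delegate each cohesive chunk of its body to a private helper. This is only a minimal sketch under a simplified parameter list: the class name SlimVisionTransformer and the helpers _build_tokens, _build_encoder and _build_head are hypothetical names that do not appear in the original code, the forward pass and weight initialization are omitted, and PatchEmbed and Block are the modules from the listing below.

import torch
import torch.nn as nn

class SlimVisionTransformer(nn.Module):
    def __init__(self, img_size=224, patch_size=16, in_c=3, num_classes=1000,
                 embed_dim=768, depth=12, num_heads=12, drop_ratio=0.):
        super().__init__()
        self.patch_embed = PatchEmbed(img_size, patch_size, in_c, embed_dim)
        # __init__ now reads like a table of contents; each helper owns one concern
        self._build_tokens(embed_dim, self.patch_embed.num_patches, drop_ratio)
        self._build_encoder(embed_dim, depth, num_heads, drop_ratio)
        self._build_head(embed_dim, num_classes)

    def _build_tokens(self, embed_dim, num_patches, drop_ratio):
        # class token, positional embedding and embedding dropout
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + 1, embed_dim))
        self.pos_drop = nn.Dropout(drop_ratio)

    def _build_encoder(self, embed_dim, depth, num_heads, drop_ratio):
        # stack of transformer encoder blocks plus the final norm
        self.block = nn.Sequential(*[
            Block(dim=embed_dim, num_heads=num_heads, drop_ratio=drop_ratio)
            for _ in range(depth)
        ])
        self.norm = nn.LayerNorm(embed_dim, eps=1e-6)

    def _build_head(self, embed_dim, num_classes):
        # classification head
        self.head = nn.Linear(embed_dim, num_classes) if num_classes > 0 else nn.Identity()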

Many Parameters

Methods with many parameters are not only hard to understand; their parameters also tend to become inconsistent when you need more, or different, data.

There are several approaches to avoiding long parameter lists, such as Introduce Parameter Object (bundle related parameters into a single object) and Preserve Whole Object (pass an existing object instead of several of its attributes); a sketch of the first follows.
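A minimal Introduce Parameter Object sketch for the flagged constructor: the ViTConfig dataclass and the build_vit helper are hypothetical names introduced here for illustration, the field list covers only a subset of the real argument list, and VisionTransformer is the class from the listing below.

from dataclasses import dataclass
from typing import Optional

@dataclass
class ViTConfig:
    # hypothetical parameter object bundling the constructor arguments
    img_size: int = 224
    patch_size: int = 16
    in_c: int = 3
    num_classes: int = 1000
    embed_dim: int = 768
    depth: int = 12
    num_heads: int = 12
    mlp_ratio: float = 4.
    qkv_bias: bool = True
    drop_ratio: float = 0.
    attn_drop_ratio: float = 0.
    drop_path_ratio: float = 0.

def build_vit(cfg: Optional[ViTConfig] = None) -> "VisionTransformer":
    # one coherent value object instead of many loose positional/keyword arguments
    cfg = cfg or ViTConfig()
    return VisionTransformer(
        img_size=cfg.img_size, patch_size=cfg.patch_size, in_c=cfg.in_c,
        num_classes=cfg.num_classes, embed_dim=cfg.embed_dim, depth=cfg.depth,
        num_heads=cfg.num_heads, mlp_ratio=cfg.mlp_ratio, qkv_bias=cfg.qkv_bias,
        drop_ratio=cfg.drop_ratio, attn_drop_ratio=cfg.attn_drop_ratio,
        drop_path_ratio=cfg.drop_path_ratio,
    )

Callers would then write build_vit(ViTConfig(depth=24, embed_dim=1024)) instead of threading a dozen loose arguments through every call site.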

from collections import OrderedDict
from functools import partial

import torch
import torch.nn as nn
from timm.layers import DropPath


class PatchEmbed(nn.Module):
    def __init__(self, img_size=224, patch_size=16, in_c=3, embed_dim=768, norm_layer=None):
        # img_size: input image size; patch_size: side length of each patch; in_c: input channels;
        # embed_dim: embedding dimension; norm_layer: optional normalization layer
        super().__init__()
        img_size = (img_size, img_size)   # turn the image size into a 2-tuple
        patch_size = (patch_size, patch_size)
        self.img_size = img_size
        self.patch_size = patch_size
        self.grid_size = (img_size[0] // patch_size[0], img_size[1] // patch_size[1])  # 224/16, 224/16: the new "image" size measured in patches
        self.num_patches = self.grid_size[0] * self.grid_size[1]  # 14*14 = 196 patches in total

        self.proj = nn.Conv2d(in_c, embed_dim, kernel_size=patch_size, stride=patch_size)  # a 16x16 conv with stride 16 is equivalent to splitting the image into patches: B, 3, 224, 224 -> B, 768, 14, 14
        self.norm = norm_layer(embed_dim) if norm_layer else nn.Identity()  # use the norm layer if one is given, otherwise keep the input unchanged

    def forward(self, x):
        B, C, H, W = x.shape   # shape of the input tensor
        assert H == self.img_size[0] and W == self.img_size[1], \
            f"Input image size {H}*{W} does not match the expected size {self.img_size[0]}*{self.img_size[1]}"
        # B, 3, 224, 224 -> B, 768, 14, 14 -> B, 768, 196 -> B, 196, 768
        x = self.proj(x).flatten(2).transpose(1, 2)
        x = self.norm(x)  # apply the norm layer
        return x


class Attention(nn.Module):
    # dim: token dimension (768); num_heads: number of attention heads; qkv_bias: whether to add a bias when generating Q, K, V;
    # qk_scale: scaling factor for QK; if None, 1/sqrt(embed_dim_per_head) is used;
    # atte_drop_ration: dropout rate on the attention scores (guards against overfitting); proj_drop_ration: dropout rate of the final projection layer
    def __init__(self, dim, num_heads=8, qkv_bias=False, qk_scale=None, atte_drop_ration=0., proj_drop_ration=0.):
        super().__init__()
        self.num_heads = num_heads
        head_dim = dim // num_heads  # dimension of each attention head
        self.scale = qk_scale or head_dim ** -0.5  # scaling factor for qk
        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)  # generate Q, K and V with a single linear layer: better parallelism and fewer parameters
        self.attn_drop = nn.Dropout(atte_drop_ration)
        self.proj_drop = nn.Dropout(proj_drop_ration)
        # concatenate the per-head outputs, then map them back to the original embedding dim with a linear layer
        self.proj = nn.Linear(dim, dim, bias=qkv_bias)

    def forward(self, x):
        B, N, C = x.shape  # B: batch, N: num_patches + 1 (the +1 is the cls token), C: embed_dim
        # B N 3*C -> B N 3 num_heads C//num_heads -> 3 B num_heads N C//num_heads, which is convenient for the operations below
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        # slice out Q, K and V, each of shape B num_heads N C//num_heads
        q, k, v = qkv[0], qkv[1], qkv[2]
        # scaled dot product of q and k gives the attention scores
        # q: [B num_heads N C//num_heads], k.transpose(-2, -1): [B num_heads C//num_heads N]
        attn = (q @ k.transpose(-2, -1)) * self.scale  # B num_heads N N
        attn = attn.softmax(dim=-1)  # softmax over each row so that every row sums to 1
        attn = self.attn_drop(attn)  # dropout on the attention scores
        # weighted sum of V with the attention weights
        # attn @ v : B num_heads N C//num_heads
        # transpose: B N num_heads C//num_heads
        # reshape merges the last two dimensions, combining the heads back into the full embedding dim
        x = (attn @ v).transpose(1, 2).reshape(B, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)

        return x


class Mlp(nn.Module):
    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.):
        # in_features: input dimension; hidden_features: hidden dimension, usually 4x in_features;
        # out_features: output dimension, usually equal to the input dimension
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = act_layer()  # act_layer is passed as a class and instantiated here
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        return x


class Block(nn.Module):
    # mlp_ratio: factor for hidden_features, 4x the input dim by default; norm_layer: normalization layer;
    # drop_path_ratio: drop-path rate applied before the residual addition; drop_ratio: dropout used after the final linear layer of the multi-head self-attention

    def __init__(self, dim, num_heads, mlp_ratio=4., qkv_bias=False, qk_scale=None, drop_ratio=0.,
                 attn_drop_ratio=0., drop_path_ratio=0., act_layer=nn.GELU, norm_layer=nn.LayerNorm):
        super(Block, self).__init__()
        self.norm1 = norm_layer(dim)  # first layer norm in the transformer encoder block
        # instantiate the multi-head self-attention
        self.attn = Attention(dim, num_heads=num_heads, qkv_bias=qkv_bias, qk_scale=qk_scale,
                              atte_drop_ration=attn_drop_ratio, proj_drop_ration=drop_ratio)
        self.drop_path = DropPath(drop_path_ratio) if drop_path_ratio > 0 else nn.Identity()
        self.norm2 = norm_layer(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)  # number of nodes in the first fully connected layer of the MLP
        self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop_ratio)

    def forward(self, x):
        x = x + self.drop_path(self.attn(self.norm1(x)))
        x = x + self.drop_path(self.mlp(self.norm2(x)))
        return x


class VisionTransformer(nn.Module):
    def __init__(self, img_size=224, patch_size=16, in_c=3, num_classes=1000,
                 embed_dim=768, depth=12, num_heads=12, mlp_ratio=4., qkv_bias=True, qk_scale=None,
                 representation_size=None, distilled=False, drop_ratio=0.,
                 attn_drop_ratio=0., drop_path_ratio=0., embed_layer=PatchEmbed, norm_layer=None,
                 act_layer=None):
        super(VisionTransformer, self).__init__()
        self.num_classes = num_classes
        self.num_features = self.embed_dim = embed_dim
        self.num_tokens = 2 if distilled else 1
        # a small eps in LayerNorm avoids division by zero
        norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6)
        act_layer = act_layer or nn.GELU  # passed on as a class and instantiated inside Mlp
        self.patch_embed = embed_layer(img_size, patch_size, in_c, embed_dim, norm_layer)
        num_patches = self.patch_embed.num_patches  # number of patches
        # trainable parameters built with nn.Parameter and initialized with zeros; the first dim is the batch dim, the rest is 1*768
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        self.dist_token = nn.Parameter(torch.zeros(1, 1, embed_dim)) if distilled else None
        # pos_embed has the same size as the concatenated sequence, i.e. 197*768
        self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + self.num_tokens, embed_dim))
        self.pos_drop = nn.Dropout(drop_ratio)
        # build an evenly spaced sequence of depth values from 0 to drop_path_ratio
        dpr = [x.item() for x in torch.linspace(0, drop_path_ratio, depth)]
        # pack all blocks into one nn.Sequential; depth is the number of transformer encoder blocks
        self.block = nn.Sequential(*[
            Block(dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, qk_scale=qk_scale,
                  drop_ratio=drop_ratio, attn_drop_ratio=attn_drop_ratio, drop_path_ratio=dpr[i],
                  norm_layer=norm_layer, act_layer=act_layer)
            for i in range(depth)
        ])
        self.norm = norm_layer(embed_dim)  # layer norm after the transformer blocks
        '''
            The logits are the raw output of the model's last layer (usually a fully connected layer, not yet normalized);
            they typically go through an activation function to become probabilities.
            representation_size is the desired size of the pre-logits representation; small-scale ViT models do not need it.
        '''
        if representation_size and not distilled:
            self.has_logits = True
            self.num_features = representation_size
            self.pre_logits = nn.Sequential(OrderedDict([
                ("fc", nn.Linear(embed_dim, representation_size)),
                ("act", nn.Tanh())
            ]))
        else:
            self.has_logits = False
            self.pre_logits = nn.Identity()  # no extra processing
        # classification head
        self.head = nn.Linear(self.num_features, num_classes) if num_classes > 0 else nn.Identity()
        self.head_dist = None
        if distilled:
            self.head_dist = nn.Linear(self.embed_dim, self.num_classes) if num_classes > 0 else nn.Identity()
        # weight initialization
        nn.init.trunc_normal_(self.pos_embed, std=0.02)
        if self.dist_token is not None:
            nn.init.trunc_normal_(self.dist_token, std=0.02)

        nn.init.trunc_normal_(self.cls_token, std=0.02)
        self.apply(_init_vit_weights)

    def forward_features(self, x):  # backbone forward: patch embedding, encoder blocks and the final norm
        # B C H W -> B num_patches embed_dim  (196 * 768)
        x = self.patch_embed(x)
        # 1, 1, 768 -> B, 1, 768
        cls_token = self.cls_token.expand(x.shape[0], -1, -1)
        # if dist_token exists, concatenate cls_token, dist_token and the patch features x; otherwise only cls_token and x
        if self.dist_token is None:
            x = torch.cat((cls_token, x), dim=1)  # B 197 768
        else:
            x = torch.cat((cls_token, self.dist_token.expand(x.shape[0], -1, -1), x), dim=1)

        x = self.pos_drop(x + self.pos_embed)
        x = self.block(x)
        x = self.norm(x)
        if self.dist_token is None:
            return self.pre_logits(x[:, 0])  # dist_token is None: slice out the output of the cls token
        else:
            return x[:, 0], x[:, 1:]

    def forward(self, x):
        x = self.forward_features(x)
        if self.head_dist is not None:
            # knowledge-distillation branch: one prediction per token head
            x, x_dist = self.head(x[0]), self.head_dist(x[1])
            # in training mode (and not running under TorchScript)
            if self.training and not torch.jit.is_scripting():
                # return the predictions of both heads
                return x, x_dist
        else:
            x = self.head(x)  # final fully connected classification layer
        return x


def _init_vit_weights(m):
    # linear layers: truncated-normal weights, zero bias
    if isinstance(m, nn.Linear):
        nn.init.trunc_normal_(m.weight, std=0.01)
        if m.bias is not None:  # zero the bias if the linear layer has one
            nn.init.zeros_(m.bias)
    elif isinstance(m, nn.Conv2d):
        nn.init.kaiming_normal_(m.weight, mode='fan_out')  # Kaiming initialization, well suited to conv layers
        if m.bias is not None:
            nn.init.zeros_(m.bias)
    elif isinstance(m, nn.LayerNorm):
        nn.init.zeros_(m.bias)
        nn.init.ones_(m.weight)  # layer norm: zero bias, unit weight


def vit_base_patch16_224(num_classes: int = 1000, pretrained=False):
    # pretrained is accepted for API compatibility, but no pretrained weights are loaded here
    model = VisionTransformer(img_size=224, patch_size=16, embed_dim=768, depth=12, num_heads=12,
                              representation_size=None, num_classes=num_classes)
    return model
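For reference, a quick sanity check of the factory above; the shapes assume the 224x224, patch-16 defaults:

if __name__ == "__main__":
    model = vit_base_patch16_224(num_classes=1000)
    dummy = torch.randn(2, 3, 224, 224)   # a batch of two 3-channel 224x224 images
    logits = model(dummy)                 # B, 3, 224, 224 -> B, 1000
    print(logits.shape)                   # expected: torch.Size([2, 1000])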