深度学习模型之CNN(十四)使用pytorch搭建MobileNetV2V3并基于迁移学习训练

MobileNet v2v3网络搭建

工程目录

1
2
3
4
5
6
7
8
9
10
├── Test6_mobilenet
├── model_v2.py(MobileNet v2模型文件)
├── model_v3.py(MobileNet v3模型文件)
├── train.py(调用模型训练,自动生成class_indices.json,MobileNetV2/V3.pth文件)
├── predict.py(调用模型进行预测)
├── tulip.jpg(用来根据前期的训练结果来predict图片类型)
├── mobilenet_v2.pth(用于迁移学习时,提前下载好官方的MobileNet v2权重脚本)
└── mobilenet_v3_large.pth(用于迁移学习时,提前下载好官方的mobilenet_v3_large权重脚本)
└── data_set
└── data数据集

model_v2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from torch import nn
import torch


def _make_divisible(ch, divisor=8, min_ch=None):
# ......


class ConvBNReLU(nn.Sequential):
def __init__(self, in_channel, out_channel, kernel_size=3, stride=1, groups=1):
# ......


class InvertedResidual(nn.Module):
def __init__(self, in_channel, out_channel, stride, expand_ratio):
# ......

def forward(self, x):
# ......


class MobileNetV2(nn.Module):
def __init__(self, num_classes=1000, alpha=1.0, round_nearest=8):
# ......

def forward(self, x):
# ......

网络结构参数

先通过普通卷积,之后进行一个Inverted residual结构(n:倒残差结构重复几遍),一直重复倒残差结构,再通过1x1的普通卷积操作,紧接着平均池化下采样,最后是通过1x1卷积得到最终输出(作用同全连接层)

搭建该网络重点在于搭建好Inverted residual结构

Mobilenetv2网络结构参数

ConvBNReLU类

ConvBNReLU类是包含conv、BN和ReLU6激活函数的组合层。在MNv2网络中,所有卷积层以及上节课所讲的DW卷积操作,基本上都是由卷积+BN+ReLU6激活函数组成的。

唯一不同的是在Inverted residual结构中的第三层,也就是使用1x1的卷积对特征矩阵进行降维处理,这里使用线性激活函数,其他地方就都是通过卷积+BN+ReLU6激活函数组成。

因此创建ConvBNReLU类,继承来自nn.Sequential父类(nn.Sequential不需要写forward函数,此处参照pytorch的官方实现)

1
2
3
4
5
6
7
8
class ConvBNReLU(nn.Sequential):
def __init__(self, in_channel, out_channel, kernel_size=3, stride=1, groups=1):
padding = (kernel_size - 1) // 2
super(ConvBNReLU, self).__init__(
nn.Conv2d(in_channel, out_channel, kernel_size, stride, padding, groups=groups, bias=False),
nn.BatchNorm2d(out_channel),
nn.ReLU6(inplace=True)
)

Inverted residual结构

类似于ResNet中的残差结构,只不过普通的ResNet的残差结构是两头粗中间细的结构,但倒残差结构是两头细中间粗的结构。

Inverted residual结构参数

第一层:普通卷积层,卷积核大小为1x1,激活函数是ReLU6,采用卷积核个数为tk(t是倍率因子,用来扩大特征矩阵深度);

第二层:DW卷积,卷积核大小为3x3,步距stride = s(传入参数),激活函数是ReLU6,卷积核个数 = 输入特征矩阵深度 = 输出特征矩阵深度,缩减高宽;

第三层:普通1x1卷积层,采用线性激活函数,卷积核个数为k‘(人为设定)

__init__函数

  • 初始化函数传入参数expand_ratio:拓展因子,即上图表中的t
  • 定义隐层hidden_channel:第一层卷积核的个数,即上图表中的tk
  • self.use_shortcut:是一个bool变量,判断在正向传播过程中是否采用shortcut(只有当stride = 1且输入特征矩阵的shape与输出特征矩阵的shape保持一致市才能使用sortcut)
  • 定义层列表layers:判断expand_ratio是否等于1(在v2网络结构参数表中的第一个bottleneck的第一层中,n = 1,s = 1,因此特征矩阵的shape都未发生变化,因此在搭建过程中进行跳过),当expand_ratio = 1时,跳过该处1x1卷积层。

假设此处expand_ratio!= 1,那么就需要在layers列表中添加一个卷积层,调用ConvBNReLU层结构(输入特征矩阵深度,hidden_channel,由上图(右表)得知第一层卷积核大小为1x1)。

  • 调用layers.extend函数:添加一系列层结构,实际上通过extend和append函数添加层结构是一样的,只不过extend函数能够实现一次性批量插入多个元素。

对于Inverted residual结构,第二层是DW卷积,因此此处调用定义好的ConvBNReLU层结构,输入特征矩阵深度为上一层的输出特征矩阵深度hidden_channel,输出深度同样为hidden_channel(因为DW卷积层的输入特征矩阵深度和输出特征矩阵深度是一样的)

stride是传入的参数,group默认为1时是普通卷积,当传入与输入特征矩阵深度相同的个数的话,就是DW卷积。而此处输入特征矩阵的深度是hidden_channel,因此此处groups=hidden_channel。

第三层是1x1的普通卷积,所采用的的激活函数是线性激活函数(y = x),不能使用定义好的ConvBNReLU,因此使用原始的Conv2d,输入特征矩阵深度为上一层输出特征矩阵深度,输出特征矩阵深度为传入参数out_channel,卷积核大小为1

最后使用Batch Normalization标准化处理。

注意:因为第三层使用线性激活函数y = x,等于不对输入做任何处理,因此不需要再额外定义一个激活层函数,因为不添加激活层函数,就等于linear激活函数

  • 最后通过nn.Sequential类将*layers参数传入进去(*用于将层列表解包为位置参数,这使我们能够像船体单个参数一样将层列表传递给Sequential模块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class InvertedResidual(nn.Module):
def __init__(self, in_channel, out_channel, stride, expand_ratio):
super(InvertedResidual, self).__init__()
hidden_channel = in_channel * expand_ratio
self.use_shortcut = stride == 1 and in_channel == out_channel

layers = []
if expand_ratio != 1:
# 1x1 pointwise conv
layers.append(ConvBNReLU(in_channel, hidden_channel, kernel_size=1))
layers.extend([
# 3x3 depthwise conv
ConvBNReLU(hidden_channel, hidden_channel, stride=stride, groups=hidden_channel),
# 1x1 pointwise conv(linear)
nn.Conv2d(hidden_channel, out_channel, kernel_size=1, bias=False),
nn.BatchNorm2d(out_channel),
])

self.conv = nn.Sequential(*layers)

forward函数

x为输入特征矩阵

首先进行判断,是否使用shortcut,如果使用即返回特征矩阵与shortcut相加之后的输出特征矩阵;如果不使用shortcut,则直接返回主分支上的输出特征矩阵。

1
2
3
4
5
def forward(self, x):
if self.use_shortcut:
return x + self.conv(x)
else:
return self.conv(x)

_make_divisible函数

是为了将卷积核个数(输出的通道个数)调整为输入round_nearest参数的整数倍。搭建中采用round_nearest=8,也就是要将卷积核的个数设置为8的整数倍。

目的:为了更好的调用硬件设备,比如多GPU变形运算,或者多机器分布式运算

  • ch:传入的卷积核个数(输出特征矩阵的channel)
  • divisor:传入round_nearest基数,即将卷积核个数ch调整为divisor的整数倍
  • min_ch:最小通道数,如果为None,就将min_ch设置为divisor
  • new_ch:即将卷积核个数调整为离它最近的8的倍数的值
  • 之后进行判断new_ch是否小于传入ch的0.9倍,如果小于,则加上一个divisor(为了确保new_ch向下取整的时候,不会减少超过10%)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _make_divisible(ch, divisor=8, min_ch=None):
"""
This function is taken from the original tf repo.
It ensures that all layers have a channel number that is divisible by 8
It can be seen here:
https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
"""
if min_ch is None:
min_ch = divisor
new_ch = max(min_ch, int(ch + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_ch < 0.9 * ch:
new_ch += divisor
return new_ch

定义MNv2网络

  • block:将前面定义的InvertedResidual类传给block
  • input_channel:是第一层卷积层所采用的卷积核的个数,也就是下一层输出特征矩阵的深度,为8的倍数
  • last_channel:指的是参数表中1x1的卷积核,输出特征矩阵深度为1280
  • inverted_residual_setting:根据参数表创建list列表,列表中的元素对应着参数表中每一行的参数t(拓展因子)、c(输出channel)、n(倒残差结构重复次数)、s(每一个block第一层卷积层的步距)
  • features:在其中先添加第一个卷积层,输入channel即彩色图片

通过循环定义一系列的Inverted residual结构。将每一层output_channel通过_make_divisible函数进行调整,再进行n次的Inverted residual(在循环中除了列表中的stride = s,其他都是 = 1)

在经过一系列Inverted residual处理后,接下来的是一个1x1的卷积层,直接使用ConvBNReLU

  • self.features:通常将以上一系列层结构统称为特征提取层,所以Sequential将刚刚定义好的一系列层结构通过位置参数的形式传入进去,打包成一个整体。

  • 定义由self.avgpoolself.classifier组成的分类器

    首先定义平均池化下采样层(采用自适应的平均池化下采样操作,给定输出特征矩阵的高宽为1)

    再通过nn.Sequential将Dropout层和全连接层组合在一起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MobileNetV2(nn.Module):
def __init__(self, num_classes=1000, alpha=1.0, round_nearest=8):
super(MobileNetV2, self).__init__()
block = InvertedResidual
input_channel = _make_divisible(32 * alpha, round_nearest)
last_channel = _make_divisible(1280 * alpha, round_nearest)

inverted_residual_setting = [
# t, c, n, s
[1, 16, 1, 1],
[6, 24, 2, 2],
[6, 32, 3, 2],
[6, 64, 4, 2],
[6, 96, 3, 1],
[6, 160, 3, 2],
[6, 320, 1, 1],
]

features = []
# conv1 layer
features.append(ConvBNReLU(3, input_channel, stride=2))
# building inverted residual residual blockes
for t, c, n, s in inverted_residual_setting:
output_channel = _make_divisible(c * alpha, round_nearest)
for i in range(n):
stride = s if i == 0 else 1
features.append(block(input_channel, output_channel, stride, expand_ratio=t))
input_channel = output_channel
# building last several layers
features.append(ConvBNReLU(input_channel, last_channel, 1))
# combine feature layers
self.features = nn.Sequential(*features)

# building classifier
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.classifier = nn.Sequential(
nn.Dropout(0.2),
nn.Linear(last_channel, num_classes)
)

权重初始化

  • 遍历每一个子模块,如果是卷积层,就对权重进行初始化,如果存在偏置,则将偏置设置为0;
  • 如果子模块是一个BN层,则将方差设置为1,均值设置为0;
  • 如果子模块是一个全连接层的话,对权重进行初始化,normal_函数为一个正态分布函数,将权重调整为均值为0.0,方差为0.01的正态分布;将偏置设置为0
1
2
3
4
5
6
7
8
9
10
11
12
# weight initialization
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out')
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, nn.BatchNorm2d):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.zeros_(m.bias)

正向传播过程

首先将输入特征矩阵输入进特征提取部分,通过平均池化下采样得到输出,再对输出进行展平处理,最后通过分类器得到最终输出。

1
2
3
4
5
6
def forward(self, x):
x = self.features(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.classifier(x)
return x

model_v3.py

Mobilenet v3网络结构参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from typing import Callable, List, Optional

import torch
from torch import nn, Tensor
from torch.nn import functional as F
from functools import partial

#同MobileNet v2中的_make_divisible作用一致
def _make_divisible(ch, divisor=8, min_ch=None):
# ......


class ConvBNActivation(nn.Sequential):
def __init__(self, in_planes: int,
out_planes: int,
kernel_size: int = 3,
stride: int = 1,
groups: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
activation_layer: Optional[Callable[..., nn.Module]] = None):
# ......


class SqueezeExcitation(nn.Module):
def __init__(self, input_c: int, squeeze_factor: int = 4):
# ......

def forward(self, x: Tensor) -> Tensor:
# ......


class InvertedResidualConfig:
def __init__(self, input_c: int,
kernel: int,
expanded_c: int,
out_c: int,
use_se: bool,
activation: str,
stride: int,
width_multi: float):
# ......

@staticmethod
def adjust_channels(channels: int, width_multi: float):
return _make_divisible(channels * width_multi, 8)


class InvertedResidual(nn.Module):
def __init__(self, cnf: InvertedResidualConfig, norm_layer: Callable[..., nn.Module]):
# ......

def forward(self, x: Tensor) -> Tensor:
# ......


class MobileNetV3(nn.Module):
def __init__(self,
inverted_residual_setting: List[InvertedResidualConfig],
last_channel: int,
num_classes: int = 1000,
block: Optional[Callable[..., nn.Module]] = None,
norm_layer: Optional[Callable[..., nn.Module]] = None):
# ......

def _forward_impl(self, x: Tensor) -> Tensor:
# ......

def forward(self, x: Tensor) -> Tensor:
# ......


def mobilenet_v3_large(num_classes: int = 1000,
reduced_tail: bool = False) -> MobileNetV3:
# ......


def mobilenet_v3_small(num_classes: int = 1000,
reduced_tail: bool = False) -> MobileNetV3:
# ......

_make_divisible函数

同MobileNet v2中的_make_divisible作用一致,是为了将卷积核个数(输出的通道个数)调整为输入round_nearest参数的整数倍。搭建中采用round_nearest=8,也就是要将卷积核的个数设置为8的整数倍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def _make_divisible(ch, divisor=8, min_ch=None):
"""
This function is taken from the original tf repo.
It ensures that all layers have a channel number that is divisible by 8
It can be seen here:
https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
"""
if min_ch is None:
min_ch = divisor
new_ch = max(min_ch, int(ch + divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_ch < 0.9 * ch:
new_ch += divisor
return new_ch

ConvBNActivation类

ConvBNActivation类是包含conv、BN和激活函数的组合层。

  • in_planes:输入特征矩阵的深度
  • out_planes:输出特征矩阵的深度,对应卷积核的个数
  • norm_layer:对应的在卷积后接的BN层
  • activation_layer:对应激活函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class ConvBNActivation(nn.Sequential):
def __init__(self,
in_planes: int,
out_planes: int,
kernel_size: int = 3,
stride: int = 1,
groups: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
activation_layer: Optional[Callable[..., nn.Module]] = None):
padding = (kernel_size - 1) // 2
if norm_layer is None:
norm_layer = nn.BatchNorm2d
if activation_layer is None:
activation_layer = nn.ReLU6
super(ConvBNActivation, self).__init__(nn.Conv2d(in_channels=in_planes,
out_channels=out_planes,
kernel_size=kernel_size,
stride=stride,
padding=padding,
groups=groups,
bias=False),
norm_layer(out_planes),
activation_layer(inplace=True))

SqueezeExcitation模块

初始化函数

相当于两个全连接层。

  • 对于第一个全连接层,此处的节点个数 = 该处输入的特征矩阵channel的1/4(在v3原论文中作者有给出)
  • 第二层全连接层的节点个数 = 该处输入的特征矩阵channel
  • 第一个全连接层的激活函数是ReLU,第二个全连接层的激活函数是hard-sigmoid
SE注意力机制模块
  • squeeze_factor:因为第一个全连接层的节点个数是输入的特征矩阵channel的1/4,所以这里存放的是分母4;
  • squeeze_c:调用_make_divisible方法调整到离该数最近的8的整数倍的数字;
  • self.fc1:使用卷积来作为全连接层,作用与全连接层一样
  • self.fc2:输入channel是上一层的输出channel,所以是squeeze_c,又因为SE机制最终输出需要和输入特征矩阵的channel保持一致,所以输出channel为input_c
1
2
3
4
5
6
class SqueezeExcitation(nn.Module):
def __init__(self, input_c: int, squeeze_factor: int = 4):
super(SqueezeExcitation, self).__init__()
squeeze_c = _make_divisible(input_c // squeeze_factor, 8)
self.fc1 = nn.Conv2d(input_c, squeeze_c, 1)
self.fc2 = nn.Conv2d(squeeze_c, input_c, 1)

正向传播函数

如上图(下)第三个特征矩阵需要进行池化操作,因此进行自适应池化下采样操作,并且返回特征矩阵的高宽是1x1。再将拿到的输出通过全连接层,relu激活函数,第二层全连接层,和一个h-sigmoid激活函数,得到输出。这里的输出就是第二层全连接层的输出,也就是得到了不同channel对应的权重

接下来需要将输出与原特征矩阵相乘,得到通过SE模块之后的输出。

1
2
3
4
5
6
7
def forward(self, x: Tensor) -> Tensor: # 返回Tensor结构
scale = F.adaptive_avg_pool2d(x, output_size=(1, 1))
scale = self.fc1(scale)
scale = F.relu(scale, inplace=True)
scale = self.fc2(scale)
scale = F.hardsigmoid(scale, inplace=True)
return scale * x

InvertedResidualConfig类

InvertedResidualConfig对应的是MobileNetV3中的每一个bneck结构的参数配置,其中有

  • input_c:输入特征矩阵的大小(主要指channel);
  • kernel:每一层使用的kernel_size(即DW卷积中的卷积核大小);
  • expanded_c:第一层卷积层所采用的的卷积核的个数;
  • out_c:最后一层1x1的卷积层输出得到特征矩阵的channel;
  • use_se:是否使用SE注意力机制;
  • activation:采用的激活函数,RE表示采用ReLU激活函数,HS表示采用H-Swish激活函数;
  • stride:指的是DW卷积所对应的步距;
  • width_multi:就是MNv2中提到的$\alpha$参数,用来调节每一个卷积层所使用channel的倍率因子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class InvertedResidualConfig:
def __init__(self,
input_c: int,
kernel: int,
expanded_c: int,
out_c: int,
use_se: bool,
activation: str,
stride: int,
width_multi: float):
self.input_c = self.adjust_channels(input_c, width_multi)
self.kernel = kernel
self.expanded_c = self.adjust_channels(expanded_c, width_multi)
self.out_c = self.adjust_channels(out_c, width_multi)
self.use_se = use_se
self.use_hs = activation == "HS" # whether using h-swish activation
self.stride = stride

adjust_channels函数

是一个静态方法,其实也是直接调用了_make_divisible方法,传入参数为channel和$\alpha$倍率因子,最终得到channel和$\alpha$的乘积离8最近的整数倍的值。

1
2
3
@staticmethod
def adjust_channels(channels: int, width_multi: float):
return _make_divisible(channels * width_multi, 8)

InvertedResidual类

初始化函数

  • cnf:前文提到的InvertedResidualConfig配置文件;
  • norm_layer:对应的在卷积后接的BN层
  • cnf.stride:判断步距是否为1或2,因为在网络参数表中,步距只有1和2两种情况,当出现第三种情况时,就是不合法的步距情况;再判断
  • self.use_res_connect:是否使用shortcut连接,shortcut只有在stride == 1且input_c == output_c时才有;
  • activation_layer:判断使用ReLU或者H-Swish激活函数(官方是在1.7及以上版本中才有官方实现的H-Swish和H-Sigmoid激活函数,如果需要使用MNv3网络的话,得把pytorch版本更新至1.7及以上)
  • expand区域指在InvertedResidual结构中的第一个1x1卷积层进行升维处理,因为第一个kneck存在输入特征矩阵的channel和输出特征矩阵的channel相等,因此可以跳过,所以会进行判断cnf.expanded_c != cnf.input_c;
  • depthwise区域为dw卷积区域

groups:由于DW卷积是针对每一个channel都单独使用一个channel为1的卷积核来进行卷及处理,所以groups和channel的个数是保持一致的,所以groups=cnf.expanded_c

  • project区域是InvertedResidual结构中1x1卷积中的降维部分,activation_layer=nn.Identity中的Identity其实就是线性y = x,没有做任何处理;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class InvertedResidual(nn.Module):
def __init__(self,
cnf: InvertedResidualConfig,
norm_layer: Callable[..., nn.Module]):
super(InvertedResidual, self).__init__()

if cnf.stride not in [1, 2]:
raise ValueError("illegal stride value.")

self.use_res_connect = (cnf.stride == 1 and cnf.input_c == cnf.out_c)

layers: List[nn.Module] = []
activation_layer = nn.Hardswish if cnf.use_hs else nn.ReLU

# expand
if cnf.expanded_c != cnf.input_c:
layers.append(ConvBNActivation(cnf.input_c,
cnf.expanded_c,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=activation_layer))

# depthwise
layers.append(ConvBNActivation(cnf.expanded_c,
cnf.expanded_c,
kernel_size=cnf.kernel,
stride=cnf.stride,
groups=cnf.expanded_c,
norm_layer=norm_layer,
activation_layer=activation_layer))

if cnf.use_se:
layers.append(SqueezeExcitation(cnf.expanded_c))

# project
layers.append(ConvBNActivation(cnf.expanded_c,
cnf.out_c,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=nn.Identity))

self.block = nn.Sequential(*layers)
self.out_channels = cnf.out_c

正向传播函数

将特征矩阵传入block方法得到主分支输出特征矩阵,再经过判断是否有shortcut,如果有则相加,无则直接输出主分支特征矩阵。

1
2
3
4
5
def forward(self, x: Tensor) -> Tensor:
result = self.block(x)
if self.use_res_connect:
result += x
return result

定义MNv3类

初始化函数

  • inverted_residual_setting:对应一系列bneck结构参数的列表;
  • last_channel:对应是MNv3网络参数表中倒数第二个全连接层的输出节点的个数;
  • block:就是之前定义的InvertedResidual模块;
  • norm_layer:对应的在卷积后接的BN层;

当inverted_residual_setting为空或者不是一个list的话都会报错。

building first layer创建第一层conv2d

firstconv_output_c获取第一个bneck结构的输入特征矩阵的channel,所对应的是第一个卷积层输出的channel;

ConvBNActivation对应的是第一个conv2d,无论是v3-Large还是v3-Small,第一个都是3x3的卷积层。所以先创建了一个3x3的卷积层。

building inverted residual blocks创建block模块

遍历每一个bneck结构,将每一层的配置文件和norm_layer都传给block,将创建好的每一个block结构,也就是InvertedResidual模块给填进layers当中。

building last several layers创建平均池化下采样层和几个卷积层

lastconv_input_c:最后一个bneck模块的输出特征矩阵channel;

lastconv_output_c:无论是v3-Large还是v3-Small,lastconv_output_c都是lastconv_input_c的6倍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class MobileNetV3(nn.Module):
def __init__(self,
inverted_residual_setting: List[InvertedResidualConfig],
last_channel: int,
num_classes: int = 1000,
block: Optional[Callable[..., nn.Module]] = None,
norm_layer: Optional[Callable[..., nn.Module]] = None):
super(MobileNetV3, self).__init__()

if not inverted_residual_setting:
raise ValueError("The inverted_residual_setting should not be empty.")
elif not (isinstance(inverted_residual_setting, List) and
all([isinstance(s, InvertedResidualConfig) for s in inverted_residual_setting])):
raise TypeError("The inverted_residual_setting should be List[InvertedResidualConfig]")

if block is None:
block = InvertedResidual

if norm_layer is None:
norm_layer = partial(nn.BatchNorm2d, eps=0.001, momentum=0.01)

layers: List[nn.Module] = []

# building first layer
firstconv_output_c = inverted_residual_setting[0].input_c
layers.append(ConvBNActivation(3,
firstconv_output_c,
kernel_size=3,
stride=2,
norm_layer=norm_layer,
activation_layer=nn.Hardswish))
# building inverted residual blocks
for cnf in inverted_residual_setting:
layers.append(block(cnf, norm_layer))

# building last several layers
lastconv_input_c = inverted_residual_setting[-1].out_c
lastconv_output_c = 6 * lastconv_input_c
layers.append(ConvBNActivation(lastconv_input_c,
lastconv_output_c,
kernel_size=1,
norm_layer=norm_layer,
activation_layer=nn.Hardswish))
self.features = nn.Sequential(*layers)
self.avgpool = nn.AdaptiveAvgPool2d(1)
self.classifier = nn.Sequential(nn.Linear(lastconv_output_c, last_channel),
nn.Hardswish(inplace=True),
nn.Dropout(p=0.2, inplace=True),
nn.Linear(last_channel, num_classes))

# initial weights
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode="fan_out")
if m.bias is not None:
nn.init.zeros_(m.bias)
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
nn.init.ones_(m.weight)
nn.init.zeros_(m.bias)
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
nn.init.zeros_(m.bias)

正向传播函数

将输入特征矩阵依次通过特征提取、平均池化、展平和classifier处理得到输出结果

1
2
3
4
5
6
7
8
9
def _forward_impl(self, x: Tensor) -> Tensor:
x = self.features(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.classifier(x)
return x

def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)

定义mobilenet_v3_large

  • width_multi:$\alpha$超参数,用来调整channel;
  • bneck_conf:同样使用partial方法,给InvertedResidualConfig方法传入默认参数width_multi,即$\alpha$超参数;
  • adjust_channels:即InvertedResidualConfig类中的adjust_channels方法,使用partial方法,给InvertedResidualConfig.adjust_channels方法传入默认参数width_multi,即$\alpha$超参数;
  • reduce_divider:对应最后三层bneck结构,对卷积的channel进行了调整,这里默认为False,指不做调整,如果设置为True,可以减少一些参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def mobilenet_v3_large(num_classes: int = 1000,
reduced_tail: bool = False) -> MobileNetV3:
"""
Constructs a large MobileNetV3 architecture from
"Searching for MobileNetV3" <https://arxiv.org/abs/1905.02244>.
weights_link:
https://download.pytorch.org/models/mobilenet_v3_large-8738ca79.pth
Args:
num_classes (int): number of classes
reduced_tail (bool): If True, reduces the channel counts of all feature layers
between C4 and C5 by 2. It is used to reduce the channel redundancy in the
backbone for Detection and Segmentation.
"""
width_multi = 1.0
bneck_conf = partial(InvertedResidualConfig, width_multi=width_multi)
adjust_channels = partial(InvertedResidualConfig.adjust_channels, width_multi=width_multi)

reduce_divider = 2 if reduced_tail else 1

inverted_residual_setting = [
# input_c, kernel, expanded_c, out_c, use_se, activation, stride
bneck_conf(16, 3, 16, 16, False, "RE", 1),
bneck_conf(16, 3, 64, 24, False, "RE", 2), # C1
bneck_conf(24, 3, 72, 24, False, "RE", 1),
bneck_conf(24, 5, 72, 40, True, "RE", 2), # C2
bneck_conf(40, 5, 120, 40, True, "RE", 1),
bneck_conf(40, 5, 120, 40, True, "RE", 1),
bneck_conf(40, 3, 240, 80, False, "HS", 2), # C3
bneck_conf(80, 3, 200, 80, False, "HS", 1),
bneck_conf(80, 3, 184, 80, False, "HS", 1),
bneck_conf(80, 3, 184, 80, False, "HS", 1),
bneck_conf(80, 3, 480, 112, True, "HS", 1),
bneck_conf(112, 3, 672, 112, True, "HS", 1),
bneck_conf(112, 5, 672, 160 // reduce_divider, True, "HS", 2), # C4
bneck_conf(160 // reduce_divider, 5, 960 // reduce_divider, 160 // reduce_divider, True, "HS", 1),
bneck_conf(160 // reduce_divider, 5, 960 // reduce_divider, 160 // reduce_divider, True, "HS", 1),
]
last_channel = adjust_channels(1280 // reduce_divider) # C5

return MobileNetV3(inverted_residual_setting=inverted_residual_setting,
last_channel=last_channel,
num_classes=num_classes)

定义mobilenet_v3_small

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def mobilenet_v3_small(num_classes: int = 1000,
reduced_tail: bool = False) -> MobileNetV3:
"""
Constructs a large MobileNetV3 architecture from
"Searching for MobileNetV3" <https://arxiv.org/abs/1905.02244>.
weights_link:
https://download.pytorch.org/models/mobilenet_v3_small-047dcff4.pth
Args:
num_classes (int): number of classes
reduced_tail (bool): If True, reduces the channel counts of all feature layers
between C4 and C5 by 2. It is used to reduce the channel redundancy in the
backbone for Detection and Segmentation.
"""
width_multi = 1.0
bneck_conf = partial(InvertedResidualConfig, width_multi=width_multi)
adjust_channels = partial(InvertedResidualConfig.adjust_channels, width_multi=width_multi)

reduce_divider = 2 if reduced_tail else 1

inverted_residual_setting = [
# input_c, kernel, expanded_c, out_c, use_se, activation, stride
bneck_conf(16, 3, 16, 16, True, "RE", 2), # C1
bneck_conf(16, 3, 72, 24, False, "RE", 2), # C2
bneck_conf(24, 3, 88, 24, False, "RE", 1),
bneck_conf(24, 5, 96, 40, True, "HS", 2), # C3
bneck_conf(40, 5, 240, 40, True, "HS", 1),
bneck_conf(40, 5, 240, 40, True, "HS", 1),
bneck_conf(40, 5, 120, 48, True, "HS", 1),
bneck_conf(48, 5, 144, 48, True, "HS", 1),
bneck_conf(48, 5, 288, 96 // reduce_divider, True, "HS", 2), # C4
bneck_conf(96 // reduce_divider, 5, 576 // reduce_divider, 96 // reduce_divider, True, "HS", 1),
bneck_conf(96 // reduce_divider, 5, 576 // reduce_divider, 96 // reduce_divider, True, "HS", 1)
]
last_channel = adjust_channels(1024 // reduce_divider) # C5

return MobileNetV3(inverted_residual_setting=inverted_residual_setting,
last_channel=last_channel,
num_classes=num_classes)

train.py

下载官方权重文件:输入import torchvision.model.mobilenet,ctrl+左键进入函数当中,会有显示下载链接

该训练脚本与前期使用的AlexNet、VGG、GooglNet以及ResNet所使用的训练脚本基本一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
import sys
import json

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from tqdm import tqdm
# 训练MobileNetv2
from model_v2 import MobileNetV2
# 训练MobileNetv3-Large
from model_v3 import mobilenet_v3_large

def main():
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("using {} device.".format(device))

batch_size = 16
epochs = 5

data_transform = {
"train": transforms.Compose([transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
"val": transforms.Compose([transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}

data_root = os.path.abspath(os.path.join(os.getcwd(), "..")) # get data root path
image_path = os.path.join(data_root, "data_set", "flower_data") # flower data set path
assert os.path.exists(image_path), "{} path does not exist.".format(image_path)
train_dataset = datasets.ImageFolder(root=os.path.join(image_path, "train"),
transform=data_transform["train"])
train_num = len(train_dataset)

# {'daisy':0, 'dandelion':1, 'roses':2, 'sunflower':3, 'tulips':4}
flower_list = train_dataset.class_to_idx
cla_dict = dict((val, key) for key, val in flower_list.items())
# write dict into json file
json_str = json.dumps(cla_dict, indent=4)
with open('class_indices.json', 'w') as json_file:
json_file.write(json_str)

nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8]) # number of workers
print('Using {} dataloader workers every process'.format(nw))

train_loader = torch.utils.data.DataLoader(train_dataset,
batch_size=batch_size, shuffle=True,
num_workers=nw)

validate_dataset = datasets.ImageFolder(root=os.path.join(image_path, "val"),
transform=data_transform["val"])
val_num = len(validate_dataset)
validate_loader = torch.utils.data.DataLoader(validate_dataset,
batch_size=batch_size, shuffle=False,
num_workers=nw)

print("using {} images for training, {} images for validation.".format(train_num,
val_num))

# create model
# 训练MobileNetv2
net = MobileNetV2(num_classes=5)
# 训练MobileNetv3-Large
net = mobilenet_v3_large(num_classes=5)

# load pretrain weights
# download url: https://download.pytorch.org/models/mobilenet_v2-b0353104.pth
model_weight_path = "./mobilenet_v2.pth" #"./mobilenet_v3_large.pth"
assert os.path.exists(model_weight_path), "file {} dose not exist.".format(model_weight_path)
pre_weights = torch.load(model_weight_path, map_location='cpu')

# delete classifier weights
pre_dict = {k: v for k, v in pre_weights.items() if net.state_dict()[k].numel() == v.numel()}
missing_keys, unexpected_keys = net.load_state_dict(pre_dict, strict=False)

# freeze features weights
for param in net.features.parameters():
param.requires_grad = False

net.to(device)

# define loss function
loss_function = nn.CrossEntropyLoss()

# construct an optimizer
params = [p for p in net.parameters() if p.requires_grad]
optimizer = optim.Adam(params, lr=0.0001)

best_acc = 0.0
save_path = './MobileNetV2.pth' # save_path = './MobileNetV3.pth'
train_steps = len(train_loader)
for epoch in range(epochs):
# train
net.train()
running_loss = 0.0
train_bar = tqdm(train_loader, file=sys.stdout)
for step, data in enumerate(train_bar):
images, labels = data
optimizer.zero_grad()
logits = net(images.to(device))
loss = loss_function(logits, labels.to(device))
loss.backward()
optimizer.step()

# print statistics
running_loss += loss.item()

train_bar.desc = "train epoch[{}/{}] loss:{:.3f}".format(epoch + 1,
epochs,
loss)

# validate
net.eval()
acc = 0.0 # accumulate accurate number / epoch
with torch.no_grad():
val_bar = tqdm(validate_loader, file=sys.stdout)
for val_data in val_bar:
val_images, val_labels = val_data
outputs = net(val_images.to(device))
# loss = loss_function(outputs, test_labels)
predict_y = torch.max(outputs, dim=1)[1]
acc += torch.eq(predict_y, val_labels.to(device)).sum().item()

val_bar.desc = "valid epoch[{}/{}]".format(epoch + 1,
epochs)
val_accurate = acc / val_num
print('[epoch %d] train_loss: %.3f val_accuracy: %.3f' %
(epoch + 1, running_loss / train_steps, val_accurate))

if val_accurate > best_acc:
best_acc = val_accurate
torch.save(net.state_dict(), save_path)

print('Finished Training')


if __name__ == '__main__':
main()

**需要注意的不同点:**模型权重加载部分

首先实例化模型,将类别个数设置为5;

通过torch.load函数载入预训练参数,载入进之后是一个字典类型。因为官方是在ImageNet数据集上进行预训练,所以最后一层全连接层的节点个数 = 1000,而我们这最后一层节点个数 = 5,所以最后一层不能用。

因此首先遍历权重字典,查找权重名称中是否含有classifier参数,如果有这个参数,说明是最后一层全连接层的参数。如果没有classifier,则直接保存进pre_dict字典变量当中。

再通过net.load_state_dict函数将不包含classifier全连接层的权重字典全部载入进去。

之后冻结特征提取部分的所有权重,通过遍历net.features下的所有参数,将参数的requires_grad全部设置为False,这样就不会对其进行求导及参数更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# create model
net = MobileNetV2(num_classes=5)

# load pretrain weights
# download url: https://download.pytorch.org/models/mobilenet_v2-b0353104.pth
model_weight_path = "./mobilenet_v2.pth"
assert os.path.exists(model_weight_path), "file {} dose not exist.".format(model_weight_path)
pre_weights = torch.load(model_weight_path, map_location='cpu')

# delete classifier weights
pre_dict = {k: v for k, v in pre_weights.items() if "classifier" not in k}
missing_keys, unexpected_keys = net.load_state_dict(pre_dict, strict=False)

# freeze features weights
for param in net.features.parameters():
param.requires_grad = False

MNv2训练结果

MNv2训练结果

MNv3-Large训练结果

MNv3训练结果

predict.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import json

import torch
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt
# MNv2
from model_v2 import MobileNetV2
# MNv3-Large
from model_v3 import mobilenet_v3_large

def main():
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

data_transform = transforms.Compose(
[transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

# load image
img_path = "tulip.jpg"
assert os.path.exists(img_path), "file: '{}' dose not exist.".format(img_path)
img = Image.open(img_path)
plt.imshow(img)
# [N, C, H, W]
img = data_transform(img)
# expand batch dimension
img = torch.unsqueeze(img, dim=0)

# read class_indict
json_path = './class_indices.json'
assert os.path.exists(json_path), "file: '{}' dose not exist.".format(json_path)

with open(json_path, "r") as f:
class_indict = json.load(f)

# create model
# MNv2
model = MobileNetV2(num_classes=5).to(device)
# MNv3-Large
model = mobilenet_v3_large(num_classes=5).to(device)
# load model weights
model_weight_path = "./MobileNetV2.pth" # model_weight_path = "./MobileNetV3.pth"
model.load_state_dict(torch.load(model_weight_path, map_location=device))
model.eval()
with torch.no_grad():
# predict class
output = torch.squeeze(model(img.to(device))).cpu()
predict = torch.softmax(output, dim=0)
predict_cla = torch.argmax(predict).numpy()

print_res = "class: {} prob: {:.3}".format(class_indict[str(predict_cla)],
predict[predict_cla].numpy())
plt.title(print_res)
for i in range(len(predict)):
print("class: {:10} prob: {:.3}".format(class_indict[str(i)],
predict[i].numpy()))
plt.show()


if __name__ == '__main__':
main()

MNv2预测结果

MNv2预测结果

MNv3-Large预测结果

MNv3预测结果