0%

生成对抗网络(GAN)是一类非常有趣的神经网络。借助GAN,计算机能够生成逼真的图片。近年来有许多“AI绘画”的新闻,这些应用大多是通过GAN实现的。实际上,GAN不仅能做图像生成,还能辅助其他输入信息不足的视觉任务。比如SRGAN,就是把GAN应用在超分辨率(SR)任务上的代表之作。

在这篇文章中,我将主要面向深度学习的初学者,介绍SRGAN[1]这篇论文,同时分享以下知识:

  • GAN的原理与训练过程
  • 感知误差(Perceptual Loss)
  • 基于的GAN的SR模型框架

讲完了知识后,我还会解读一下MMEditing的SRGAN的训练代码。看懂这份代码能够加深对SRGAN训练算法的理解。

SRGAN 核心思想

早期超分辨率方法的优化目标都是降低低清图像和高清图像之间的均方误差。降低均方误差,确实让增强图像和原高清图像的相似度更高。但是,图像的相似度指标高并不能代表图像的增强质量就很高。下图显示了插值、优化均方误差、SRGAN、原图这四个图像输出结果(括号里的相似度指标是PSNR和SSIM)。

从图中可以看出,优化均方误差虽然能让相似度指标升高,但图像的细节十分模糊,尤其是纹理比较密集的高频区域。相比之下,SRGAN增强出来的图像虽然相似度不高,但看起来更加清晰。

为什么SRGAN的增强结果那么清楚呢?这是因为SRGAN使用了一套新的优化目标。SRGAN使用的损失函数既包括了GAN误差,也包括了感知误差。这套新的优化目标能够让网络生成看起来更清楚的图片,而不仅仅是和原高清图像相似度更高的图片。

下面,我们来一步一步学习SRGAN的框架。

GAN 的原理

GAN[2]是一套搭建神经网络的框架。给定一个图片数据集$p_g$,GAN的目的是训练出一个生成网络$G$,使得G能够凭空生成出和$p_g$中大多数图片都类似的图片。比如说$p_g$是一个小猫图片数据集,那么$G$就应该能凭空生成出小猫图片。当然,$G$不是真的没有任何输入,真的能够凭空生成一幅图片。为了生成出不一样的图片,$G$要求输入一个随机量,这个随机量叫做噪声$z$。这样,只要输入的噪声$z$变了,$G$的输出$G(z)$就变了,就能画出长相不一样的小猫了。

为了指导图像生成,$G$应该有一个“老师”告诉它该怎么画出更像的图片。这个“老师”叫做判别网络$D$。$D$就是一个二分类网络,它能够严格地判定出一幅图片是否来自数据集$p_g$。如果$p_g$是一个小猫数据集,那么$D$就应该能判定一张图片是不是小猫。这样,如果$G$生成出来的图片$G(z)$已经非常逼真,连$D$都觉得$G(z)$来自数据集$p_g$,那么$G$就是一个很成功的网络了。

如果只是生成小猫,我们直接拿小猫图片和其他图片就能训练出一个$D$了。问题是,大多数情况下我们只有数据集$p_g$,而难以获得一个$p_g$的反例数据集。GAN的想法,则巧妙地解决了这个问题:刚开始,$G$生成出来的图片肯定是很差的,这些图片肯定不像$p_g$。所以,我们以$G(z)$为反例,和$p_g$一起训练出一个$D$来。等$D$的判定能力强了以后,又拿$D$回头训练$G$。这样,$D$的审美水平逐渐提高,$G$的绘画能力也逐渐提高。最终,$D$能成功分辨出一幅图片是否来自$p_g$,而$G$生成出来的图片和$p_g$中的看起来完全相同,连$D$也分辨不出来。就这样,我们得到了一个很棒的生成网络$G$。

规范地来说,给定一个数据集$p_g$,我们希望训练出两个网络$D, G$。$D$能够判断一幅输入图片是否来自$p_g$:

$G$则能够根据来自噪声分布$p_z$的$z$生成一个真假难辨的图片$G(z)$,使得$D(G(z))=1$。

为了达到这个目标,二分类器$D$应该最小化这样一个的交叉熵误差:

其中,$\hat{y}=D(x)$是预测结果为真的概率,$y$是0或1的标签。

对于来自数据集的图片$x \sim p_g$,$D$使用的标签$y$应该是1,误差公式化简为:

对于$G$生成的图片$G(z)$,$D$使用的标签$y$应该是0,误差公式化简为:

我们每步拿一张真图$x$和一张假图$G(z)$训练$D$。这样,每步的误差公式就是上面两个式子加起来:

反过来,$G$应该和$D$对抗,最大化上面那个误差,想办法骗过$D$。这个“对抗”就是GAN的名称“生成对抗网络”的由来。但是,$G$不能改变$D(x)$那一项。因此,$G$使用的误差函数是:

使用上面这两种误差,就可以训练神经网络了。训练GAN时,每轮一般会训练$k(k>=1)$次$D$,再训练1次$G$。这是为了先得到一个好的判别器,再用判别器去指导生成器。

GAN只是一套通用的框架,并没有指定神经网络$D, G$的具体结构。在不同任务中,$D, G$一般有不同的结构。

基于GAN的超分辨率网络

如前文所述,以优化均方误差为目标的超分辨率模型难以复原图像的细节。其实,超分辨率任务和图像生成任务类似,都需要一个“老师”来指导优化目标。SRGAN把GAN框架运用到了超分辨率任务上。原来的生成器$G$随机生成图像,现在用来输出高清图像;原来的判定器$D$用来判定图像是否属于某数据集,现在$D$用来判断一幅图像是否是高清图像。

具体来说,相比基础的GAN,在SRGAN中,$D$的输入是高清图像$I^{HR}$。而$G$的输入从随机噪声$z$变成了高清图像退化后的低清图像$I^{LR}$。这样,$G$就不是在随机生成图像,而是在根据一幅低清图像生成一幅高清图像了。它们的误差函数分别是:

借助GAN的架构,SRGAN能够利用$D$指导高清图像生成。但是,超分辨率任务毕竟和图像生成任务有一些区别,不能只用这种对抗误差来约束网络。因此,除了使用对抗误差外,SRGAN还使用了一种内容误差。这种内容误差用于让低清图片和高清图片的内容对齐,起到了和原均方误差一样的作用。

基于感知的内容误差

在介绍SRGAN的内容误差之前,需要对“内容误差”和“感知误差”这两个名词做一个澄清。在SRGAN的原文章中,作者把内容误差和对抗误差之和叫做感知误差。但是,后续的大部分文献只把这种内容误差叫做感知误差,不会把内容误差和对抗误差放在一起称呼。在后文中,我也会用“感知误差”来指代SRGAN中的“内容误差”。

在深度卷积神经网络(CNN)火起来后,人们开始研究为什么CNN能够和人类一样识别出图像。经实验,人们发现两幅图像经VGG(一个经典的CNN)的某些中间层的输出越相似,两幅图像从观感上也越相似。这种相似度并不是基于某种数学指标,而是和人的感知非常类似。

VGG的这种“感知性”被运用在了风格迁移等任务上。也有人考虑把这种感知上的误差运用到超分辨率任务上,并取得了不错的结果[3]。下图是真值、插值、基于逐像素误差、基于感知误差的四个超分辨率结果。

SRGAN也使用了这种感知误差,以取代之前常常使用的逐像素均方误差。这种感知误差的计算方法如下:VGG有很多中间层,用于计算感知误差的中间层$i$是可调的。假如我们用$\phi_{i}(I)$表示图像$I$经VGG的第$i$层的中间输出结果,$\phi_{i}(I)_{x, y}$表示中间输出结果在坐标$(x, y)$处的值,则感知误差的公式如下:

直观上解释这个公式,就是先把高清图像$I^{HR}$送入VGG,再把高清图像退化出来的低清图像$I^{LR}$送入生成器,并把生成器的输出$G(I^{LR})$也送入VGG。两幅图片经VGG第$i$层生成的中间结果的逐像素均方误差,就是感知误差。

算上之前的对抗误差,一个图像超分辨率网络的总误差如下:

这里的$w$用于调整两个误差的相对权重,原论文使用$w=10^{-3}$。

SRGAN的其他模块

定义好了误差函数,只要在决定好网络结构就可以开始训练网络了。SRGAN使用的生成网络和判别网络的结构如下:

判别网络就是一个平平无奇的二分类网络,架构上没有什么创新。而生成网络则先用几个残差块提取特征,最后用一种超分辨率任务中常用的上采样模块PixelShuffle对原图像的尺寸翻倍两次,最后输出一个边长放大4倍的高清图像。

SRGAN的这种网络结构在当时确实取得了不错的结果。但是,很快就有后续研究提出了更好的网络架构。比如ESRGAN[4]去掉了生成网络的BN层,提出了一种叫做RRDB的高级模块。基于RRDB的生成网络有着更好的生成效果。

不仅是网络架构,SRGAN的其他细节也得到了后续研究的改进。GAN误差的公式、总误差的公式、高清图像退化成低清图像的数据增强算法……这些子模块都被后续研究改进了。但是,SRGAN这种基于GAN的训练架构一直没有发生改变。有了SRGAN的代码,想复现一些更新的超分辨率网络时,往往只需要换一下生成器的结构,或者改一改误差的公式就行了。大部分的训练代码是不用改变的。

总结

SRGAN是把GAN运用在超分辨率任务上的开山之作。如正文所述,SRGAN中的部分设计虽然已经过时,但它的整体训练架构被一直沿用了下来。现在去回顾SRGAN这篇论文时,只需要关注以下几点即可:

  • 如何把GAN套用在超分辨率任务上
  • GAN误差
  • 感知误差

通过阅读这篇论文,我们不仅应该学会GAN是怎样运用在SR上的,也应该能总结出如何把GAN应用在其他任务上。GAN的本质是去学习一个分布,令生成的$G(z)$看上去是来自分布$p_g$,而不是像图像分类等任务去学习一个$x \to y$的映射关系。因此,GAN会记忆一些和数据集相关的信息。在输入信息就已经比较完备的图像分类、目标检测等任务中,GAN可能没有什么用武之地。但是,在输入信息不足的超分辨率、图像补全等任务中,GAN记忆的数据集信息有很有用了。很多时候,GAN会“脑补”出输入图像中不够清楚的部分。

决定了要在某个任务中使用GAN时,我们可以在一个不使用GAN的架构上做以下改动:

  • 定义一个分类网络$D$。
  • 在原loss中加一项由$D$算出来的GAN loss。
  • 在训练流程中,加入训练$D$的逻辑。

看完正文后,如果你对GAN在SR上的训练逻辑还是不太清楚,欢迎阅读附录中有关SRGAN训练代码的解读。

附录:MMEditing 中的 SRGAN

MMEditing中的SRGAN写在mmedit/models/restorers/srgan.py这个文件里。学习训练逻辑时,我们只需要关注SRGAN类的train_step方法即可。

以下是train_step的源代码(我的mmedit版本是v0.15.1)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def train_step(self, data_batch, optimizer):
"""Train step.

Args:
data_batch (dict): A batch of data.
optimizer (obj): Optimizer.

Returns:
dict: Returned output.
"""
# data
lq = data_batch['lq']
gt = data_batch['gt']

# generator
fake_g_output = self.generator(lq)

losses = dict()
log_vars = dict()

# no updates to discriminator parameters.
set_requires_grad(self.discriminator, False)

if (self.step_counter % self.disc_steps == 0
and self.step_counter >= self.disc_init_steps):
if self.pixel_loss:
losses['loss_pix'] = self.pixel_loss(fake_g_output, gt)
if self.perceptual_loss:
loss_percep, loss_style = self.perceptual_loss(
fake_g_output, gt)
if loss_percep is not None:
losses['loss_perceptual'] = loss_percep
if loss_style is not None:
losses['loss_style'] = loss_style
# gan loss for generator
fake_g_pred = self.discriminator(fake_g_output)
losses['loss_gan'] = self.gan_loss(
fake_g_pred, target_is_real=True, is_disc=False)

# parse loss
loss_g, log_vars_g = self.parse_losses(losses)
log_vars.update(log_vars_g)

# optimize
optimizer['generator'].zero_grad()
loss_g.backward()
optimizer['generator'].step()

# discriminator
set_requires_grad(self.discriminator, True)
# real
real_d_pred = self.discriminator(gt)
loss_d_real = self.gan_loss(
real_d_pred, target_is_real=True, is_disc=True)
loss_d, log_vars_d = self.parse_losses(dict(loss_d_real=loss_d_real))
optimizer['discriminator'].zero_grad()
loss_d.backward()
log_vars.update(log_vars_d)
# fake
fake_d_pred = self.discriminator(fake_g_output.detach())
loss_d_fake = self.gan_loss(
fake_d_pred, target_is_real=False, is_disc=True)
loss_d, log_vars_d = self.parse_losses(dict(loss_d_fake=loss_d_fake))
loss_d.backward()
log_vars.update(log_vars_d)

optimizer['discriminator'].step()

self.step_counter += 1

log_vars.pop('loss') # remove the unnecessary 'loss'
outputs = dict(
log_vars=log_vars,
num_samples=len(gt.data),
results=dict(lq=lq.cpu(), gt=gt.cpu(), output=fake_g_output.cpu()))

return outputs

一开始,图像输出都在词典data_batch里。函数先把低清图lq和高清的真值gt从词典里取出。

1
2
3
# data
lq = data_batch['lq']
gt = data_batch['gt']

之后,函数计算了$G(I^{lq})$,为后续loss的计算做准备。

1
2
# generator
fake_g_output = self.generator(lq)

接下来,是优化生成器self.generator的逻辑。这里面有一些函数调用,我们可以不管它们的实现,大概理解整段代码的意思就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
losses = dict()
log_vars = dict()

# no updates to discriminator parameters.
set_requires_grad(self.discriminator, False)

if (self.step_counter % self.disc_steps == 0
and self.step_counter >= self.disc_init_steps):
if self.pixel_loss:
losses['loss_pix'] = self.pixel_loss(fake_g_output, gt)
if self.perceptual_loss:
loss_percep, loss_style = self.perceptual_loss(
fake_g_output, gt)
if loss_percep is not None:
losses['loss_perceptual'] = loss_percep
if loss_style is not None:
losses['loss_style'] = loss_style
# gan loss for generator
fake_g_pred = self.discriminator(fake_g_output)
losses['loss_gan'] = self.gan_loss(
fake_g_pred, target_is_real=True, is_disc=False)

# parse loss
loss_g, log_vars_g = self.parse_losses(losses)
log_vars.update(log_vars_g)

# optimize
optimizer['generator'].zero_grad()
loss_g.backward()
optimizer['generator'].step()

为了只训练生成器,要用下面的代码关闭判别器的训练。

1
2
# no updates to discriminator parameters.
set_requires_grad(self.discriminator, False)

正文说过,训练GAN时一般要先训好判别器,且训练判别器多于训练生成器。因此,下面的if语句可以让判别器训练了self.disc_init_steps步后,每训练self.disc_steps步判别器再训练一步生成器。

1
2
if (self.step_counter % self.disc_steps == 0
and self.step_counter >= self.disc_init_steps):

if语句块里分别计算了逐像素误差(比如均方误差和L1误差)、感知误差、GAN误差。虽然SRGAN完全抛弃了逐像素误差,但实际训练时我们还是可以按一定比例加上这个误差。这些误差最后会用于训练生成器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if self.pixel_loss:
losses['loss_pix'] = self.pixel_loss(fake_g_output, gt)
if self.perceptual_loss:
loss_percep, loss_style = self.perceptual_loss(
fake_g_output, gt)
if loss_percep is not None:
losses['loss_perceptual'] = loss_percep
if loss_style is not None:
losses['loss_style'] = loss_style
# gan loss for generator
fake_g_pred = self.discriminator(fake_g_output)
losses['loss_gan'] = self.gan_loss(
fake_g_pred, target_is_real=True, is_disc=False)

# parse loss
loss_g, log_vars_g = self.parse_losses(losses)
log_vars.update(log_vars_g)

# optimize
optimizer['generator'].zero_grad()
loss_g.backward()
optimizer['generator'].step()

训练完生成器后,要训练判别器。和生成器的误差计算方法类似,判别器的训练代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 # discriminator
set_requires_grad(self.discriminator, True)
# real
real_d_pred = self.discriminator(gt)
loss_d_real = self.gan_loss(
real_d_pred, target_is_real=True, is_disc=True)
loss_d, log_vars_d = self.parse_losses(dict(loss_d_real=loss_d_real))
optimizer['discriminator'].zero_grad()
loss_d.backward()
log_vars.update(log_vars_d)
# fake
fake_d_pred = self.discriminator(fake_g_output.detach())
loss_d_fake = self.gan_loss(
fake_d_pred, target_is_real=False, is_disc=True)
loss_d, log_vars_d = self.parse_losses(dict(loss_d_fake=loss_d_fake))
loss_d.backward()
log_vars.update(log_vars_d)

optimizer['discriminator'].step()

这段代码有两个重点:

  1. 在训练判别器时,要用set_requires_grad(self.discriminator, True)开启判别器的梯度计算。
  2. fake_d_pred = self.discriminator(fake_g_output.detach())这一行的detach()很关键。detach()可以中断某张量的梯度跟踪。fake_g_output是由生成器算出来的,如果不把这个张量的梯度跟踪切断掉,在优化判别器时生成器的参数也会跟着优化。

函数的最后部分是一些和MMEditing其他代码逻辑的交互,和SRGAN本身没什么关联。

1
2
3
4
5
6
7
8
9
self.step_counter += 1

log_vars.pop('loss') # remove the unnecessary 'loss'
outputs = dict(
log_vars=log_vars,
num_samples=len(gt.data),
results=dict(lq=lq.cpu(), gt=gt.cpu(), output=fake_g_output.cpu()))

return outputs

只要理解了本文的误差计算公式,再看懂了这段代码是如何训练判别器和生成器的,就算是完全理解了SRGAN的核心思想了。

参考资料

[1] (SRGAN): Photo-Realistic Single Image Super-Resolution Using a Generative Adversarial Network

[2] (GAN): Generative Adversarial Nets

[3] (Perceptual Loss):Perceptual Losses for Real-Time Style Transfer and Super-Resolution

[4] (ESRGAN): ESRGAN: Enhanced Super-Resolution Generative Adversarial Networks

我之前的一篇文章介绍了如何给PyTorch添加CPU上的简单的加法算子。在这篇文章里,我将继续展示一个更具体的PyTorch自定义算子示例——自己动手复现二维卷积算子。这个示例是基于PyTorch Extension的,在迁移项目时,不需要自己生成动态库,只需要用setup.py重新编译一遍即可。我会同时介绍CPU版和CUDA版的实现。

许多前沿的神经网络都会对卷积进行一些修改。比如大名鼎鼎的可变形卷积(deformable convolution)。相信看完这篇文章后,大家能看懂PyTorch卷积的实现代码,并大概了解如何修改卷积的实现细节,并把新写好的卷积运用到自己的PyTorch项目中。

PyTorch Extension 实现二维卷积

搭建项目

在开始写代码前,要准备一个崭新的目录,在这个文件夹里搭建项目。

在根目录下,先创建一个setup.py,之后要填写这份安装文件。

之后,创建一个文件夹,其名字是项目名。在这个文件夹里合适的地方新建一个子文件夹,专门用来放和算子相关的文件。我的项目名叫做panoflow,算子相关文件放在了panoflow/core/op子文件夹下。

接下来,和算子实现相关的文件都应该放在算子文件夹里。使用和测试算子的文件可以放在项目文件夹的其他地方。

由于在实现中我借用了MMCV的代码,还要提前准备好一些头文件。首先新建一个文件pytorch_cpp_helper.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#ifndef PYTORCH_CPP_HELPER
#define PYTORCH_CPP_HELPER
#include <torch/extension.h>

#include <vector>

using namespace at;

#define CHECK_CUDA(x) \
TORCH_CHECK(x.device().is_cuda(), #x " must be a CUDA tensor")
#define CHECK_CPU(x) \
TORCH_CHECK(!x.device().is_cuda(), #x " must be a CPU tensor")
#define CHECK_CONTIGUOUS(x) \
TORCH_CHECK(x.is_contiguous(), #x " must be contiguous")
#define CHECK_CUDA_INPUT(x) \
CHECK_CUDA(x); \
CHECK_CONTIGUOUS(x)
#define CHECK_CPU_INPUT(x) \
CHECK_CPU(x); \
CHECK_CONTIGUOUS(x)

#endif // PYTORCH_CPP_HELPER

再创建一个文件pytorch_cuda_helper.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#ifndef PYTORCH_CUDA_HELPER
#define PYTORCH_CUDA_HELPER

#include <ATen/ATen.h>
#include <ATen/cuda/CUDAContext.h>
#include <c10/cuda/CUDAGuard.h>

#include <ATen/cuda/CUDAApplyUtils.cuh>
#include <THC/THCAtomics.cuh>

#include "common_cuda_helper.hpp"

using at::Half;
using at::Tensor;
using phalf = at::Half;

#define __PHALF(x) (x)

#endif // PYTORCH_CUDA_HELPER

还有一个common_cuda_helper.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#ifndef COMMON_CUDA_HELPER
#define COMMON_CUDA_HELPER

#include <cuda.h>

#define CUDA_1D_KERNEL_LOOP(i, n) \
for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < (n); \
i += blockDim.x * gridDim.x)

#define CUDA_2D_KERNEL_LOOP(i, n, j, m) \
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < (n); \
i += blockDim.x * gridDim.x) \
for (size_t j = blockIdx.y * blockDim.y + threadIdx.y; j < (m); \
j += blockDim.y * gridDim.y)

#define CUDA_2D_KERNEL_BLOCK_LOOP(i, n, j, m) \
for (size_t i = blockIdx.x; i < (n); i += gridDim.x) \
for (size_t j = blockIdx.y; j < (m); j += gridDim.y)

#define THREADS_PER_BLOCK 512

inline int GET_BLOCKS(const int N, const int num_threads = THREADS_PER_BLOCK) {
int optimal_block_num = (N + num_threads - 1) / num_threads;
int max_block_num = 4096;
return min(optimal_block_num, max_block_num);
}

template <typename T>
__device__ T bilinear_interpolate(const T* input, const int height,
const int width, T y, T x,
const int index /* index for debug only*/) {
// deal with cases that inverse elements are out of feature map boundary
if (y < -1.0 || y > height || x < -1.0 || x > width) return 0;

if (y <= 0) y = 0;
if (x <= 0) x = 0;

int y_low = (int)y;
int x_low = (int)x;
int y_high;
int x_high;

if (y_low >= height - 1) {
y_high = y_low = height - 1;
y = (T)y_low;
} else {
y_high = y_low + 1;
}

if (x_low >= width - 1) {
x_high = x_low = width - 1;
x = (T)x_low;
} else {
x_high = x_low + 1;
}

T ly = y - y_low;
T lx = x - x_low;
T hy = 1. - ly, hx = 1. - lx;
// do bilinear interpolation
T v1 = input[y_low * width + x_low];
T v2 = input[y_low * width + x_high];
T v3 = input[y_high * width + x_low];
T v4 = input[y_high * width + x_high];
T w1 = hy * hx, w2 = hy * lx, w3 = ly * hx, w4 = ly * lx;

T val = (w1 * v1 + w2 * v2 + w3 * v3 + w4 * v4);

return val;
}

template <typename T>
__device__ void bilinear_interpolate_gradient(
const int height, const int width, T y, T x, T& w1, T& w2, T& w3, T& w4,
int& x_low, int& x_high, int& y_low, int& y_high,
const int index /* index for debug only*/) {
// deal with cases that inverse elements are out of feature map boundary
if (y < -1.0 || y > height || x < -1.0 || x > width) {
// empty
w1 = w2 = w3 = w4 = 0.;
x_low = x_high = y_low = y_high = -1;
return;
}

if (y <= 0) y = 0;
if (x <= 0) x = 0;

y_low = (int)y;
x_low = (int)x;

if (y_low >= height - 1) {
y_high = y_low = height - 1;
y = (T)y_low;
} else {
y_high = y_low + 1;
}

if (x_low >= width - 1) {
x_high = x_low = width - 1;
x = (T)x_low;
} else {
x_high = x_low + 1;
}

T ly = y - y_low;
T lx = x - x_low;
T hy = 1. - ly, hx = 1. - lx;

// reference in forward
// T v1 = input[y_low * width + x_low];
// T v2 = input[y_low * width + x_high];
// T v3 = input[y_high * width + x_low];
// T v4 = input[y_high * width + x_high];
// T val = (w1 * v1 + w2 * v2 + w3 * v3 + w4 * v4);

w1 = hy * hx, w2 = hy * lx, w3 = ly * hx, w4 = ly * lx;

return;
}
#endif // COMMON_CUDA_HELPER

这些文件添加了CPU和CUDA实现时需要的头文件和定义,后面的C++源码会用到它们。

CPU

C++实现

在用C++实现一个算子时,我们要编写一个形如这样的文件:

1
2
3
4
5
6
7
8
9
10
11
#include <torch/torch.h>

torch::Tensor my_add(torch::Tensor t1, torch::Tensor t2)
{
return t1 + t2;
}

TORCH_LIBRARY(my_ops, m)
{
m.def("my_add", my_add);
}

这个C++文件主要包含两部分内容:算子的实现函数和C++接口绑定。在实现卷积时,也是要实现这两部分内容。

在修改一个现有的算子时,最好的方法不是从头写一个,而是去开源库里找一份实现,并在这个基础上进行修改。

我在MMCV的仓库里找到了可变形卷积的实现,并把它拆解回了普通的卷积。我参考了这篇教程:手把手教你如何高效地在 MMCV 中贡献算子。另外,这份笔记还参考了PyTorch官方Extension教程

找到了卷积的实现后,在算子文件夹下新建一个cpp源文件。比如我的文件路径就是panoflow/core/op/my_conv.cpp。这样一个普通卷积的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
#include "pytorch_cpp_helper.hpp"

void my_conv_im2col_cpu(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col);

void my_conv_im2col_cuda(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col);

void my_conv_shape_check(at::Tensor input,
at::Tensor weight, int kH,
int kW, int dH, int dW, int padH, int padW,
int dilationH, int dilationW, int group)
{
TORCH_CHECK(
weight.ndimension() == 4,
"4D weight tensor (nOutputPlane,nInputPlane,kH,kW) expected, but got: %s",
weight.ndimension());

TORCH_CHECK(weight.is_contiguous(), "weight tensor has to be contiguous");

TORCH_CHECK(kW > 0 && kH > 0,
"kernel size should be greater than zero, but got kH: %d kW: %d",
kH, kW);

TORCH_CHECK((weight.size(2) == kH && weight.size(3) == kW),
"kernel size should be consistent with weight, ",
"but got kH: %d kW: %d weight.size(2): %d, weight.size(3): %d",
kH, kW, weight.size(2), weight.size(3));

TORCH_CHECK(dW > 0 && dH > 0,
"stride should be greater than zero, but got dH: %d dW: %d", dH,
dW);

TORCH_CHECK(
dilationW > 0 && dilationH > 0,
"dilation should be greater than 0, but got dilationH: %d dilationW: %d",
dilationH, dilationW);

int ndim = input.ndimension();
int dimf = 0;
int dimh = 1;
int dimw = 2;

if (ndim == 4)
{
dimf++;
dimh++;
dimw++;
}

TORCH_CHECK(ndim == 3 || ndim == 4,
"3D or 4D input tensor expected but got: %s", ndim);

long nInputPlane = weight.size(1) * group;
long inputHeight = input.size(dimh);
long inputWidth = input.size(dimw);
long nOutputPlane = weight.size(0);
long outputHeight =
(inputHeight + 2 * padH - (dilationH * (kH - 1) + 1)) / dH + 1;
long outputWidth =
(inputWidth + 2 * padW - (dilationW * (kW - 1) + 1)) / dW + 1;

if (outputWidth < 1 || outputHeight < 1)
AT_ERROR(
"Given input size: (%ld x %ld x %ld). "
"Calculated output size: (%ld x %ld x %ld). Output size is too small",
nInputPlane, inputHeight, inputWidth, nOutputPlane, outputHeight,
outputWidth);

TORCH_CHECK(input.size(1) == nInputPlane,
"invalid number of input planes, expected: %d, but got: %d",
nInputPlane, input.size(1));

TORCH_CHECK((inputHeight >= kH && inputWidth >= kW),
"input image is smaller than kernel");
}

void my_conv_forward(Tensor input, Tensor weight, Tensor bias,
Tensor output, Tensor columns, int kW,
int kH, int dW, int dH, int padW, int padH,
int dilationW, int dilationH, int group,
int im2col_step)
{
bool isCuda = false;
if (input.device().is_cuda())
{
CHECK_CUDA_INPUT(input);
CHECK_CUDA_INPUT(weight);
CHECK_CUDA_INPUT(bias);
CHECK_CUDA_INPUT(output);
CHECK_CUDA_INPUT(columns);
isCuda = true;
}
else
{
CHECK_CPU_INPUT(input);
CHECK_CPU_INPUT(weight);
CHECK_CPU_INPUT(bias);
CHECK_CPU_INPUT(output);
CHECK_CPU_INPUT(columns);
}

my_conv_shape_check(input, weight, kH, kW, dH, dW, padH,
padW, dilationH, dilationW, group);
at::DeviceGuard guard(input.device());

int batch = 1;
if (input.ndimension() == 3)
{
// Force batch
batch = 0;
input.unsqueeze_(0);
}

long batchSize = input.size(0);
long nInputPlane = input.size(1);
long inputHeight = input.size(2);
long inputWidth = input.size(3);

long nOutputPlane = weight.size(0);

long outputWidth =
(inputWidth + 2 * padW - (dilationW * (kW - 1) + 1)) / dW + 1;
long outputHeight =
(inputHeight + 2 * padH - (dilationH * (kH - 1) + 1)) / dH + 1;

output = output.view({batchSize / im2col_step, im2col_step, nOutputPlane,
outputHeight, outputWidth});
columns = at::zeros(
{nInputPlane * kW * kH, im2col_step * outputHeight * outputWidth},
input.options());

input = input.view({batchSize / im2col_step, im2col_step, nInputPlane,
inputHeight, inputWidth});

Tensor output_buffer = at::zeros({batchSize / im2col_step, nOutputPlane,
im2col_step * outputHeight, outputWidth},
output.options());

output_buffer = output_buffer.view(
{output_buffer.size(0), group, output_buffer.size(1) / group,
output_buffer.size(2), output_buffer.size(3)});

for (int elt = 0; elt < batchSize / im2col_step; elt++)
{
if (isCuda)
{
my_conv_im2col_cuda(input[elt], nInputPlane, inputHeight,
inputWidth, kH, kW, padH, padW, dH, dW, dilationH,
dilationW, im2col_step, columns);
}
else
{
my_conv_im2col_cpu(input[elt], nInputPlane, inputHeight,
inputWidth, kH, kW, padH, padW, dH, dW, dilationH,
dilationW, im2col_step, columns);
}


columns = columns.view({group, columns.size(0) / group, columns.size(1)});
weight = weight.view({group, weight.size(0) / group, weight.size(1),
weight.size(2), weight.size(3)});

for (int g = 0; g < group; g++)
{
output_buffer[elt][g] = output_buffer[elt][g]
.flatten(1)
.addmm_(weight[g].flatten(1), columns[g])
.view_as(output_buffer[elt][g]);
}
columns =
columns.view({columns.size(0) * columns.size(1), columns.size(2)});
weight = weight.view({weight.size(0) * weight.size(1), weight.size(2),
weight.size(3), weight.size(4)});
}

output_buffer = output_buffer.view(
{output_buffer.size(0), output_buffer.size(1) * output_buffer.size(2),
output_buffer.size(3), output_buffer.size(4)});

output_buffer = output_buffer.view({batchSize / im2col_step, nOutputPlane,
im2col_step, outputHeight, outputWidth});
output_buffer.transpose_(1, 2);

output.copy_(output_buffer);
output = output.view({batchSize, nOutputPlane, outputHeight, outputWidth});

bias = bias.view({1, bias.size(0), 1, 1});
output.add_(bias);

input = input.view({batchSize, nInputPlane, inputHeight, inputWidth});

if (batch == 0)
{
output = output.view({nOutputPlane, outputHeight, outputWidth});
input = input.view({nInputPlane, inputHeight, inputWidth});
}
}

template <typename T>
void my_conv_im2col_cpu_kernel(
const int n, const T *data_im, const int height,
const int width, const int kernel_h, const int kernel_w, const int pad_h,
const int pad_w, const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int batch_size,
const int num_channels, const int height_col,
const int width_col, T *data_col)
{
for (int index = 0; index < n; index++)
{
// index index of output matrix
const int w_col = index % width_col;
const int h_col = (index / width_col) % height_col;
const int b_col = (index / width_col / height_col) % batch_size;
const int c_im = (index / width_col / height_col) / batch_size;
const int c_col = c_im * kernel_h * kernel_w;

const int h_in = h_col * stride_h - pad_h;
const int w_in = w_col * stride_w - pad_w;
T *data_col_ptr =
data_col +
((c_col * batch_size + b_col) * height_col + h_col) * width_col + w_col;
const T *data_im_ptr =
data_im + (b_col * num_channels + c_im) * height * width;

for (int i = 0; i < kernel_h; ++i)
{
for (int j = 0; j < kernel_w; ++j)
{
T val = static_cast<T>(0);
const int h_im = h_in + i * dilation_h;
const int w_im = w_in + j * dilation_w;
if (h_im > -1 && w_im > -1 && h_im < height && w_im < width)
{
val = data_im_ptr[h_im * width + w_im];
}
*data_col_ptr = val;
data_col_ptr += batch_size * height_col * width_col;
}
}
}
}

void my_conv_im2col_cpu(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col)
{
int height_col =
(height + 2 * pad_h - (dilation_h * (ksize_h - 1) + 1)) / stride_h + 1;
int width_col =
(width + 2 * pad_w - (dilation_w * (ksize_w - 1) + 1)) / stride_w + 1;
int num_kernels = channels * height_col * width_col * parallel_imgs;

AT_DISPATCH_FLOATING_TYPES_AND_HALF(
data_im.scalar_type(), "", [&]
{ my_conv_im2col_cpu_kernel<scalar_t>(
num_kernels, data_im.data_ptr<scalar_t>(),
height, width, ksize_h, ksize_w,
pad_h, pad_w, stride_h, stride_w, dilation_h, dilation_w,
parallel_imgs, channels,
height_col, width_col, data_col.data_ptr<scalar_t>()); });
}

void my_conv_im2col_cuda(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col)
{

}

PYBIND11_MODULE(my_ops, m)
{
m.def("my_conv_forward", my_conv_forward, "my_conv_forward",
py::arg("input"), py::arg("weight"), py::arg("bias"),
py::arg("output"), py::arg("columns"), py::arg("kW"),
py::arg("kH"), py::arg("dW"), py::arg("dH"), py::arg("padW"),
py::arg("padH"), py::arg("dilationW"), py::arg("dilationH"),
py::arg("group"), py::arg("im2col_step"));
}

这份实现非常长,我挑一些重点的内容讲解。

从最下面的PYBIND11_MODULE(my_ops, m)看起。这里的my_ops是生成的库名,可以随便取名。待会要import这个库名。代码块里m.def用于定义C++函数的Python接口。"my_conv_forward"是Python调用时的函数名称,my_conv_forward是被Python代码调用的这份代码里的C++函数名称。也就是说,这份卷积实现的入口函数就是my_conv_forward。我们从这个函数看起。

1
2
3
4
5
void my_conv_forward(Tensor input, Tensor weight, Tensor bias,
Tensor output, Tensor columns, int kW,
int kH, int dW, int dH, int padW, int padH,
int dilationW, int dilationH, int group,
int im2col_step)

my_conv_forward就是卷积的主函数。它的参数除了PyTorch的Conv2d传入的参数外,还多了两个参数output, columus。这两个张量是保存中间结果的,在PyTorch侧是看不到的。output用于保存卷积输出,columns用于保存卷积时的列矩阵。底层实现卷积时,会先把图像转换成一个用列表示的矩阵,再把卷积操作当成一个矩阵乘法来完成。其中,第一步操作叫做”im2col”。对此原理不熟的话可以参考这篇文章:https://zhuanlan.zhihu.com/p/63974249。

my_conv_forward函数的大部分内容都是在做类型检查和张量形状转换。在修改卷积实现时,这些东西都可以不用改。整个卷积操作的核心都在这一部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
for (int elt = 0; elt < batchSize / im2col_step; elt++)
{
if (isCuda)
{
my_conv_im2col_cuda(input[elt], nInputPlane, inputHeight,
inputWidth, kH, kW, padH, padW, dH, dW, dilationH,
dilationW, im2col_step, columns);
}
else
{
my_conv_im2col_cpu(input[elt], nInputPlane, inputHeight,
inputWidth, kH, kW, padH, padW, dH, dW, dilationH,
dilationW, im2col_step, columns);
}


columns = columns.view({group, columns.size(0) / group, columns.size(1)});
weight = weight.view({group, weight.size(0) / group, weight.size(1),
weight.size(2), weight.size(3)});

for (int g = 0; g < group; g++)
{
output_buffer[elt][g] = output_buffer[elt][g]
.flatten(1)
.addmm_(weight[g].flatten(1), columns[g])
.view_as(output_buffer[elt][g]);
}
columns =
columns.view({columns.size(0) * columns.size(1), columns.size(2)});
weight = weight.view({weight.size(0) * weight.size(1), weight.size(2),
weight.size(3), weight.size(4)});
}

这段代码先做了im2col操作,再做了矩阵乘法。其实,包括可变形卷积在内,各种稀奇古怪的卷积操作通过靠修改im2col来完成的。CPU和CUDA版卷积的主要区别,也体现在im2col中(后面的矩阵乘法在CPU和CUDA上都能用)。

由于是讲CPU实现,这里的CUDA实现我暂时放了一个空函数。my_conv_im2col_cpu的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void my_conv_im2col_cpu(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col)
{
int height_col =
(height + 2 * pad_h - (dilation_h * (ksize_h - 1) + 1)) / stride_h + 1;
int width_col =
(width + 2 * pad_w - (dilation_w * (ksize_w - 1) + 1)) / stride_w + 1;
int num_kernels = channels * height_col * width_col * parallel_imgs;

AT_DISPATCH_FLOATING_TYPES_AND_HALF(
data_im.scalar_type(), "", [&]
{ my_conv_im2col_cpu_kernel<scalar_t>(
num_kernels, data_im.data_ptr<scalar_t>(),
height, width, ksize_h, ksize_w,
pad_h, pad_w, stride_h, stride_w, dilation_h, dilation_w,
parallel_imgs, channels,
height_col, width_col, data_col.data_ptr<scalar_t>()); });
}

这个函数其实只是处理了一下输入,真正的实现在my_conv_im2col_cpu_kernel里。AT_DISPATCH_FLOATING_TYPES_AND_HALF可以让实现兼容半精度和普通float,所以实现my_conv_im2col_cpu_kernel得写成一个模板函数。

my_conv_im2col_cpu_kernel的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
template <typename T>
void my_conv_im2col_cpu_kernel(
const int n, const T *data_im, const int height,
const int width, const int kernel_h, const int kernel_w, const int pad_h,
const int pad_w, const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int batch_size,
const int num_channels, const int height_col,
const int width_col, T *data_col)
{
for (int index = 0; index < n; index++)
{
// index index of output matrix
const int w_col = index % width_col;
const int h_col = (index / width_col) % height_col;
const int b_col = (index / width_col / height_col) % batch_size;
const int c_im = (index / width_col / height_col) / batch_size;
const int c_col = c_im * kernel_h * kernel_w;

const int h_in = h_col * stride_h - pad_h;
const int w_in = w_col * stride_w - pad_w;
T *data_col_ptr =
data_col +
((c_col * batch_size + b_col) * height_col + h_col) * width_col + w_col;
const T *data_im_ptr =
data_im + (b_col * num_channels + c_im) * height * width;

for (int i = 0; i < kernel_h; ++i)
{
for (int j = 0; j < kernel_w; ++j)
{
T val = static_cast<T>(0);
const int h_im = h_in + i * dilation_h;
const int w_im = w_in + j * dilation_w;
if (h_im > -1 && w_im > -1 && h_im < height && w_im < width)
{
val = data_im_ptr[h_im * width + w_im];
}
*data_col_ptr = val;
data_col_ptr += batch_size * height_col * width_col;
}
}
}
}

它的作用就是把图像里的数据搬到做卷积运算的column里。循环遍历每一次卷积的每一个位置,把待运算的量填入column。卷积里的所有参数(pad, stride, …)都是在这段函数里生效的。想实现可变形卷积等改进,也要修改这个函数。

Python封装

实现好了后,如果编译完了的话,刚刚的卷积接口可以通过以下方式在Python里调用:

1
2
import my_ops
my_ops.my_conv_forward(...)

这里的my_ops这个名称必须和开始PYBIND11_MODULE(my_ops, m)里面那个库名称对应。

基于这个接口,可以仿照PyTorch中Conv2d的接口,编写一个和Conv2d等价的torch.nn.Module出来。我的这个Python文件的路径是panoflow/core/op/my_conv.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import torch
from torch.autograd import Function
import torch.nn as nn
from torch import Tensor
from torch.nn.modules.utils import _pair
from torch.nn.parameter import Parameter

import my_ops


class MyConvF(Function):

@staticmethod
def forward(ctx,
input: torch.Tensor,
weight,
bias,
stride=1,
padding=0,
dilation=1,
groups=1,
im2col_step=32):
if input is not None and input.dim() != 4:
raise ValueError(
f'Expected 4D tensor as input, got {input.dim()}D tensor \
instead.')
ctx.stride = _pair(stride)
ctx.padding = _pair(padding)
ctx.dilation = _pair(dilation)
ctx.groups = groups
ctx.im2col_step = im2col_step

weight = weight.type_as(input)
ctx.save_for_backward(input, weight)

output = input.new_empty(MyConvF._output_size(ctx, input, weight))

ctx.bufs_ = [input.new_empty(0), input.new_empty(0)] # columns, ones

cur_im2col_step = min(ctx.im2col_step, input.size(0))
assert (input.size(0) % cur_im2col_step
) == 0, 'batch size must be divisible by im2col_step'

my_ops.my_conv_forward(
input,
weight,
bias,
output,
ctx.bufs_[0],
kW=weight.size(3),
kH=weight.size(2),
dW=ctx.stride[1],
dH=ctx.stride[0],
padW=ctx.padding[1],
padH=ctx.padding[0],
dilationW=ctx.dilation[1],
dilationH=ctx.dilation[0],
group=ctx.groups,
im2col_step=cur_im2col_step)
return output

@staticmethod
def _output_size(ctx, input, weight):
channels = weight.size(0)
output_size = (input.size(0), channels)
for d in range(input.dim() - 2):
in_size = input.size(d + 2)
pad = ctx.padding[d]
kernel = ctx.dilation[d] * (weight.size(d + 2) - 1) + 1
stride_ = ctx.stride[d]
output_size += ((in_size + (2 * pad) - kernel) // stride_ + 1, )
if not all(map(lambda s: s > 0, output_size)):
raise ValueError(
'convolution input is too small (output would be ' +
'x'.join(map(str, output_size)) + ')')
return output_size


my_conv = MyConvF.apply


class MyConv2d(nn.Module):

def __init__(self,
in_channels: int,
out_channels: int,
kernel_size,
stride=1,
padding=0,
dilation=1,
groups: int = 1,
bias: bool = True):
super().__init__()
kernel_size_ = _pair(kernel_size)
stride_ = _pair(stride)
padding_ = _pair(padding)
dilation_ = _pair(dilation)
self.in_channels = in_channels
self.out_channels = out_channels
self.kernel_size = kernel_size_
self.stride = stride_
self.padding = padding_
self.dilation = dilation_
self.groups = groups
self.weight = Parameter(
torch.Tensor(out_channels, in_channels // groups, *kernel_size_))
self.bias = Parameter(torch.Tensor(out_channels))

# Useless attributes
self.transposed = None
self.output_padding = None
self.padding_mode = None

def forward(self, input: Tensor) -> Tensor:
return my_conv(input, self.weight, self.bias, self.stride,
self.padding, self.dilation, self.groups)

以后,用自己的卷积MyConv2d就和用普通的Conv2d一样了。

编译

打开外面的setup.py,填写以下内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from setuptools import setup
from torch.utils import cpp_extension
import os

src_root = 'panoflow/core/op'
cpp_src = ['my_conv.cpp']

if __name__ == '__main__':
include_dirs = ['panoflow/core/op']
cpp_path = [os.path.join(src_root, src) for src in cpp_src]

setup(
name='panoflow',
ext_modules=[
cpp_extension.CppExtension(
'my_ops', cpp_path, include_dirs=include_dirs)
],
cmdclass={'build_ext': cpp_extension.BuildExtension})

其中的路径要根据自己的实际情况修改。

和编译相关的内容都写在cpp_extension.CppExtension里。其中,源文件要写在第二个参数里,头文件目录要写在include_dirs。由于我的源文件放在panoflow/core/op里,我写了个源文件名数组cpp_src,在传参前把路径组合了一下。由于include_dirs和源文件在同一个目录下,我也填的是panoflow/core/op

写完了setup.py后,运行python setup.py develop,就能一键编译和安装。如果运行后没有报编译错误,就可以把实现的卷积用起来了。

单元测试

用单元测试可以快速地验证卷积是否实现成功。我写了一个简单的单元测试文件,在任意一个文件夹下创建该文件即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import torch
import torch.nn as nn
from panoflow.core.op.my_conv import MyConv2d

inc = 3
outc = 4
img_shaspe = (50, 50)

# device_name = 'cuda:0'
device_name = 'cpu'
open_bias = True


def test_one():
ts = torch.ones([1, 1, 3, 3]).to(device_name)
layer = nn.Conv2d(1, 1, 3, 1, 1, bias=open_bias).to(device_name)
gt = layer(ts)
my_layer = MyConv2d(1, 1, 3, 1, 1).to(device_name)
my_layer.load_state_dict(layer.state_dict(), strict=False)
res = my_layer(ts)
res = res.to('cpu')
gt = gt.to('cpu')
assert torch.allclose(res, gt, 1e-3, 1e-5)


def test_two():
ts = torch.rand([1, inc, *img_shaspe]).to(device_name)
layer = nn.Conv2d(inc, outc, 3, 1, 1, bias=open_bias).to(device_name)
gt = layer(ts)
my_layer = MyConv2d(inc, outc, 3, 1, 1).to(device_name)
my_layer.load_state_dict(layer.state_dict(), strict=False)
res = my_layer(ts)
res = res.to('cpu')
gt = gt.to('cpu')
assert torch.allclose(res, gt, 1e-3, 1e-5)


if __name__ == '__main__':
test_one()
test_two()

其中,panoflow.core.op.my_conv是我刚刚放MyConv2d的Python模块。

直接运行这个Python文件,如果没有任何输出(报错信息),就说明卷积实现成功了。

CUDA

C++实现

在刚刚的实现中,有一个my_conv_im2col_cuda的实现是空着的。在CUDA版本中,我们要实现这个函数。不过,这个函数要放在一个用nvcc编译的.cu文件里。注意!注意!注意! 因此,my_conv.cpp里那个空的my_conv_im2col_cuda实现应该全部删掉。

新建一个文件my_conv_cuda.cu

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Modify from https://github.com/open-mmlab/mmcv/blob/my_conv/mmcv/ops/csrc/common/cuda/deform_conv_cuda_kernel.cuh
// Copyright (c) OpenMMLab. All rights reserved.
#include <torch/types.h>

#include "pytorch_cuda_helper.hpp"

template <typename T>
__global__ void my_conv_im2col_gpu_kernel(
const int n, const T *data_im, const int height,
const int width, const int kernel_h, const int kernel_w, const int pad_h,
const int pad_w, const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int batch_size,
const int num_channels, const int height_col,
const int width_col, T *data_col)
{
CUDA_1D_KERNEL_LOOP(index, n)
{
// index index of output matrix
const int w_col = index % width_col;
const int h_col = (index / width_col) % height_col;
const int b_col = (index / width_col / height_col) % batch_size;
const int c_im = (index / width_col / height_col) / batch_size;
const int c_col = c_im * kernel_h * kernel_w;

const int h_in = h_col * stride_h - pad_h;
const int w_in = w_col * stride_w - pad_w;
T *data_col_ptr =
data_col +
((c_col * batch_size + b_col) * height_col + h_col) * width_col + w_col;
const T *data_im_ptr =
data_im + (b_col * num_channels + c_im) * height * width;

for (int i = 0; i < kernel_h; ++i)
{
for (int j = 0; j < kernel_w; ++j)
{
T val = static_cast<T>(0);
const int h_im = h_in + i * dilation_h;
const int w_im = w_in + j * dilation_w;
if (h_im > -1 && w_im > -1 && h_im < height && w_im < width)
{
val = data_im_ptr[h_im * width + w_im];
}
*data_col_ptr = val;
data_col_ptr += batch_size * height_col * width_col;
}
}
}
}

void my_conv_im2col_cuda(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col)
{
int height_col =
(height + 2 * pad_h - (dilation_h * (ksize_h - 1) + 1)) / stride_h + 1;
int width_col =
(width + 2 * pad_w - (dilation_w * (ksize_w - 1) + 1)) / stride_w + 1;
int num_kernels = channels * height_col * width_col * parallel_imgs;

AT_DISPATCH_FLOATING_TYPES_AND_HALF(
data_im.scalar_type(), "my_conv_im2col_gpu", [&]
{ my_conv_im2col_gpu_kernel<scalar_t><<<GET_BLOCKS(num_kernels),
THREADS_PER_BLOCK, 0,
at::cuda::getCurrentCUDAStream()>>>(
num_kernels, data_im.data_ptr<scalar_t>(),
height, width, ksize_h, ksize_w,
pad_h, pad_w, stride_h, stride_w, dilation_h, dilation_w,
parallel_imgs, channels,
height_col, width_col, data_col.data_ptr<scalar_t>()); });

AT_CUDA_CHECK(cudaGetLastError());
}

和CPU版的类似,my_conv_im2col_cuda也是预处理了输入,并调用核函数my_conv_im2col_gpu_kernel来实现im2col

CUDA实现和CPU几乎一样,唯一的区别就是for循环变成了CUDA_1D_KERNEL_LOOP(index, n)。这个宏是头文件里帮我们定义的,它简化了CUDA的一维循环。

编译

修改setup.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from setuptools import setup
from torch.utils import cpp_extension
import os

src_root = 'panoflow/core/op'
cpp_src = ['my_conv.cpp', 'my_conv_cuda.cu']

if __name__ == '__main__':
include_dirs = ['panoflow/core/op']
cpp_path = [os.path.join(src_root, src) for src in cpp_src]

setup(
name='panoflow',
ext_modules=[
cpp_extension.CUDAExtension(
'my_ops', cpp_path, include_dirs=include_dirs)
],
cmdclass={'build_ext': cpp_extension.BuildExtension})

首先,要把源文件加入cpp_src里。之后,把CppExtension改成CUDAExtension。这样,就能编译新写的CUDA文件了。

写完了之后,再次python setup.py develop编译即可。

编译小技巧:不拿IDE直接写C++和CUDA源代码是很容易出错误的。但如果你想只用setup.py来验证代码的正确性,可以python setup.py develop > tmp.txt把编译输出重定向到一个文件里来查看。由于编译时的信息过多,在命令行里很难从一堆编译warning里找到最重要的error。

测试

由于Python部分在之前都已经写好了,可以直接用刚刚的单元测试文件测试了。只要把刚刚那份文件的device_name改成cuda:0即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import torch
import torch.nn as nn
from panoflow.core.op.my_conv import MyConv2d

inc = 3
outc = 4
img_shaspe = (50, 50)

device_name = 'cuda:0'
# device_name = 'cpu'
open_bias = True


def test_one():
ts = torch.ones([1, 1, 3, 3]).to(device_name)
layer = nn.Conv2d(1, 1, 3, 1, 1, bias=open_bias).to(device_name)
gt = layer(ts)
my_layer = MyConv2d(1, 1, 3, 1, 1).to(device_name)
my_layer.load_state_dict(layer.state_dict(), strict=False)
res = my_layer(ts)
res = res.to('cpu')
gt = gt.to('cpu')
assert torch.allclose(res, gt, 1e-3, 1e-5)


def test_two():
ts = torch.rand([1, inc, *img_shaspe]).to(device_name)
layer = nn.Conv2d(inc, outc, 3, 1, 1, bias=open_bias).to(device_name)
gt = layer(ts)
my_layer = MyConv2d(inc, outc, 3, 1, 1).to(device_name)
my_layer.load_state_dict(layer.state_dict(), strict=False)
res = my_layer(ts)
res = res.to('cpu')
gt = gt.to('cpu')
assert torch.allclose(res, gt, 1e-3, 1e-5)


if __name__ == '__main__':
test_one()
test_two()

同样,没报错就说明写对了。

ResNet是CV中的经典网络。在这篇文章中,我将按照阅读论文的通用方法由粗至精地解读这篇文章。如果你对ResNet不熟,最好对着原论文阅读本文。如果你已经很熟悉ResNet了,也可以通过这篇文章查缺补漏。

粗读

摘要

摘要的前三句话开门见山地介绍本文要解决的问题以及解决的方法。

  • 问题:较深的神经网络很难训练。
  • 本文的工作:提出了一种能够轻松训练比以往的网络要深得多的残差学习框架。
  • 本文方法的进一步解释:神经网络的层将拟合一个基于输入的残差函数,而不是一个没有参考的函数。

随后,摘要展示了这种方法取得的成就:

  • 经实验而非理论证明,增加深度后,模型的训练效果和测试效果都得到了提升。
  • 精度打败了当时所有的分类模型。
  • 深度高达152,大幅超过了当时较火的19层的VGG,同时并没有增加多少计算量。
  • 精度打败了当时所有的检测、分割等任务的模型。这证明这种方法的泛化性强。

这篇摘要没有罗列贡献,而是在提出问题后轻轻一点,介绍了本文的方法及其作用。随后,所有的篇幅都在秀这种方法的成效。看完这段话,我们可能还不知道“残差”是怎么算的,但我们知道了这种方法很厉害,测试精度、训练难易度、泛化性都十分优秀,成功解决了较深的模型难以训练的问题。

在这轮粗读中,我们可以带着以下问题去概览论文:

  • 文章要解决的具体是一个怎样的问题?为什么较深的神经网络很难训练?
  • 文章提出的“残差”是什么?它为什么能解决深模型难训练的问题?
  • 凭什么说深模型难训练的问题被解决了?实验是怎么做的?
  • 这篇文章提出的模型比其他模型好在哪?

引言

引言用十分清晰的逻辑阐明了本文的核心思想——深度残差学习框架。引言先介绍“训练更深的神经网络”这一问题的由来,再抽丝剥茧地讨论该问题的解决方法,最后自然过渡到了本文的核心思想上。看完引言,我们就应该看懂这篇文章。

深度卷积神经网络的有效性,得益于它的“深度”。那么,是不是层数更深的网络就更好呢?过去的实验显示:不是的。

更深的网络面临的第一个问题是梯度弥散/爆炸,这些问题会令网络难以收敛。但是,通过参数归一初始化和归一化层,这一问题以及得到了有效缓解。证据就是,较深的网络能够收敛。

这时,较深网络又暴露出了第二个问题——退化:增加网络的深度,反而会降低网络的精度。这种退化和过拟合还不同,因为退化不仅导致精度降低,还提升了模型的训练误差。

这种退化,是不是说明较深的网络本质上就比不过较浅的网络呢?其实并不是,退化只是因为较深的网络不是那么容易优化。考虑一个较浅的网络,我们往它的后面增加几层全等映射(y=x)。这个变深的网络与原来的网络完全等价。从这个角度看,更深的网络最少不会比更浅的网络更差才对。因此,我们要想办法让学习算法能够更好地优化更深的网络,最起码优化出一个不比较浅网络更差的网络。也就是说,我们要想办法让学习算法能够轻松学会恒等映射。

文章提出了一种深度残差学习框架。假设一层网络表示的映射是$H(x)$,则该层的非线性层要拟合另一个表示残差的映射$F(x) := H(x) - x$。换个角度看,就是一层网络的输出由原来的$F(x)$变成了$F(x)+x$,多加上了一个$x$。这样,只要令$F(x)=0$,网络就能轻松描述恒等映射,较深的网络最起码不比较浅的网络更差了。

多个数据集上的实验表明,这种方法不仅容易训练,还能得到更高的精度。在多项任务的CV竞赛中,这种方法都独占鳌头。

看完了引言,我们基本能知道文章在解决怎样的问题,也大致明白了残差学习的原理。接下来,我们来过一过这篇文章的实验,看看这种方法的效果究竟如何。

实验

我们主要看几组ImageNet上的实验。为了方便称呼,作者把不使用残差连接的普通网络叫做“平坦(plain)”网络。第一组实验对比了不同深度的平坦网络的训练误差(细线)和验证误差(粗线)。

可以看出,更深的网络有更高的训练误差和验证误差。作者排除了梯度问题的影响。这些网络在训练时使用了Batch Normalization (BN),经调试,它们的梯度值也一切正常。因此,这种问题不是梯度消失导致的。引言中对于退化问题的描述得到了验证。

第二组实验是对残差网络ResNet的实验。我们可以把它的结果和平坦网络的放在一起对比。

从图中可以看出,使用残差学习后,更深的网络果然有了更低的训练误差和验证误差。同时,从表中可以看出,残差网络的测试误差也降低了一大截。这说明残差学习很好地解决了前文提到的退化问题,且这种有效性在测试数据上依然能保持。

由于这轮粗读我们没读方法部分,和方法有关的实验结果得跳过。可以直接翻页到和其他模型的对比表格处:

ResNet 在各个评价指标上打败了当时的其他网络,确实很厉害。

后面的表格显示,不仅是图像分类,ResNet在检测和分割等任务中也取得了第一名。

一般看到这里,一轮粗读就差不多完成了。从这轮粗读中,我们看懂了残差学习的核心思想,基本上理解了本文的核心创新点。当然,粗读深度学习相关的论文时,还可以扫一眼网络的核心模块和模型结构。精读的时候,我们再仔细理解文章的方法。

精读

残差公式

设多个层构成的模块在拟合一个映射$H(x)$,其中第一层的输入是$x$,则我们希望网络的非线性层去拟合另一个残差函数$F(x) := H(x) - x$,或者说整个模块在拟合$F(x) + x$。由于神经网络的多个非线性层能拟合任意复杂的映射,拟合一个残差函数$H(x)-x$也是可以实现的。

残差块

有了整体的思路,我们来具体看看每一个带残差的模块是怎么搭建的。具体而言,一个模块可以由输入$x$和参数集合${W_i}$计算得到(为了简化表达,bias被省略了):

我们主要关心$F(x, {W_i})$是怎么搭建的。文章中给出了一种简单的双层残差块示例,其中,$x$是以短路的形式连接到输出上的:

注意这里的$y = W_2\sigma(W_1x)$是一个未经激活的结果。整个模块送入下一层的输出是$\sigma(y)$,还要加上激活函数。

残差学习是一个框架,每个残差块可以有更多层。比如本文在实验部分还测试了另一种三层残差块(下图是一个示例,实际上通道数可以任意改变)。

多层的残差块都是可行的。但是,单层的残差块$y = W_1x + x$和线性层几乎一样,不能提升网络的有效性。

上述的输入$x$是直接加到激活前的输出上的。这个加法操作有一个前提:模块的输入输出通道数是一样的。在输入输出通道数不同时,可以在短路连接上加一个变换维度的线性运算$W_sx$。原来直接做加法的操作叫做全等映射,这里加上一个线性运算再做加法的操作叫做投影映射。

这里的符号标记都是基于全连接层的。这些计算对卷积层来说也一样,把矩阵乘法替换成卷积即可。

网络架构

本文先借鉴VGG的思路,先搭建了一个图像大小逐渐减半,深度逐渐翻倍的平坦网络。之后,在平坦网络连续的3x3卷积层上添加残差连接。下图中的实线代表普通的短路连接,虚线表示需要变换张量形状的短路连接。

在虚线表示的残差连接中,图像的大小和深度都发生了改变。对于深度的增加,即可以使用上一节提到的投影映射,也可以直接往多出来的通道数里填0(全等映射)。对于大小的减半,无论是用怎样的深度变化方法,都可以使用步幅为2的操作来实现大小减半。

在投影时,要进行卷积操作,卷积的步幅为2很好理解。但是,文章没有详细介绍步幅为2的全等映射是怎么做的。直观上理解,就是以步幅2跳着去输入张量里取值。

文章还提出了层数不同的ResNet架构。对于较深的网络,文章使用了3层残差块。

训练细节

训练的大部分配置都借鉴自AlexNet。如果你是刚入门图像识别且以后要从事这方面的研究,可以多关注这些细节。

数据处理

  • 随机缩放图像短边至 [256, 480] 中的均匀随机值
  • 随机翻转
  • 裁剪成 224x224
  • 像素归一化(同AlexNet)
  • 标准颜色增强(同AlexNet)

归一化

  • 激活前使用BN
  • 参数初始化使用 He Initialization

优化配置

  • batch size 256 的 mini-batch 梯度下降
  • 学习率初始化为0.1,发现误差不动了就除以10
  • 迭代$60 \times 10^4$轮
  • weight decay=0.0001, momentum=0.9
  • 没有dropout

此外,为了提高比赛中的精度,在测试时使用了10-crop(对图像裁剪10次,把输入分别输入网络,取结果的平均值)。同时,图像以不同的尺寸输入进了网络,结果取所有运算的平均值。

实验

读懂了方法,我们再来详细读一遍实验部分。

再看一次平坦网络和残差网络的对比。

在这份对比实验中,残差网络相对平坦网络没有添加任何参数。当通道数改变时,残差网络用的是0填充的全等映射。这样,平坦网络和残差网络的唯一区别就是是否有短路连接,整个对比实验非常公平。实验的结果证明了残差连接的有效性。

这篇工作还做了另一个比较平坦网络的实验:在CIFAR-10数据集上统计了平坦网络和残差网络的3x3卷积输出的标准差。标准差大,能说明输出的数值较大。

从这份对比结果中,我们能看出残差网络的输出标准差小于平坦网络。这符合残差学习的设计初衷:残差块至少是一个不会使性能变差的全等映射,其效果能够在全等映射的基础上优化。也因此,残差网络的输出大小会比平坦网络更靠近0。

看完了和平坦网络的对比,再看一看不同配置的ResNet直接的对比。文章先比较了全等映射和投影映射的短路连接。本文探讨了短路连接的3种配置:A) 全部使用全等连接 B) 有通道数变动的地方才使用投影连接 C) 全部使用投影连接。它们的表现如下:

可以看出,投影用得越多,效果越好。作者认为,B相对A更好,是因为通道数变化时0填充的部分没有残差学习(也就是没有做$+x$的操作);C相对B更好,是因为参数更多了。这些实验结果说明,投影连接并不能解决退化问题,出于性能的考虑,本文没有在其他实验中使用C。

后来,B是公认的ResNet标配。

文章还讨论了构筑更深网络的bottleneck结构。如前文所述,50层以上的ResNet在每个残差块里使用了3个1x1, 3x3, 1x3的卷积。这种设计主要是为了缩短训练时间。ResNet-50和ResNet-34有着同样的时间复杂度,却因为深度更大,精度更高。

这篇论文的主要实验就是这些。论文后面还展示了一个比较有趣的实验:在CIFAR-10数据集上训练超过1000层的ResNet。实验结果显示,1000多层的ResNet仍能成功被优化,训练误差也降到了很低,但是测试误差没有110层的网络好。作者认为,这是因为训练数据太少,网络过拟合了。

总结

ResNet是基于深度学习的计算机视觉中里程碑式的工作。通过阅读这篇论文,我们发现,ResNet使用的残差结构本身并不十分复杂。这篇工作真正出彩之处,是发现了深度神经网络的一个普遍存在的问题,并用精彩的推理设计出了一套能够解决此问题的方案。这种方案确实很有效,基于残差学习的ResNet击败了同时代的所有网络。残差连接被用在了后续几乎所有网络架构中,使用ResNet为backbone也成了其他CV任务较为常用的初始配置。

在这堂课里,我们要学习两个具体的应用:人脸识别、风格迁移。

相信大家已经在很多地方见识过人脸识别应用了:在火车站,要通过身份证和人脸核实身份;办理业务时,可以用手机完成人脸识别以核实身份;进办公楼时,员工只要刷脸通过就可以打开门禁。通过这节课的学习,我们能够学会如何用CNN完成人脸识别。

神经网络风格迁移是一项有趣的应用。它利用了CNN捕捉图像抽象信息的特点,能够把一幅图像的风格转移到另一幅图像上,从而生成一幅新的图像。这项技术不需要从头训练网络,学完这门课后,我们能快速地用代码实现神经网络风格迁移。

课堂笔记

人脸识别

准确来说,在人脸识别(Face Recognition)任务中,会给定一个有$K$个人的数据库。之后,每一次识别都会输入一张图片,输出这张图片是$K$个人中的哪一个,或者没有检测到相关人士。

有一个与这个相关的任务叫做人脸验证(Face Verification)。这个任务稍微简单一些,输入是一张图片和一个标记身份的数据(比如身份证号),要求输出图片中的人是否符合该身份。

单样本学习

按我们之前学的方法,假如我们在$K$个人的数据库上做识别(分类)任务,应该套用一个CNN模型,并在模型最后接一个$K+1$类的softmax,表示输入图片是K个人中的哪一个,或者都不是。

但是,这样的架构不适合人脸识别任务。以公司的门禁识别为例,这种方法有如下的缺点:

  1. 每来一个新同事,模型就要重新训练一次。
  2. 每个人都得上传大量的个人照片供网络训练。

理想情况下,我们希望模型能从一张人脸照片中学会分辨这张人脸,这样每个新同事只需要上传一张照片即可。这叫做单样本学习(One-shot Learning)。

为了完成单样本学习,我们可以从另一个角度来建模人脸识别问题:如果输入的人脸和数据库里某张人脸极为相似,我们就说识别出了这张人脸;否则,就说没有识别到有效的人脸。

这样,人脸识别问题就被转换为了一个求两张图片相似度的问题。我们可以让网络学习一个输入是两张图片,输出是二者相似度的一个映射。

孪生网络

在完成和相似度有关的问题时,一种常见的做法是使用孪生网络(Siamese Network)。

假设网络的倒数第二层有128个神经元。在普通分类网络中,这128个神经元输出的长度为128的向量会被输入进最后的softmax层。而在孪生网络中,我们要去掉softmax层,并用这个没有sofrmax的网络$f$分别输出两张图片$x^{(1)}, x^{(2)}$对应的128维向量$f(x^{(1)}), f(x^{(2)})$。这样,每张图片有了唯一对应的一个128维向量,这个向量可以看成是该图片的编码(encoding)。而我们又知道,对向量求相似度是很方便的。我们可以利用两张图片的编码求出相似度。

说起向量的相似度,最容易想到是向量间的距离$||v - u||^2$。因此,我们可以i设法让网络学会这样一种关系:

  • 若$x^{(i)}, x^{(j)}$是同一人,则$||f(x^{(i)}) - f(x^{(j)})||^2$很小。
  • 若$x^{(i)}, x^{(j)}$不是同一人,则$||f(x^{(i)}) - f(x^{(j)})||^2$很大。、

为了达成这个目标,我们来看看应该用怎样的误差来指导网络的优化方向。

三元组误差

在让网络学习基于距离的相似度时,一种常用的误差是三元组误差(Triplet Loss)。

在一轮训练中,我们要用到3张图片:一张基准图(anchor)$A$和与其相对的正负样本$P, N$各一张。设$d(A, B)$为图片$A, B$的编码的距离,则我们希望$d(A, P) \leq d(A, N)$。

左边的人是吴恩达的妻子,这几张图片已经出现了好几次。

移个项,即我们希望:

但这个条件太容易满足了,令$f(x)=0$恒成立的话无论怎样的输入都可以使上式左侧为0了。因此,我们要加一点小小的约束:

这个$\alpha$是一个较小的数,比如可以令$\alpha=0.2$。为了达成这个目标,我们可以设置以下的误差函数

这里取max的直观解释是:只要满足开始那个不等式,让正样本和负样本能够分清楚就行了。至于两类样本要不要分辨得那么分明,我们并不关心。

为了利用这个误差训练网络,我们要在训练集里为同一个人准备多张照片。比如1000个人,每人100张照片,共100000张照片构成训练集。

在提出此设计的FaceNet中,还有一些小细节:为了加大训练难度,让模型效果更好,每次训练时应该选择较难的三元组$A, P, N$。详情请阅读原论文。

人脸验证与二分类问题

其实,判断两张图片的编码是否相似的问题可以简单地转化成一个二分类问题:把两张图片的编码“拼起来”,输入进一个逻辑回归的单元,让网络输入两张图片是否相似。

这里把两张图片“拼起来”有很多的实现方式。一种简单的方式是求两个编码的绝对值(L1距离)。

另外,在部署时,由于比较的图像的编码是可以预处理的,只需要用神经网络跑一遍输入图片即可。

神经网络风格迁移

我之前写了一篇详细介绍神经网络风格迁移的文章,我认为那篇文章比这堂课更好理解,非常推荐大家阅读。因此,这部分笔记我会写得潦草一些。

什么是神经网络风格迁移?

如上图所示,在神经网络风格迁移中,输入一张表示内容的图(C)和一张表示画家风格的图(S),我们可以借助神经网络生成一幅融合内容与风格的图片(G)。

接下来实现神经网络风格迁移时,我们会关注CNN浅层和深层提取出来的特征。因此,在具体介绍风格迁移的实现之前,我们先来看看CNN各层网络究竟学到了什么。

深度卷积网络学到了什么?

为了设法可视化神经网络每一层的输出,我们可以考虑把整个训练集喂入网络,找出使某一层某一神经元激活输出最大的几个像素块。

以AlexNet为例,令第一层某几个神经元激活输出最大的像素块是:

可以看出,浅层的神经网络更关注不同方向轮廓信息。

用同样的方式可视化更深的层,可以看出网络关注的信息越来越抽象。

损失函数

和优化一个网络的参数不同,在神经网络风格迁移中,我们要把一幅图像当成可以优化的参数,通过修改图像的像素值来最小化一个损失函数。让我们看看这个损失函数是怎么定义的。

损失函数由两部分构成:生成图像(G)和内容图像(C)的内容损失与和风格图像(S)的风格损失。二者之间的比例靠比例系数$\alpha, \beta$控制。

我们来看一个优化这个损失函数的步骤示例:

  1. 生成一个随机图像$G: 100 \times 100 \times 3$
  2. 使用梯度下降更新$G$,最小化$J(G)$

内容损失

内容损失的计算方式如下:

  1. 使用一个预训练的网络(最常用的是VGG)。
  2. 选择神经网络中的某一层$l$,这一层相关的数据会用来计算内容损失。$l$越浅,表示越具体的图像;$l$越深,表示越抽象的图像。一般选取适中的$l$。
  3. 令$a^{l}, a^{[l][G]}$为图像$C, G$网络第$l$层的激活输出。我们认为,如果这两个值很相似,则两幅图像的内容就很相似。
  4. 令$J_{content}(C, G) = ||a^{l} - a^{[l][G]}||^2$

风格损失

看完了内容损失,现在来看风格损失的计算方式。我们刚刚一直在用名词“风格”。这个词放在美术里,可以指画家的绘画风格。而对于神经网络来说,“风格”又是什么意思呢?

和刚刚的内容损失类似,计算风格损失时,我们也要选择CNN的某层$l$,计算这一层卷积激活输出的通道相关量

这里“通道相关量”反映的是图像各个通道间两两的相关关系。这句话可能有点拗口,让我们看一个详细的通道相关量示例。

假设一个激活输出有5个通道,我们用颜色“红-黄-紫-绿-蓝”称呼它们。每个通道表示的是一类特征,比如红色的通道表示图像有没有竖着的条纹,黄色通道表示图像有没有橙色的色块。我们想计算红黄通道之间的相关量,其实就是计算图像每个像素处红色通道的值和绿色通道的值之间的相关关系,看看它们是会同时出现,还是一个出现了另一个就不出现。(两个数值的相关量就是它们的乘积,具体的数学表示会在后文中给出)

为什么这样的相关关系能够捕捉到风格信息呢?红色通道和黄色通道的相关关系,就是是不是有竖条纹的地方就有橙色色块。这样一种线条和颜色的关联,就可以代表图片的风格。(下图中红色的框和黄色的框分别圈出了两种特征)

从直觉上认识了风格,现在我们来看一看风格的具体计算方法。在计算两个向量的相关量时,可以直接求两个向量的积。因此,我们要用到一种叫做”风格矩阵“的中间量,它计算了所有通道向量的乘积:

其中,$a^{[l]}_{i, j, k}$是第$i$行$j$列第$k$个通道的激活输出,风格矩阵$G$的形状是$n_c^{[l]} \times n_c^{[l]}$,$G^{[l]}_{kk’}$描述的是第$l$层激活输出中,第$k, k’$个通道间的相关量。

在数学中这个矩阵叫做gram矩阵。

有了风格矩阵,就可以算风格损失了。为了简化表示,我们用$G^{[l]}(S), G^{[l]}(G)$分别表示风格图像S和生成图像G的风格矩阵。和刚刚一样,我们选择一层$l$,并计算风格误差:

这是一层上的风格误差。其实,我们还可以指定多个层上的风格误差,以使生成图像能够拟合风格图像在多个卷积层上(抽象程度不同)的风格。多个层的风格误差就是各层风格误差之和。

1D和3D上的卷积

在结束这门课之前,我们来学习最后一项内容——1D和3D数据上的卷积。之前,我们只关注2D的图像数据,当维度不是2时,卷积有怎样的变化呢?

1D数据可以是心电图。和可以用2D卷积捕捉2D图像的边缘一样,我们可以用下图中那个尖尖的1D卷积来捕获高频的数据。1D卷积前后的形状变化规律和2D一样,比如用16个长度为5的卷积卷一个$14 \times 1$的1D图像,可以得到$10 \times 16$的1D图像(第一维是图像长度,第二维是通道数)。

3D数据也是类似的道理。用16个$5 \times 5 \times 5$的卷积核卷一个单通道$14 \times 14 \times 14$的单通道图像,可以得到一个$5 \times 5 \times 5 \times 16$的图像。

总结

这节课主要介绍了人脸识别和神经网络风格迁移两项任务,最后顺便介绍了1D和3D上的卷积。

  • 人脸识别
    • 人脸验证与人脸识别任务的定义
    • 如何对单样本学习建模
    • 孪生网络
    • 基于三元组误差和二分类误差的人脸识别网络
  • 神经网络风格迁移
    • 神经网络风格迁移的输入输出
    • 利用小技巧可视化 CNN 学到的图像信息
    • 生成风格迁移图像时的内容误差与风格误差

人脸识别任务依赖于数据集,项目搭建起来比较麻烦。对于这一周的内容,我就不展示其他的代码实战笔记了,欢迎阅读我之前写的神经网络风格迁移的实现

在这篇文章中,我将给出一份带运行示例的NMS Python脚本,并对算法和代码进行详细解说。相信大家看完这篇文章后,能够轻松地掌握NMS的底层原理。

如果你对目标检测的基础知识不太熟,欢迎先阅读我的上篇文章:目标检测简介

示例脚本(包括可视化的代码)链接:https://github.com/SingleZombie/DL-Demos/tree/master/dldemos/nms

算法介绍

在目标检测算法中,为了尽量不漏掉物体,会输出大量的检测结果(每一条结果由检测概率与检测框组成)。这些检测结果很可能有重复的,即会有多个框标出了同一个物体。下图就是一个例子,算法正确识别出了两辆车,但却有多个检测框标出了同一辆车。

我们需要一个算法来过滤多余的检测框。最常用的算法就是NMS(Non-Maximum Suppresion, 非极大值抑制)。该算法的思路很简单:只保留局部概率最大的检测框,与其重合的其他检测框都会被舍去。

算法的伪代码如下:

text
1
2
3
4
5
6
7
输入所有检测概率、检测框
当还有检测框没有处理:
从剩下的框里挑出检测概率最大的框 bbox_a
遍历所有没有处理的框bbox_i:
如果 bbox_i != bbox_a 且 bbox_i 与 bbox_a 重合:
舍去 bbox_i
把 bbox_a 输出成一个检测结果

当然,这个算法的描述还不够准确:究竟该怎么定义两个检测框是“重合”呢?如果两个检测框有交集就说它们重合是行不通的,因为图片中很可能会有挨得很近的物体,它们的检测框就是相交的。因此,我们一般用IoU(交并比)来描述两个框的重合程度,如果IoU超出某个阈值,就说这两个框是“重合”的。

IoU的计算很简单,算出两个矩形的「交面积」,再用两个矩形面积之和减去「交面积」就可以得到「并面积」,「交面积」比「并面积」就是IoU。

在NMS之前,一般还会先做一步预处理:对于预测概率小于某个阈值的检测框,我们不信任它们,会在进行NMS之前就把它们舍去。在代码实现时这部分逻辑常常会放到NMS的函数里。这样,整个NMS的流程是这样的:

上述的NMS针对是识别一种物体的任务,在推广到多分类NMS时,只要把输入的检测概率换成有物体的概率乘上概率最大的类别的概率即可。

代码实现

理论上这段代码是兼容任何格式的张量的。经测试,NumPy, PyTorch 的张量都可以正确运行。

先看一下IoU的实现代码:

1
2
3
4
5
def iou(b1: Tuple[int, int, int, int], b2: Tuple[int, int, int, int]) -> float:
intersection = box_intersection(b1, b2)
inter_area = area(intersection)
union_area = area(b1) + area(b2) - inter_area
return inter_area / union_area

所有的检测框用一个长度为4的元组表示。box_intersection用于计算两个框的相交框,area用于计算框的面积。这段代码和之前描述得一样,先获取相交区域,计算交面积,再计算并面积,最后算除法。

box_intersection的实现如下:

1
2
3
4
5
6
7
8
9
10
11
def box_intersection(
b1: Tuple[int, int, int, int],
b2: Tuple[int, int, int, int]) -> Tuple[int, int, int, int]:
x11, y11, x12, y12 = b1
x21, y21, x22, y22 = b2

xl = max(x11, x21)
xr = min(x12, x22)
yt = max(y11, y21)
yb = min(y12, y22)
return (xl, yt, xr, yb)

算相交区域,可以理解成分别求出x, y方向上的相交部分,再把两部分合成一个框。而求直线的相交,就是取最大的左端点和最小的右端点。

area的实现如下:

1
2
3
4
5
def area(box: Tuple[int, int, int, int]) -> float:
x1, y1, x2, y2 = box
width = max(x2 - x1, 0)
height = max(y2 - y1, 0)
return width * height

如果两个框不相交,x2 - x1y2 - y1会出现小于0的情况。因此,要保证它们最小为0,再算面积就不会有错了。

有了iou,就可以实现了NMS了。我的NMS函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def nms(predicts: np.ndarray,
score_thresh: float = 0.6,
iou_thresh: float = 0.3):
"""Non-Maximum Suppression

Args:
predicts (np.ndarray): Tensor of shape [n, 5]. The second demesion
includes 1 probability and 4 numbers x, y, w, h denoting a bounding
box.
score_thresh (float): The boxes with probability lower than
score_threash will be discarded.
iou_thresh (float): The threshold determining whether two boxes are
"overlapped".

Returns:
(np.ndarray, List[int]): The filtered predictions and the indices of
remaining boxes.
"""

NMS算法需要输入检测结果predicts,输出过滤后的检测结果filtered_predicts。此外,NMS算法有两个输入属性score_threshiou_thresh,分别表示被选定的检测框最小需要的概率、判断两个框是否重合的IoU阈值。为了方便其他的计算(比如多分类NMS),我还输出了一个索引数组indices,表示被选中的框的索引。这方面的逻辑可以根据自己的项目要求进行优化,没有统一的规定。

NMS的实现和开始的伪代码几乎一模一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def nms(predicts: np.ndarray,
score_thresh: float = 0.6,
iou_thresh: float = 0.3):
n_remainder = len(predicts)
vis = [False] * n_remainder

# Filter predicts with low probability
for i, predict in enumerate(predicts):
if predict[0] < score_thresh:
vis[i] = True
n_remainder -= 1

# NMS
output_predicts = []
output_indices = []
while n_remainder > 0:
max_pro = -1
max_index = 0
# Find argmax
for i, p in enumerate(predicts):
if not vis[i]:
if max_pro < p[0]:
max_index = i
max_pro = p[0]

# Append output
max_p = predicts[max_index]
output_predicts.append(max_p)
output_indices.append(max_index)

# Suppress
for i, p in enumerate(predicts):
if not vis[i] and i != max_index:
if iou(p[1:5], max_p[1:5]) > iou_thresh:
vis[i] = True
n_remainder -= 1
vis[max_index] = True
n_remainder -= 1

return output_predicts, output_indices

一开始,先声明一些辅助的变量。n_remainder表示还有多少个框没被访问,vis[i]表示第i个框是否被访问。

1
2
n_remainder = len(predicts)
vis = [False] * n_remainder

一开始,先过滤那些概率过小的框:

1
2
3
4
for i, predict in enumerate(predicts):
if predict[0] < score_thresh:
vis[i] = True
n_remainder -= 1

之后,正式进入NMS,先准备好输出的列表:

1
2
3
4
# NMS
output_predicts = []
output_indices = []
while n_remainder > 0:

在还有没访问的框时,先找出剩下的框中概率最大的那个框:

1
2
3
4
5
6
7
8
9
while n_remainder > 0:
max_pro = -1
max_index = 0
# Find argmax
for i, p in enumerate(predicts):
if not vis[i]:
if max_pro < p[0]:
max_index = i
max_pro = p[0]

之后,抑制掉和概率最大框“重合”的框。

1
2
3
4
5
6
7
8
9
10
11
while n_remainder > 0:
# Find argmax
...

max_p = predicts[max_index]
# Suppress
for i, p in enumerate(predicts):
if not vis[i] and i != max_index:
if iou(p[1:5], max_p[1:5]) > iou_thresh:
vis[i] = True
n_remainder -= 1

最后,把这个结果添加进输出,并维护好visn_remainder

1
2
3
4
5
6
7
8
9
10
11
12
while n_remainder > 0:
# Find argmax
...

# Suppress
...

# Append output
output_predicts.append(max_p)
output_indices.append(max_index)
vis[max_index] = True
n_remainder -= 1

这里可以把vis[max_index] = True那两行移到抑制操作的前面,这样判断里就不用加i != max_index了。我这样写是为了强调一下,判断重合的时候不需要判断自己。

实现完了,返回结果。

1
return output_predicts, output_indices

单元测试

做单元测试的时候,最好是找一份现有的实现用做对照。为了让整个测试过程更贴合实际一些,我用MMDetection的YOLOv3跑了一个NMS前和NMS后的检测框结果,并把NMS前的检测框输入进了自己实现的NMS,比较了两份NMS的输出。以下是具体的测试过程。

照着MMDetection的官方文档,下载好YOLOv3模型,用下面的代码即可对MMDetection的demo图片进行推理并保存结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from mmdet.apis import inference_detector, init_detector, show_result_pyplot
from mmdet.models.detectors import BaseDetector

# Choose to use a config and initialize the detector
config = 'configs/yolo/yolov3_mobilenetv2_320_300e_coco.py'
# Setup a checkpoint file to load
checkpoint = 'checkpoints/yolov3_mobilenetv2_320_300e_coco_20210719_215349-d18dff72.pth'
# initialize the detector
model: BaseDetector = init_detector(config, checkpoint, device='cuda:0')

img = 'demo/demo.jpg'
result = inference_detector(model, img)

model.show_result(img, result, score_thr=0.3, out_file='work_dirs/1.jpg')

为了得到NMS前的输入,我在mmdet/models/dense_head/YOLOV3Head.get_bboxes里面插入了一段输出结果的代码。

1
2
3
4
5
6
7
8
9
10
11
...

save_dict = {
'bboxes': bboxes,
'cls_scores': scores,
'prob': objectness
}
torch.save(save_dict, 'work_dirs/bboxes.pt')

det_bboxes, det_labels = multiclass_nms(
...

得到NMS前的输入是

把这份数据输入进我们的NMS中,得到的可视化结果如下:

这跟开始那份输出一模一样。看来我们实现的这份NMS完全正确。

如果你对测试过程、可视化、多分类NMS感兴趣,欢迎直接阅读我的项目源码。

总结

在这篇文章中,我给出了一份NMS的简洁而靠近底层的Python实现,并对NMS算法进行了介绍。通过阅读这篇文章,相信大家已经完全理解了NMS的原理,并且能够用任何一种语言实现NMS。一般的NMS开源实现支持的参数更多,代码会更复杂一些,但它们的核心和我的这份实现是一样的。

这份NMS的实现还有很大的改进空间。比如每轮求概率最大的框时,可以先排好序,或者用优先队列,这样均摊下来每次获取概率最大的框的复杂度是O(logn)。但是后面判断重复的框一定有一个O(n)的计算,这部分的优化并不显著。大家有余力可以参考成熟的CV项目里NMS是怎么高效用C++实现的。

这节课中,我们要学习计算机视觉中最重要的任务之一——目标检测任务。我们会先认识目标定位和关键点检测这两个比较简单的任务,慢慢过度到目标检测任务。之后,我们会详细学习目标检测的经典算法YOLO。最后,我们会稍微认识一下语义分割任务及适用于此问题的U-Net架构。

课堂笔记

目标定位

图像分类问题中,给定一幅图片,我们只要说出图片里的物体是什么就行了。在这堂课要讨论的任务中,我们还要多做一件事——定位。我们要先用边框圈出图中的物体,再说出框里的物体是什么。这叫做带定位(localization)的分类问题。更进一步,如果我们不再是只讨论一个物体,而是要把图片中所有物体都框出来,并标出每一个物体的类别,这就是目标检测问题,

我们对分类任务的神经网络结构已经很熟悉了。那么,带定位的分类该使用怎样的网络呢?实际上,一个边框可以用边框中心和边框宽高这四个量表示。除了softmax出来的分类结果外,我们只要让网络再多输出四个数就行了。如下图所示:

这里,要统一一下对于边框的定义。我们用$b_x, b_y$表示边框的中心坐标,$b_h, b_w$表示边框的高、宽。

来看一下标签$y$的具体写法。假设一共有四类物体:行人、汽车、摩托车、背景(没有物体)。那么,标签$y$应该用$y=[p_c, b_x, b_y, b_h, b_w, c_1, c_2, c_3]^T$表示。其中,$p_c$表示图中有没有物体。若$p_c=1$,则$c_1, c_2, c_3$分别表示物体属于除背景外的哪一类;若$p_c=0$,则其他值无意义。

这样,在算误差时,也需要分类讨论。若$p_c=1$,则算估计值与标签8个分量两两之间的均方误差;若$p_c=0$,只算$p_c$的均方误差,不用管其他量。

只要更换一下神经网络的输出格式,我们就能得到一个完成目标定位问题的网络。

关键点检测

我们刚刚学了用2个点表示一个边框。其实,拓展一下边框检测,就是一个关键点(英文有时叫做”landmark”,是“地标”的意思)检测问题。

比如,在人脸关键点检测中,我们可以定义一堆关键点,分别表示眼睛、鼻子、嘴巴……的位置。我们还是让网络先输出一个数,表示图中有没有人脸;再输出2n个数,表示n个人脸关键点。这样,网络就能学习怎么标出人脸关键点了。

很多应用都基于人脸关键点检测技术。比如我们检测到了眼睛周围的关键点后,就可以给人“戴上”墨镜。

总之,通过这一节的学习,我们要知道,目标定位中输出2个坐标只是关键点检测的一个特例。只要训练数据按照某种规律标出了关键点,不管这些关键点是表示一个框,还是人脸上各器官的位置,网络都能学习这种规律。

目标检测

有了之前的知识储备,现在我们来正式学习目标检测。目标检测可以用一种叫做“滑动窗口”的技术实现。

假设我们要构建一个汽车的目标检测系统。我们可以先构造一个汽车分类数据集——数据集的x是一些等大的图片,y表示图片里是不是有汽车。如果图片里有汽车,汽车应该占据图片的大部分位置。

通过学习,网络就能够判断一个框里的物体是不是汽车了。这样,我们可以用一个边框框出图片的一部分,裁剪下来,让网络看看图片的这一部分是不是汽车。只要我们尝试的次数够多,总能找出图中的汽车。

在遍历边框时,我们是通过“滑动”的方法:遍历边框的大小,选择好大小后把框放到左上角,先往右移,再往下移。所以这种方法叫做“滑动窗口”。

滑动窗口算法有一个缺点:如果我们移动窗口的步伐过小,则运行分类器的次数会很多;如果移动窗口的步伐过大,则算法的精度会受到影响。在深度学习时代之前,分类器都是一些简单的线性函数,能够快速算完,多遍历一些滑动窗口没有问题。而使用了深度CNN后,遍历滑动窗口的代价就很大了。

幸好,滑动窗口也可以通过卷积来生成,而不一定要遍历出来。让我们看下去。

基于卷积的滑动窗口

滑动窗口其实可以通过执行巧妙的卷积来生成。在那之前,我们先学一门前置技能:怎么把全连接层变成卷积层。

前两周学习CNN时,我们学过,卷积结束后,卷积的输出会被喂入全连接层中。实际上,我们可以用卷积来等价实现全连接层。比如下图中,一个$5 \times 5 \times 16$的体积块想变成一个长度为400的向量,可以通过执行400个$5 \times 5$的卷积来实现。

知道了这一点后,我们就可以利用卷积来快速实现滑动窗口了。

假设我们按照上一节的算法,先实现了对$14 \times 14$的小图片进行分类的分类器。之后,我们输入了一张$16 \times 16$的大图片。我们遍历滑动窗口,令步幅为2。这样,理论上,有4个合法的滑动窗口,应该执行4次分类器的运算,如下图所示:

可是,仔细一想,在执行4次分类器的过程中,有很多重复的运算。比如,对于4个滑动窗口中间那共有的$12 \times 12$个像素,它们的卷积结果被算了4次。理想情况下,只需要对它们做一次卷积就行了。这该怎么优化呢?

其实,很简单,我们可以利用卷积本身的特性来优化。卷积层只定义了卷积核,而没有规定输入图像的大小。我们可以拿出之前在$14 \times 14$的图像上训练好的卷积层,把它们用在$16 \times 16$的图片的卷积上。经过相同的网络,$16 \times 16$的图片会生成一个$2 \times 2$大小的分类结果,而不是$1 \times 1$的。这$2 \times 2$个分类结果,恰好就是那4个滑动窗口的分类结果。通过这样巧妙地利用卷积操作,我们规避了遍历滑动窗口带来的重复计算。

不过,这个方法还是有一些缺陷的。在刚才那个例子中,$16 \times 16$的图片其实可以放下9个$14 \times 14$大小的边框。但是,由于分类网络中max pool的存在,我们只能生成4个分类结果,也就是步幅为2的滑动窗口的分类结果。同时,最准确的检测框也不一定是正方形的,而可能是长方形的。为了让生成的滑动窗口更准确一些,我们要用到其他方法。

预测边框

在这一节,我们要使用YOLO(You Only Look Once)算法解决上一节中碰到的问题。还记得这周课开头学的目标定位问题吗?我们可以把滑动窗口和目标定位结合一下。

给定一幅图像,我们可以把图像分成$3 \times 3$个格子。训练模型前,我们要对训练数据做预处理。根据每个训练样本中物体的中心点所在的格子,我们把物体分配到每一个格子中。也就是说,不管一个物体的边框跨越了几个格子,它的中心点在哪,它就属于哪个格子。比如对于下图的训练样本,右边那辆车就属于橙色的格子。之后,我们给每个格子标上标签$y$。这个标签$y$就是目标定位中那个表示图片中是否有物体、物体的边框、物体的类别的标签向量。对于这个$3 \times 3$的格子,有9个标签向量,整个标签张量的形状是$3 \times 3 \times 8$。

这样,每一幅图像的输出和标签一样,也是一个$3 \times 3 \times 8$的张量了。输入一幅图片后,我们利用上一节学的卷积滑动窗口,同时预测出每个格子里的物体边框。

另外,这里要详细讨论一下$b_x, b_y, b_h, b_w$的表示方法。由于我们只关心框相对于格子的位置,因此我们可以把规定一个格子的边长为1。这样,就满足$0 \leq b_x, b_y \leq 1$了。不过,由于物体的边框可以超出小框,$b_h, b_w > 1$是很有可能的。

看到这,大家可能会有一些疑问:如果一个格子里有多个物体呢?的确,这个算法无法输出一个格子里的多个物体。一种解决方法是,我们可以把格子分得更细一点,比如$19 \times 19$个格子。这样,可以被检测到物体会多一些。但是,增加格子数又会引入一个新的问题——多个格子检测到了同一个物体。下面的两节里我们会尝试解决这个新的问题。

吴恩达老师说,YOLO这篇论文很难读懂,他和其他几个资深研究者都花了很大的功夫才读懂这篇论文。

IoU(交并比)

在目标检测中,有一个微妙的问题:框出一个物体的边框有无数个,想精确框出标签的边框是不可能的。怎么判定一个输出结果和标签里的边框“差不多”呢?这就要用到IoU(Intersection over Union,交并比) 这个概念。

IoU,顾名思义,二者的交集比上二者的并集,很好理解。比如下图中,网络的输出是紫框,真值是红框。二者的并集是绿色区域,交集是橙色区域。则IoU就是橙色比绿色。

依照惯例,如果IoU$\geq 0.5$,我们就认为网络的输出是正确的。当然,想更严格一点,0.6,0.7也是可以的。

IOU 也是 “I owe you(我欠了你的钱)”的缩写,哈哈哈。

NMS(非极大值抑制)

假设在YOLO中,我们用$19 \times 19$个小格来检测物体。可是,由于小格子太多了,算法得到了多个重复的检测框(以及每个框中有物体的概率)。这该怎么办呢?

NMS(Non-Maximum Suppresion,非极大值抑制)就是解决这个问题的算法。这个算法的名字听起来很奇怪,但大家理解了这个算法的实现后,就知道这个“抑制”是什么意思了。

讲起算法我就不困了。我会抛弃视频中的讲解思路,用我自己的逻辑讲一遍。讲算法,千万不能一上来就讲算法的步骤,一定要先讲清楚算法的思路。

在学NMS之前,我们先动动脑,看看在去掉重复的框时,我们期望得到怎样的去重输出结果。

首先,既然是去重,那么就不能出现两个框过度重合的情况。其次,我们希望留下来的框的预测概率尽可能大。

在这两个要求下,我们来看看上面那幅图的输出应该是怎样的。
我们一眼就能看出,对于左边那辆车,我们应该保留0.8的框;对于右边那辆车,我们应该保留0.9的框。

为什么我们能“一眼看出”呢?这是因为左边两个框、右边三个框恰好都分别表示了一辆车。我们能够快速地把这些框分成两类。但是,在情况比较复杂时,我们就难以快速找出最好的框了。比如下面这种情况中,两辆车很近,有些框甚至同时标出了两辆车:

为了处理这种复杂的情况,我们必须想出一种万全的算法,以筛选出那些概率比较大的框。

稍微思考一下,其实这样的算法非常简单:找出最大的框,去掉所有和它过度重合的框;在剩下的框中,找出最大的框,去掉所有和它过度重合的框;……。一直重复直到没有未处理的框为止。这就是NMS算法。

还是让我们来看看刚刚那个例子。使用NMS时,我们会先找到0.9这个框,“抑制”掉右边0.6和0.7的框。在剩下的框中,最大的是0.8这个框,它会“抑制”掉左边那个0.7的框。

接下来,让我们来严格描述一下这个算法。假设我们有$19 \times 19=361$个输出结果,每个输出结果是一个长度为5的向量$[p_c, b_x, b_y, b_h, b_w]$,分别表示有物体的概率、边框的中心和高宽(我们先不管检测多个物体的情况。事实上,当推广到多个物体时,只要往这个输出结果里多加几个概率就行了)。我们要用NMS输出应该保留的检测结果。“过度重合”,就是两个框的IoU大于0.5。

首先,先做一步初筛,扔掉概率$p_c$小于0.6的结果。

之后,对于没有遍历的框,重复执行:找出概率最大的框,把它加入输出结果;去掉所有和它IoU大于0.5的框。

这个过程用伪代码表示如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Input and preprocessing
input predicts of size [19, 19, 5]
resize predicts to [361, 5]

# Filter predicts with low probability
filtered_predicts = []
for predict in predicts:
# drop p_c < 0.6
if predict[0] >= 0.6:
filtered_predicts.append(predict)

# NMS
n_remainder = len(filtered_predicts)
vis = [False] * n_remainder # False for unvisited item
output_predicts = []
while n_remainder > 0:
max_pro = -1
max_index = 0
# Find argmax
for i, p in enumerate(filtered_predicts):
if not vis[i]:
if max_pro < p[0]:
max_index = i
max_pro = p[0]

# Append output
max_p = filtered_predicts[max_index]
output_predicts.append(max_p)

# Suppress
for i, p in enumerate(filtered_predicts):
if not vis[i] and i != max_index:
if get_IoU(p[1:5], max_p[1:5]) > 0.5:
vis[i] = True
n_remainder -= 1
vis[max_index] = True
n_remainder -= 1

假设进NMS的框有$N$个。算法里求当前最大框那一步可以用优先队列来优化,这一步复杂度是$O(logN)$。但是“抑制”那一步必须要遍历一遍剩下的框,还是有一个$O(N)$复杂度(我暂时想不出朴素的低复杂度的算法)。因此,不用优先队列优化也差不多。算上外层的循环,整个算法的复杂度是$O(N^2)$。在实际的应用中,送入NMS的结果没那么多,不会超过10000个。而且,随着框被不断过滤,外层循环的次数会减少不少。这个算法的性能瓶颈不在输入数$N$上,而在于求IoU实现的细节上。

锚框(Anchor Boxes)

为了让一个格子能够检测到多个物体,YOLO论文还提出了一种叫做锚框(Anchor Boxes)的技术。

假设一个格子里同时包含了两个物体:一个“竖着”的人和一个“横着”的车。那么,我们可以以这个格子的中心点为“锚”,画一个竖的框和横的框,让每个格子可以检测到两个物体。这样,人和车都能被检测了。

严谨地描述,锚框技术是这样做改进的:

  • 之前,每一个格子只能包含一个样本。训练数据中每一个标签框会被分配到它中点所在格子
  • 现在,每一个格子能包含多个样本。每个格子都会预定义几个不同形状的锚框,有几个锚框,就最多能检测到几个物体。训练数据的每一个标签框会被分配到和它交并比最大的锚框

注意,之前的最小单元是格子,现在是锚框,所以说现在每个样本被分配到锚框上而不是格子上。可以看下面这两个样本的例子,第一个例子是两个物体都检测到了,第二个是只有锚框2里有物体。和之前一样,如果有某个锚框里没有物体,则除了$p_c$外全填问号即可。

锚框技术实际上只是对训练数据做了一个约束,改变了训练数据的格式。检测算法本身没有什么改变。

YOLO 算法总结

让我们把前几节的内容总结一下,看一下YOLO算法的全貌。

在训练前,我们要对数据做预处理。首先,我们要指定以下超参数:图片切分成多大的格子、每个格子里有多少个锚框。之后,根据这些信息,我们可以得到每一个训练标签张量的形状。比如$3 \times 3 \times 2 \times 8$的一个训练标签,表示图片被切成了$3 \times 3$的格子,每个格子有两个锚框。这是一个三分类问题,对于每一个检测出来的物体,都可以用一个长度为$8$的向量表示。其中,$p_c$表示这个锚框里有没有物体, $(b_x, b_y), (b_h, b_w)$分别表示中心点坐标、框的高宽,$c_1, c_2, c_3$分别表示是否为该类物体。

有了预处理好的训练数据,就可以训练一个CNN了。

在网络给出了输出后,由于输出的框往往多于标签中的框,还要对输出结果进行筛选。筛选的过程如前文所述,先去掉概率过小的框,再分别对每一类物体的框做NMS。

课堂上没有介绍loss。loss的组成比较复杂,建议阅读原论文。

区域提案

YOLO算法是在一堆固定的框里找物体。实际上,我们还可以用神经网络来找出候选框,再在这些框里详细检测。这种技术就叫做区域提案(region proposal),相关的网络叫做R-CNN(Region with CNN)。

R-CNN 系列网络有多个改进版本:

  • R-CNN: 使用区域提案,但是每次只对一个区域里的物体做分类。
  • Fast R-CNN: 使用区域提案,并使用基于卷积的滑动窗口加速各区域里物体的分类。
  • Faster R-CNN: 前两个算法都是用传统方法提案区域,Faster R-CNN用CNN来提案区域,进一步令算法加速。

吴恩达老师认为,虽然区域提案的方法很酷,但把目标检测分两步来完成还是太麻烦了,一步到位的YOLO系列算法已经挺方便了。

基于U-Net的语义分割

最早这门课是没有这一节的,估计U-Net的架构太常用了,吴恩达老师把基于U-Net的语义分割加入了这周的课中。

语义分割也是应用非常广泛的一项CV任务。相较于只把物体框出来的目标检测,语义分割会把每一类物体的每个像素都精确地标出来。如下图的示例所示,输入一张图片,语义分割会把每一类物体准确地用同一种颜色表示。

具体来说,语义分割的输出是一个单通道图片。图片的数字表示此处像素的类别。

在分类模型中,图像会越卷越小,最后压平放进全连接层并输出多个类别的分类概率。而在语义分割模型中,由于模型的输出也是一幅图像,在输入图像被卷小了以后,应该还有一个放大的过程。

目前,我们还没有学过带学习参数的可以放大图像分辨率的结构。下一节介绍的反卷积能够完成这件事。

反卷积

反卷积和卷积的输入输出大小彻底相反。让我们看看反卷积的形状是怎么计算的。

如上图所示,反卷积也有卷积核大小、步幅、填充这些参数。不过这些参数都是在输出图像上做的。也就是说,我们会在输出图像上做填充,并且每次在输出图像上一步一步移动。我们把正卷积的输出大小计算公式套到反卷积上的输出上,就能算出反卷积的输入的大小。

在卷积时,我们是把卷积核与图像对应位置的数字乘起来,再求和,算出一个输出值;反卷积则是反了过来,把一个输入值乘到卷积核的每个位置上,再把乘法结果放再输出的对应位置上。一趟反卷积计算如下图所示:

这里我们只需要知道反卷积可以做上采样就行了,不需要纠结底层实现细节。

本课对反卷积的介绍甚少。实际上,反卷积可以通过正卷积来实现。我扫了一圈没看到讲解得比较好的相关文章,有兴趣的可以自行查找资料。

U-Net 架构

学完了反卷积,可以来看U-Net的结构了。

U-Net除了对图像使用了先缩小再放大的卷积外,还使用了一种跳连(不是ResNet中残差连接的跳连,而是把两份输入拼接在了一起)。这样,在反卷积层中,不仅有来自上一层的输入,还有来自前面相同大小的正卷积的结果。这样做的好处是,后半部分的网络既能获得前一个卷积的抽象、高级(比如类别)的输入,又能获得前半部分网络中具体,低级的特征(比如形状)。这样,后面的层能够更好地生成输出。

U-Net具体的结构如下:

这幅图中,做运算的图像张量被表示成了一个二维矩形,矩形的高度是图像的宽高,矩形的宽度是通道数。U-Net的前半部分和常见的CNN一样,缩小图像大小,增大图像通道数。而在后半部分中,每次上采样时,一半的通道来自上一层的输出,另一半的通道来自于网络前半部分。

从图中能看出,U-Net的结构图是一个“U”型,因此它才被叫做U-Net。

总结

在这篇文章中,我们主要学习了以下内容:

  • 任务定义与输出格式
    • 目标定位
    • 关键点检测
    • 目标检测
    • 语义分割
  • YOLO
    • 用卷积实现全连接
    • 用卷积实现滑动窗口
    • 锚框
    • IoU
    • NMS
    • YOLO算法
  • U-Net
    • 反卷积
    • U-Net架构

这周的代码实战中,我会详细讲解NMS的实现。时间允许的话,我还会展示一下如何在COCO上训练YOLO。

今天,刚收到公众号官方的两条通知,说我的文章涉嫌整合他人内容,被暂时取消原创声明功能。

一开始,我还以为是系统的检测算法出了故障,赶紧低声下气地提交了申诉,说明全网上我的文章都是由我自己完成的。

当我回过头来阅读通知时,愕然发现了“经用户投诉且经平台审核”这几个字,一时间怒火中烧:哦,原来我是被诬告了。

我是两三个月前才在开始在公开媒体上创作的,对一些规则可能不熟。有些长期做自媒体的人或许会劝我:唉,这种事正常啊。只要去申诉,给你恢复就行了。而且你看,就封了两天,之后不就正常了?忍一忍就过去了。你不要玻璃心了。

是啊,从利益的角度来看,这件事不就是让我两天发不了“原创”的文章?两天过后一切损失都抹平了。

才怪呢。

这件事对我真正的影响,是损害了我的名誉。

说实话,你可以说我水平差劲,说我没钱没势,说我狂妄自大。背后说,当面讲,拿着个大喇叭对全国人民喊。我都会不以为然。

问题是,对于我的作品,对于我辛辛苦苦创作出来的受到了客观认可的作品,你不能诋毁它。甚至不是去挖苦文章的内容,而是拿最恶劣的抄袭来指控我。这是对我名声的侮辱,对所有有尊严的创作者的侮辱,也是你们创作平台自己的耻辱。

通知里说“有用户举报”。我不知道是不是真的有人举报。如果是真的,那我也奈何不了那个人。在这件事上,我是弱势的一方。我也不知道是谁干的,也没有受到什么严重的经济损失,没有任何追责的可能。可能别人就是觉得好玩,顺手按了个举报按钮呢?我除了骂一句“此人卑鄙无耻”以外,也做不了什么。

真正有问题的是微信公众号的官方。你们的审核人员心慵意懒,玩忽职守。手握审核大权而不知善用,身着公正之衣而不辩是非。不察之下竟把抄袭之罪强加于光明磊落的原创作者,以至于颠倒是非,污人清白,真是岂有此理!

你以为你们平台做起来靠的是什么?靠的是你们掌握的数以亿计的流量?别开玩笑了。给你们带来价值的,是会下金蛋的鸡。看着满棚的金鸡,几位手持饲料的奴仆倒好像也长出了翅膀,以为自己也能下出金蛋一般,觉得随手杀掉一两只鸡也无所谓。真是可笑至极。

我这里还要好心奉劝一下所有的创作平台,烦请你们给审核人员的评估指标中加一个错审率,加大造成冤情的惩罚。同时,在认定冤情的申诉通过后,把“对不起”三个大字好好地打在私信里。

仔细一想,这事也怪不了公众号平台,整个环境毕竟就是这样的。

每天在平台上发送的内容那么多,审核员能够把每篇文章都过一遍都实属不易,出几个纰漏也是情理之中。这些道理我肯定都懂,也可以理解。

但趁着这口气,我还要发表一下对于中国内容创作平台的看法。

以前,去网上查编程知识的时候,查出来的全是低劣的复制粘贴文章。想要搜个教程,还要跳过那万年不变的前几个网站,去后面几个搜索结果的跟帖中翻出学习资源来。想在网络中找精品资源,可谓是沙里淘金,海底捞针。

现在,我学有小成,想在网上分享一些学习的心得。可是,又关注者寥寥。

是我不会用搜索引擎吗?是我写不出好的文章吗?

我看,是这个互联网的运行机制有问题啊。

在“后来者居上”的论坛中,优秀的帖子还是会被顶起,随后贴上“精品”的标签,供后人赏读。

而在以推荐机制为主的封闭创作平台当道之后,本来就稀有的精品内容便沉入了泥潭之下。只推荐自己喜欢的内容,有谁不乐意呢?坐揽着源源不断的流量,那哪平台不开心呢?这就是大势所趋啊。

平台只知道流量,只知道赚钱。但这也没有办法。很多平台看似规模宏大,实际上,他们还烧着投资人的钱,他们自己还身陷囹圄,入不敷出。因此,他们只能想尽一切办法,赶快扩大规模,赶快收割流量,赶快盈利。然而,哪怕真有一日,他们开始盈利了,也只会在只知道赚钱的道路上转不过弯,忘记了当年平台是怎么火起来的。

公益性地维护一个优质的内容平台。这种看上去吃力不讨好的事情,小平台不会做,大平台也不会做。

按他们这样下去,中国互联网上优质的文章只会越来越少见,看不到更好的未来啊。

质量和金钱,真的就是互斥的关系吗?

我看,只是运营这些平台的人太菜了吧。

一来,他们过于浮躁。在指定最优化目标时,只想到了赚钱,却不知道往里面加一点点的“情怀”。

二来,他们水平低下。但凡掺入了一些不赚钱的因素,就觉得要运营不下去了。

三来,他们目光短浅。以为创造没有利润的精品是在浪费时间,实际上有内涵的事物在多年后能够带来超出金钱的价值。

等我有钱了,我能够把这一切都做好。

我知道,十多年的寒窗苦读,对多数人来讲并不是什么愉快的经历。很多时候,并不是自己没有学好,而是教育的方法有问题。这一问题在大学之后尤为突出。倘若当年能够收获一些优质的知识,也不至于会走那么多的弯路。

等我有钱了,我会设法建立一个吸引优秀创作者的平台,把优质的内容结合并组织起来,把名声打响,让大家都能来这里学习。我不仅要做一些“公益”的事情,我还要赚钱,我要把平台持久地运营下去。我会扶正互联网的创作风气,还互联网一个蓬勃发展的未来。

在这篇文章里,我也只能随口嚷嚷。诸君把这些话当作笑谈即可。不过,在当下,我还是会慢慢地行动着,创作着。

如果未来优秀的中文内容越来越多,说不定不再是我们计算机学生抱着一堆机械工业出版社的黑书,而是美国的教授拿着一本本从中文英化过去的参考书。

在这篇文章中,我会介绍如何用TensorFlow实现下面4个模型:

  1. ResNet-18
  2. ResNet-18 无跳连
  3. ResNet-50
  4. ResNet-50 无跳连

实现结束后,我会在一个简单的数据集上训练这4个模型。从实验结果中,我们能直观地看出ResNet中残差连接的作用。

项目链接:https://github.com/SingleZombie/DL-Demos

主要代码在dldemos/ResNet/tf_main.py这个文件里。

模型实现

主要结构

ResNet中有跳连的结构,直接用tf.keras.Sequenctial串行模型不太方便。因此,我们要自己把模型的各模块连起来,对应的TensorFlow写法是这样的:

1
2
3
4
5
6
7
8
9
10
11
# Initialize input
input = layers.Input(input_shape)

# Get output
output = ...

# Build model
model = models.Model(inputs=input, outputs=output)
print(model.summary())
return model

layers.Input创建一个输入张量后,就可以对这个张量进行计算,并在最后用tf.keras.models.Model把和该张量相关的计算图搭起来。

接下来,我们看看这个output具体是怎么算出来的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def init_model(
input_shape=(224, 224, 3), model_name='ResNet18', use_shortcut=True):
# Initialize input
input = layers.Input(input_shape)

# Get output
x = layers.Conv2D(64, 7, (2, 2), padding='same')(input)
x = layers.MaxPool2D((3, 3), (2, 2))(x)

if model_name == 'ResNet18':
x = identity_block_2(x, 3, use_shortcut)
x = identity_block_2(x, 3, use_shortcut)
x = convolution_block_2(x, 3, 128, 2, use_shortcut)
x = identity_block_2(x, 3, use_shortcut)
x = convolution_block_2(x, 3, 256, 2, use_shortcut)
x = identity_block_2(x, 3, use_shortcut)
x = convolution_block_2(x, 3, 512, 2, use_shortcut)
x = identity_block_2(x, 3, use_shortcut)
elif model_name == 'ResNet50':

def block_group(x, fs1, fs2, count):
x = convolution_block_3(x, 3, fs1, fs2, 2, use_shortcut)
for i in range(count - 1):
x = identity_block_3(x, 3, fs1, fs2, use_shortcut)
return x

x = block_group(x, 64, 256, 3)
x = block_group(x, 128, 512, 4)
x = block_group(x, 256, 1024, 6)
x = block_group(x, 512, 2048, 3)
else:
raise NotImplementedError(f'No such model {model_name}')

x = layers.AveragePooling2D((2, 2), (2, 2))(x)
x = layers.Flatten()(x)
output = layers.Dense(1, 'sigmoid')(x)

# Build model
model = models.Model(inputs=input, outputs=output)
print(model.summary())
return model

构建模型时,我们需要给出输入张量的形状。同时,这个函数用model_name控制模型的结构,use_shortcut控制是否使用跳连。

1
2
def init_model(
input_shape=(224, 224, 3), model_name='ResNet18', use_shortcut=True):

在ResNet中,主要有两种残差块。

第一种是上图中实线连接的,这种残差块的输入输出形状相同,输入可以直接加到激活函数之前的输出上;第二种是上图中虚线连接的,这种残差块输入输出形状不同,需要用一个1x1卷积调整宽高和通道数。

此外,每种残差块用两种实现方式。

第一种实现方式如上图左半部分所示,这样的残差块由两个通道数相同的3x3卷积构成,只有一个需要决定的通道数;第二种实现方式采用了瓶颈(bottlenect)结构,先用1x1卷积降低了通道数,再进行3x3卷积,共有两个要决定的通道数(第1, 2个卷积和第3个卷积的通道数),如上图右半部分所示。

代码中,我用identity_block_2, identity_block_3分别表示输入输出相同的残差块的两种实现,convolution_block_2, convolution_block_3分别表示输入输出不同的残差块的两种实现。这些代码会在下一小节里给出。

现在,我们来看看该如何用这些模块构成ResNet-18和ResNet-50。首先,我们看一看原论文中这几个ResNet的结构图。

对于这两种架构,它们一开始都要经过一个大卷积层和一个池化层,最后都要做一次平均池化并输入全连接层。不同之处在于中间的卷积层。ResNet-18和ResNet-50使用了实现方式不同且个数不同的卷积层组。

在代码中,开始的大卷积及池化是这样写的:

1
2
x = layers.Conv2D(64, 7, (2, 2), padding='same')(input)
x = layers.MaxPool2D((3, 3), (2, 2))(x)

ResNet-18的实现是:

1
2
3
4
5
6
7
8
9
if model_name == 'ResNet18':
x = identity_block_2(x, 3, use_shortcut)
x = identity_block_2(x, 3, use_shortcut)
x = convolution_block_2(x, 3, 128, 2, use_shortcut)
x = identity_block_2(x, 3, use_shortcut)
x = convolution_block_2(x, 3, 256, 2, use_shortcut)
x = identity_block_2(x, 3, use_shortcut)
x = convolution_block_2(x, 3, 512, 2, use_shortcut)
x = identity_block_2(x, 3, use_shortcut)

其中,identity_block_2的参数分别为输入张量、卷积核边长、是否使用短路。convolution_block_2的参数分别为输入张量、卷积核边长、输出通道数、步幅、是否使用短路。

ResNet-50的实现是:

1
2
3
4
5
6
7
8
9
10
11
elif model_name == 'ResNet50':
def block_group(x, fs1, fs2, count):
x = convolution_block_3(x, 3, fs1, fs2, 2, use_shortcut)
for i in range(count - 1):
x = identity_block_3(x, 3, fs1, fs2, use_shortcut)
return x

x = block_group(x, 64, 256, 3)
x = block_group(x, 128, 512, 4)
x = block_group(x, 256, 1024, 6)
x = block_group(x, 512, 2048, 3)

其中,identity_block_3的参数分别为输入张量、卷积核边长、中间和输出通道数、是否使用短路。convolution_block_3的参数分别为输入张量、卷积核边长、中间和输出通道数、步幅、是否使用短路。

最后是计算分类输出的代码:

1
2
3
x = layers.AveragePooling2D((2, 2), (2, 2))(x)
x = layers.Flatten()(x)
output = layers.Dense(1, 'sigmoid')(x)

残差块实现

1
2
3
4
5
6
7
8
9
10
11
12
def identity_block_2(x, f, use_shortcut=True):
_, _, _, C = x.shape
x_shortcut = x
x = layers.Conv2D(C, f, padding='same')(x)
x = layers.BatchNormalization(axis=3)(x)
x = layers.ReLU()(x)
x = layers.Conv2D(C, f, padding='same')(x)
x = layers.BatchNormalization(axis=3)(x)
if use_shortcut:
x = x + x_shortcut
x = layers.ReLU()(x)
return x

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def convolution_block_2(x, f, filters, s: int, use_shortcut=True):
x_shortcut = x
x = layers.Conv2D(filters, f, strides=(s, s), padding='same')(x)
x = layers.BatchNormalization(axis=3)(x)
x = layers.ReLU()(x)
x = layers.Conv2D(filters, f, padding='same')(x)
x = layers.BatchNormalization(axis=3)(x)
if use_shortcut:
x_shortcut = layers.Conv2D(filters, 1, strides=(s, s),
padding='valid')(x_shortcut)
x_shortcut = layers.BatchNormalization(axis=3)(x_shortcut)
x = x + x_shortcut
x = layers.ReLU()(x)
return x

1
2
3
4
5
6
7
8
9
10
11
12
13
def identity_block_3(x, f, filters1, filters2, use_shortcut=True):
x_shortcut = x
x = layers.Conv2D(filters1, 1, padding='valid')(x)
x = layers.BatchNormalization(axis=3)(x)
x = layers.Conv2D(filters1, f, padding='same')(x)
x = layers.BatchNormalization(axis=3)(x)
x = layers.ReLU()(x)
x = layers.Conv2D(filters2, 1, padding='valid')(x)
x = layers.BatchNormalization(axis=3)(x)
if use_shortcut:
x = x + x_shortcut
x = layers.ReLU()(x)
return x

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def convolution_block_3(x, f, filters1, filters2, s: int, use_shortcut=True):
x_shortcut = x
x = layers.Conv2D(filters1, 1, strides=(s, s), padding='valid')(x)
x = layers.BatchNormalization(axis=3)(x)
x = layers.Conv2D(filters1, f, padding='same')(x)
x = layers.BatchNormalization(axis=3)(x)
x = layers.ReLU()(x)
x = layers.Conv2D(filters2, 1, padding='valid')(x)
x = layers.BatchNormalization(axis=3)(x)
if use_shortcut:
x_shortcut = layers.Conv2D(filters2, 1, strides=(s, s),
padding='same')(x_shortcut)
x_shortcut = layers.BatchNormalization(axis=3)(x_shortcut)
x = x + x_shortcut
x = layers.ReLU()(x)
return x

这些代码中有一个细节要注意:在convolution_block_3中,stride=2是放在第一个还是第二个卷积层中没有定论。不同框架似乎对此有不同的实现方式。这里是把它放到了第一个1x1卷积里。

实验结果

在这个项目中,我已经准备好了数据集预处理的代码。可以轻松地生成数据集并用TensorFlow训练模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def main():
train_X, train_Y, test_X, test_Y = get_cat_set(
'dldemos/LogisticRegression/data/archive/dataset',
train_size=500,
test_size=50)
print(train_X.shape) # (m, 224, 224, 3)
print(train_Y.shape) # (m , 1)

#model = init_model()
#model = init_model(use_shortcut=False)
model = init_model(model_name='ResNet50')
# model = init_model(model_name='ResNet50', use_shortcut=False)
model.compile(optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy'])

model.fit(train_X, train_Y, epochs=20, batch_size=16)
model.evaluate(test_X, test_Y)

为了让训练尽快结束,我只训了20个epoch,且使用的数据集比较小。我在ResNet-18中使用了3000个训练样本,ResNet-50中使用了1000个训练样本。数据的多少不影响对比结果,我们只需要知道模型的训练误差,便足以比较这四个模型了。

以下是我在四个实验中得到的结果。

ResNet-18

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Epoch 1/20
63/63 [==============================] - 75s 1s/step - loss: 1.9463 - accuracy: 0.5485
Epoch 2/20
63/63 [==============================] - 71s 1s/step - loss: 0.9758 - accuracy: 0.5423
Epoch 3/20
63/63 [==============================] - 81s 1s/step - loss: 0.8490 - accuracy: 0.5941
Epoch 4/20
63/63 [==============================] - 73s 1s/step - loss: 0.8309 - accuracy: 0.6188
Epoch 5/20
63/63 [==============================] - 72s 1s/step - loss: 0.7375 - accuracy: 0.6402
Epoch 6/20
63/63 [==============================] - 77s 1s/step - loss: 0.7932 - accuracy: 0.6769
Epoch 7/20
63/63 [==============================] - 78s 1s/step - loss: 0.7782 - accuracy: 0.6713
Epoch 8/20
63/63 [==============================] - 76s 1s/step - loss: 0.6272 - accuracy: 0.7147
Epoch 9/20
63/63 [==============================] - 77s 1s/step - loss: 0.6303 - accuracy: 0.7059
Epoch 10/20
63/63 [==============================] - 74s 1s/step - loss: 0.6250 - accuracy: 0.7108
Epoch 11/20
63/63 [==============================] - 73s 1s/step - loss: 0.6065 - accuracy: 0.7142
Epoch 12/20
63/63 [==============================] - 74s 1s/step - loss: 0.5289 - accuracy: 0.7754
Epoch 13/20
63/63 [==============================] - 73s 1s/step - loss: 0.5005 - accuracy: 0.7506
Epoch 14/20
63/63 [==============================] - 73s 1s/step - loss: 0.3961 - accuracy: 0.8141
Epoch 15/20
63/63 [==============================] - 74s 1s/step - loss: 0.4417 - accuracy: 0.8121
Epoch 16/20
63/63 [==============================] - 74s 1s/step - loss: 0.3761 - accuracy: 0.8136
Epoch 17/20
63/63 [==============================] - 73s 1s/step - loss: 0.2764 - accuracy: 0.8809
Epoch 18/20
63/63 [==============================] - 71s 1s/step - loss: 0.2698 - accuracy: 0.8878
Epoch 19/20
63/63 [==============================] - 72s 1s/step - loss: 0.1483 - accuracy: 0.9457
Epoch 20/20
63/63 [==============================] - 72s 1s/step - loss: 0.2495 - accuracy: 0.9079

ResNet-18 无跳连

text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Epoch 1/20
63/63 [==============================] - 63s 963ms/step - loss: 1.4874 - accuracy: 0.5111
Epoch 2/20
63/63 [==============================] - 62s 990ms/step - loss: 0.7654 - accuracy: 0.5386
Epoch 3/20
63/63 [==============================] - 65s 1s/step - loss: 0.6799 - accuracy: 0.6210
Epoch 4/20
63/63 [==============================] - 62s 990ms/step - loss: 0.6891 - accuracy: 0.6086
Epoch 5/20
63/63 [==============================] - 65s 1s/step - loss: 0.7921 - accuracy: 0.5182
Epoch 6/20
63/63 [==============================] - 65s 1s/step - loss: 0.7123 - accuracy: 0.5643
Epoch 7/20
63/63 [==============================] - 64s 1s/step - loss: 0.7071 - accuracy: 0.5173
Epoch 8/20
63/63 [==============================] - 64s 1s/step - loss: 0.6653 - accuracy: 0.6227
Epoch 9/20
63/63 [==============================] - 65s 1s/step - loss: 0.6675 - accuracy: 0.6249
Epoch 10/20
63/63 [==============================] - 64s 1s/step - loss: 0.6959 - accuracy: 0.6130
Epoch 11/20
63/63 [==============================] - 66s 1s/step - loss: 0.6730 - accuracy: 0.6182
Epoch 12/20
63/63 [==============================] - 63s 1s/step - loss: 0.6321 - accuracy: 0.6491
Epoch 13/20
63/63 [==============================] - 63s 992ms/step - loss: 0.6413 - accuracy: 0.6569
Epoch 14/20
63/63 [==============================] - 63s 1s/step - loss: 0.6130 - accuracy: 0.6885
Epoch 15/20
63/63 [==============================] - 62s 988ms/step - loss: 0.6750 - accuracy: 0.6056
Epoch 16/20
63/63 [==============================] - 66s 1s/step - loss: 0.6341 - accuracy: 0.6526
Epoch 17/20
63/63 [==============================] - 68s 1s/step - loss: 0.6384 - accuracy: 0.6676
Epoch 18/20
63/63 [==============================] - 65s 1s/step - loss: 0.5750 - accuracy: 0.6997
Epoch 19/20
63/63 [==============================] - 63s 997ms/step - loss: 0.5932 - accuracy: 0.7094
Epoch 20/20
63/63 [==============================] - 62s 990ms/step - loss: 0.6133 - accuracy: 0.6420

ResNet-50

text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Epoch 1/20
63/63 [==============================] - 72s 1s/step - loss: 3.4660 - accuracy: 0.4970
Epoch 2/20
63/63 [==============================] - 67s 1s/step - loss: 1.3429 - accuracy: 0.5686
Epoch 3/20
63/63 [==============================] - 68s 1s/step - loss: 1.0294 - accuracy: 0.5616
Epoch 4/20
63/63 [==============================] - 68s 1s/step - loss: 0.7920 - accuracy: 0.6186
Epoch 5/20
63/63 [==============================] - 70s 1s/step - loss: 0.6698 - accuracy: 0.6773
Epoch 6/20
63/63 [==============================] - 70s 1s/step - loss: 0.6884 - accuracy: 0.7289
Epoch 7/20
63/63 [==============================] - 70s 1s/step - loss: 0.7144 - accuracy: 0.6399
Epoch 8/20
63/63 [==============================] - 69s 1s/step - loss: 0.7088 - accuracy: 0.6698
Epoch 9/20
63/63 [==============================] - 68s 1s/step - loss: 0.6385 - accuracy: 0.6446
Epoch 10/20
63/63 [==============================] - 69s 1s/step - loss: 0.5389 - accuracy: 0.7417
Epoch 11/20
63/63 [==============================] - 71s 1s/step - loss: 0.4954 - accuracy: 0.7832
Epoch 12/20
63/63 [==============================] - 73s 1s/step - loss: 0.4489 - accuracy: 0.7782
Epoch 13/20
63/63 [==============================] - 69s 1s/step - loss: 0.3987 - accuracy: 0.8257
Epoch 14/20
63/63 [==============================] - 72s 1s/step - loss: 0.3228 - accuracy: 0.8519
Epoch 15/20
63/63 [==============================] - 70s 1s/step - loss: 0.2089 - accuracy: 0.9235
Epoch 16/20
63/63 [==============================] - 69s 1s/step - loss: 0.4766 - accuracy: 0.7756
Epoch 17/20
63/63 [==============================] - 75s 1s/step - loss: 0.2148 - accuracy: 0.9181
Epoch 18/20
63/63 [==============================] - 70s 1s/step - loss: 0.3086 - accuracy: 0.8623
Epoch 19/20
63/63 [==============================] - 69s 1s/step - loss: 0.3544 - accuracy: 0.8732
Epoch 20/20
63/63 [==============================] - 70s 1s/step - loss: 0.0796 - accuracy: 0.9704

ResNet-50 无跳连

text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Epoch 1/20
63/63 [==============================] - 60s 882ms/step - loss: 1.2093 - accuracy: 0.5034
Epoch 2/20
63/63 [==============================] - 56s 892ms/step - loss: 0.8433 - accuracy: 0.4861
Epoch 3/20
63/63 [==============================] - 59s 931ms/step - loss: 0.7512 - accuracy: 0.5235
Epoch 4/20
63/63 [==============================] - 62s 991ms/step - loss: 0.7395 - accuracy: 0.4887
Epoch 5/20
63/63 [==============================] - 62s 990ms/step - loss: 0.7770 - accuracy: 0.5316
Epoch 6/20
63/63 [==============================] - 60s 945ms/step - loss: 0.7408 - accuracy: 0.4947
Epoch 7/20
63/63 [==============================] - 67s 1s/step - loss: 0.7345 - accuracy: 0.5434
Epoch 8/20
63/63 [==============================] - 62s 984ms/step - loss: 0.7214 - accuracy: 0.5605
Epoch 9/20
63/63 [==============================] - 60s 950ms/step - loss: 0.7770 - accuracy: 0.4784
Epoch 10/20
63/63 [==============================] - 60s 956ms/step - loss: 0.7171 - accuracy: 0.5203
Epoch 11/20
63/63 [==============================] - 63s 994ms/step - loss: 0.7045 - accuracy: 0.4921
Epoch 12/20
63/63 [==============================] - 63s 1s/step - loss: 0.6884 - accuracy: 0.5430
Epoch 13/20
63/63 [==============================] - 60s 958ms/step - loss: 0.7333 - accuracy: 0.5278
Epoch 14/20
63/63 [==============================] - 61s 966ms/step - loss: 0.7050 - accuracy: 0.5106
Epoch 15/20
63/63 [==============================] - 59s 943ms/step - loss: 0.6958 - accuracy: 0.5622
Epoch 16/20
63/63 [==============================] - 60s 954ms/step - loss: 0.7398 - accuracy: 0.5172
Epoch 17/20
63/63 [==============================] - 69s 1s/step - loss: 0.7104 - accuracy: 0.5023
Epoch 18/20
63/63 [==============================] - 74s 1s/step - loss: 0.7411 - accuracy: 0.4747
Epoch 19/20
63/63 [==============================] - 67s 1s/step - loss: 0.7056 - accuracy: 0.4706
Epoch 20/20
63/63 [==============================] - 81s 1s/step - loss: 0.7901 - accuracy: 0.4898

对比ResNet-18和ResNet-50,可以看出,ResNet-50的拟合能力确实更强一些。

对比无跳连的ResNet-18和ResNet-50,可以看出,ResNet-50的拟合能力反而逊于ResNet-18。这符合ResNet的初衷,如果不加残差连接的话,过深的网络反而会因为梯度问题而有更高的训练误差。

此外,不同模型的训练速度也值得一讲。在训练数据量减少到原来的1/3后,ResNet-50和ResNet-18的训练速度差不多。ResNet-50看上去比ResNet-18多了很多层,网络中间也使用了通道数很大的卷积,但整体的参数量并没有增大多少,这多亏了能降低运算量的瓶颈结构。

总结

在这篇文章中,我展示了ResNet-18和ResNet-50的TensorFlow实现。这份代码包括了经典ResNet中两种残差块的两种实现,完整地复现了原论文的模型模块。同时,经实验分析,我验证了ResNet残差连接的有效性。

未来我还会写一篇ResNet的PyTorch实现,并附上论文的详细解读。

学习提示

上周,我们学完了CNN的基础组成模块。而从这周开始,我们要换一种学习方式:我们会认识一些经典的CNN架构,从示例中学习。一方面来说,通过了解他人的网络,阅读他人的代码,我们能够更快地掌握如何整合CNN的基础模块;另一方面,CNN架构往往泛化能力较强,学会了其他任务中成熟的架构,可以把这些架构直接用到我们自己的任务中。

接下来,我们会按照CNN的发展历史,认识许多CNN架构。首先是经典网络:

  • LeNet-5
  • AlexNet
  • VGG

之后是近年来的一些网络:

  • ResNet
  • Inception
  • MobileNet

我们不会把这些研究的论文详细过一遍,而只会学习各研究中最精华的部分。学有余力的话,最好能在课后把论文自己过一遍。

课堂笔记

经典网络

LeNet-5

LeNet-5是用于手写数字识别(识别0~9的阿拉伯数字)的网络。它的结构如下:

网络是输入是一张[32, 32, 1]的灰度图像,输入经过4个卷积+池化层,再经过两个全连接层,输出一个0~9的数字。这个网络和我们上周见过的网络十分相似,数据体的宽和高在不断变小,而通道数在不断变多。

这篇工作是1998年发表的,当时的神经网络架构和现在我们学的有不少区别:

  • 当时padding还没有得到广泛使用,数据体的分辨率会越降越小。
  • 当时主要使用平均池化,而现在最大池化更常见。
  • 网络只输出一个值,表示识别出来的数字。而现在的多分类任务一般会输出10个值并使用softmax激活函数。
  • 当时激活函数只用sigmoid和tanh,没有人用ReLU。
  • 当时的算力没有现在这么强,原工作在计算每个通道卷积时使用了很多复杂的小技巧。而现在我们直接算就行了。

LeNet-5只有6万个参数。随着算力的增长,后来的网络越来越大了。

AlexNet

AlexNet是2012年发表的有关图像分类的CNN结构。它的输入是[227, 227, 3]的图像,输出是一个1000类的分类结果。

原论文里写的是输入形状是[224, 224, 3],但实际上这个分辨率是有问题的,按照这个分辨率是算不出后续结果的分辨率的。但现在一些框架对AlexNet的复现中,还是会令输入的分辨率是224。这是因为框架在第一层卷积中加了一个padding的操作,强行让后续数据的分辨率和原论文对上了。

AlexNet和LeNet-5在架构上十分接近。但是,AlexNet做出了以下改进:

  • AlexNet用了更多的参数,一共有约6000万个参数。
  • 使用ReLU作为激活函数。

AlexNet还提出了其他一些创新,但与我们要学的知识没有那么多关系:

  • 当时算力还是比较紧张,AlexNet用了双GPU训练。论文里写了很多相关的工程细节。
  • 使用了Local Response Normalization这种归一化层。现在几乎没人用这种归一化。

AlexNet中的一些技术在今天看来,已经是常识般的存在。而在那个年代,尽管深度学习在语音识别等任务上已经初露锋芒,人们还没有开始重视深度学习这项技术。正是由于AlexNet这一篇工作的出现,计算机视觉的研究者开始关注起了深度学习。甚至在后来,这篇工作的影响力已经远超出了计算机视觉社区。

VGG-16

VGG-16也是一个图像分类网络。VGG的出发点是:为了简化网络结构,只用3x3等长(same)卷积和2x2最大池化。

可以看出,VGG也是经过了一系列的卷积和池化层,最后使用全连接层和softmax输出结果。顺带一提,VGG-16里的16表示有16个带参数的层。

VGG非常庞大,有138M(1.38亿)个参数。但是它简洁的结构吸引了很多人的关注。

吴恩达老师鼓励大家去读一读这三篇论文。可以先看AlexNet,再看VGG。LeNet有点难读,可以放到最后去读。

ResNets(基于残差的网络)

非常非常深的神经网络是很难训练的,这主要是由梯度爆炸/弥散问题导致的。在这一节中,我们要学一种叫做“跳连(skip connection)”的网络模块连接方式。使用跳连,我们能让浅层模块的输出直接对接到深层模块的输入上,进而搭建基于残差的网络,解决梯度爆炸/弥散问题,训练深达100层的网络。

残差块

回忆一下,在全连接网络中,假如我们有中间层的输出$a^{[l]}, a^{[l+2]}$,$a^{[l+2]}$是怎么由$a^{[l]}$算出来的呢?我们之前用的公式如下:

也就是说,$a^{[l]}$要经过一个线性层、一个激活函数、一个线性层、一个激活函数,才能传递到$a^{[l+2]}$,这条路径非常长:

而在残差块(Residual block)中,我们使用了一种新的连接方法:

$a^{[l]}$的值被直接加到了第二个ReLU层之前的线性输出上,这是一种类似电路中短路的连接方法(又称跳连)。这样,浅层的信息能更好地传到深层了。

使用这种方法后,计算公式变更为:

残差块中还有一个要注意的细节。$a^{[l+2]}=g(z^{[l+2]}+a^{[l]})$这个式子能够成立,实际上是默认了$a^{[l+2]}, a^{[l]}$的维度相同。而一旦$a^{[l+2]}$的维度发生了变化,就需要用下面这种方式来调整了。

$a^{[l+2]}=g(z^{[l+2]}+W’a^{[l]})$

我们可以用一个$W’$来完成维度的转换。为了方便理解,我们先让所有$a$都是一维向量,$W’$是矩阵。这样,假设$a^{[l+2]}$的长度是256,$a^{[l]}$的长度是128,则$W’$的形状就是$256 \times 128$。

但实际上,$a$是一个三维的图像张量,三个维度的长度都可能发生变化。因此,对于图像,上式中的$W’$应该表示的是一个卷积操作。通过卷积操作,我们能够减小图像的宽高,调整图像的通道数,使得$a^{[l]}$和$a^{[l+2]}$的维度完全相同。

残差网络

在构建残差网络ResNet时,只要把这种残差块一个一个拼接起来即可。或者从另一个角度来看,对于一个“平坦网络”(”plain network”, ResNet论文中用的词,用于表示非残差网络),我们只要把线性层两两打包,添加跳连即可。

残差块起到了什么作用呢?让我们看看在网络层数变多时,平坦网络和残差网络训练误差的变化趋势:

理论上来说,层数越深,训练误差应该越低。但在实际中,对平坦网络增加深度,反而会让误差变高。而使用ResNet后,随着深度增加,训练误差起码不会降低了。

正是有这样的特性,我们可以用ResNet架构去训练非常深的网络。

为什么ResNet是有这样的特性呢?我们还是从刚刚那个ResNet的公式里找答案。

假设我们设计好了一个网络,又给它新加了一个残差块,即多加了两个卷积层,那么最后的输出可以写成:

由于正则化的存在,所有$W$和$b$都倾向于变得更小。极端情况下,$W, b$都变为0了。那么,

再不妨设$g=ReLU$。则因为$a^{[l]}$也是ReLU的输出,有

这其实是一个恒等映射,也就是说,新加的残差块对之前的输出没有任何影响。网络非常容易学习到恒等映射。这样,最起码能够保证较深的网络不比浅的网络差。

准备好了所有基础知识,我们来看看完整的ResNet长什么样。

ResNet有几个参数量不同的版本。这里展示的叫做ResNet-34。完整的网络很长,我们只用关注其中一小部分就行了。

一开始,网络还是用一个大卷积核大步幅的卷积以及一个池化操作快速降低图像的宽度,再把数据传入残差块中。和我们刚刚学的一样,残差块有两种,一种是维度相同可以直接相加的(实线),一种是要调整维度的(虚线)。整个网络就是由这若干个这样的残差块组构成。经过所有残差块后,还是和经典的网络一样,用全连接层输出结果。

这里,我们只学习了残差连接的基本原理。ResNet的论文里还有更多有关网络结构、实验的细节。最好能读一读论文。当然,这周的编程实战里我们也会复现ResNet,以加深对其的理解。

Inception 网络

我们已经见过不少CNN的示例了。当我们仿照它们设计自己的网络时,或许会感到迷茫:有3x3, 5x5卷积,有池化,该怎么选择每一个模块呢?Inception网络给了一个解决此问题的答案:我全都要。

Inception网络用到了一种特殊的1x1卷积。我们会先学习1x1卷积,再学习Inception网络的知识。

1x1卷积

用1x1的卷积核去卷一幅图像,似乎是一件很滑稽的事情。假设一幅图像的数字是[1, 2, 3],卷积核是[2],那么卷出来的图像就是[2, 4, 6]。这不就是把每个数都做了一次乘法吗?

对于通道数为1的图像,1x1卷积确实没什么大用。而当通道数多起来后,1x1卷积的意义就逐渐显现出来了。思考一下,对多通道的图像做1x1卷积,就是把某像素所有通道的数字各乘一个数,求和,加一个bias,再通过激活函数。这是计算一个输出结果的过程,而如果有多个卷积核,就可以计算出多个结果。(下图中,蓝色的数据体是输入图像,黄色的数据体是1x1的卷积核。两个数据体重合部分的数据会先做乘法,再求和,加bias,经过激活函数。)

这个过程让你想起了什么?没错,正是最早学习的全连接网络。1x1卷积,实际上就是在各通道上做了一次全连接的计算。1x1卷积的输入通道数,就是全连接层上一层神经元的数量;1x1卷积核的数量,就是这一层神经元的数量。

1x1卷积主要用于变换图像的通道数。比如要把一个192通道数的图像变成32通道的,就应该用32个1x1卷积去卷原图像。

Inception块的原理

在Inception网络中,我们会使用这样一种混合模块:对原数据做1x1, 3x3, 5x5卷积以及最大池化,得到通道数不同的数据体。这些数据体会被拼接起来,作为整个模块的输出。

值得注意的是,这里的池化操作和我们之前见过的不太一样。为了保持输出张量的宽高,这个池化的步幅为1,且使用了等长填充。另外,为了调整池化操作输出的通道数,这条数据处理路线上还有一个用1x1卷积变换通道数的操作。这份图省略了很多这样的细节,下一节我们会见到这幅图的完整版。

在实现这样一种模块时,会碰到计算量过大的问题。比如把上面$28 \times 28 \times 192$的数据体用$5 \times 5$卷积卷成$28 \times 28 \times 32$的数据体,需要多少次乘法计算呢?对每个像素单独考虑,一个通道上的卷积要做$5 \times 5$此乘法,192个通道的卷积要做$192 \times 5 \times 5$次乘法。32个这样的卷积在$28 \times 28$的图片上要做$28 \times 28 \times 32 \times 192 \times 5 \times 5 \approx 120M$次乘法。这个计算量太大了。

为此,我们可以巧妙地先利用1x1卷积减少通道数,再做5x5卷积。这样,计算量就少得多了。

这样一种两头大,中间小的结构被形象地称为瓶颈(bottlenect)。这种结构被广泛用在许多典型网络中。

Inception网络

有了之前的知识,我们可以看Inception模块的完整结构了。1x1卷积没有什么特别的。为了减少3x3卷积和5x5卷积的计算量,做这两种卷积之前都会用1x1卷积减少通道数。而为了改变池化结果的通道数,池化后接了一个1x1卷积操作。

实际上,理解了Inception块,也就能看懂Inception网络了。如下图所示,红框内的模块都是Inception块。而这个网络还有一些小细节:除了和普通网络一样在网络的最后使用softmax输出结果外,这个网络还根据中间结果也输出了几个结果。当然,这些都是早期网络的设计技巧了。

MobileNet

MobileNet,顾名思义,这是一种适用于移动(mobile)设备的神经网络。移动设备的计算资源通常十分紧缺,因此,MobileNet对网络的计算量进行了极致的压缩。

减少卷积运算量

再回顾一遍,一次卷积操作中主要的计算量如下:

计算量这么大,主要问题出在每一个输出通道都要与每一个输入通道“全连接”上。为此,我们可以考虑让输出通道只由部分的输入通道决定。这样一种卷积的策略叫逐深度可分卷积(Depthwise Separable Convolution)。

这里的depthwise是“逐深度”的意思,但我觉得“逐通道”这个称呼会更容易理解一点。

逐深度可分卷积分为两步:逐深度卷积(depthwise convolution),逐点卷积(pointwise convolution)。逐深度卷积生成新的通道,逐点卷积把各通道的信息关联起来。

之前,要对下图中的三通道图片做卷积,需要3个卷积核分别处理3个通道。而在逐深度卷积中,我们只要1个卷积核。这个卷积核会把输入图像当成三个单通道图像来看待,分别对原图像的各个通道进行卷积,并生成3个单通道图像,最后把3个单通道图像拼回一个三通道图像。也就是说,逐深度卷积只能生成一幅通道数相同的新图像。

逐深度卷积可以通过设置卷积在编程框架中的groups参数来实现。参见我讲解卷积的文章

下一步,是逐点卷积,也就是1x1卷积。它用来改变图片的通道数。

之前的卷积有2160次乘法,现在只有432+240=672次,计算量确实减少了不少。实际上,优化后计算量占原计算量的比例是:

其中$n_c’$是输出通道数,$f$是卷积核边长。一般来说计算量都会少10倍。

网络结构

知道了MobileNet的基本思想,我们来看几个不同版本的MobileNet。

MobileNet v1

13个逐深度可分卷积模块,之后接通常的池化、全连接、softmax。

MobileNet v2

两个改进:

  1. 残差连接
  2. 扩张(expansion)操作

残差连接和ResNet一样。这里我们关注一下第二个改进。

在MobileNet v2中,先做一个扩张维度的1x1卷积,再做逐深度卷积,最后做之前的逐点1x1卷积。由于最后的逐点卷积起到的是减小维度的作用,所以最后一步操作也叫做投影。

这种架构很好地解决了性能和效果之间的矛盾:在模块之间,数据的通道数只有3,占用内存少;在模块之内,更高通道的数据能拟合更复杂的函数。

EfficientNet

EfficientNet能根据设备的计算能力,自动调整网络占用的资源。

让我们想想,哪些因素决定了一个网络占用的运算资源?我们很快能想到下面这些因素:

  • 图像分辨率
  • 网络深度
  • 特征的长度(即卷积核数量或神经元数量)

在EfficientNet中,我们可以在这三个维度上缩放网络,动态改变网络的计算量。EfficientNet的开源实现中,一般会提供各设备下的最优参数。

卷积网络实现细节

使用开源实现

由于深度学习项目涉及很多训练上的细节,想复现一个前人的工作是很耗时的。最好的学习方法是找到别人的开源代码,在现有代码的基础上学习。

深度学习的开源代码一般在GitHub上都能找到。如果是想看PyTorch实现,可以直接去GitHub上搜索OpenMMLab。

使用迁移学习

如第三门课第二周所学,我们可以用迁移学习,导入别人训练好的模型里的权重为初始权重,加速我们自己模型的训练。

还是以多分类任务的迁移学习为例(比如把一个1000分类的分类器迁移到一个猫、狗、其他的三分类模型上)。迁移后,新的网络至少要删除输出层,并按照新的多分类个数,重新初始化一个输出层。之后,根据新任务的数据集大小,冻结网络的部分参数,从导入的权重开始重新训练网络的其他部分:

当然,可以多删除几个较深的层,也可以多加入几个除了输出层以外的隐藏层。

数据增强

由于CV任务总是缺少数据,数据增强是一种常见的提升网络性能的手段。

常见的改变形状的数据增强手段有:

  • 镜像
  • 裁剪
  • 旋转
  • 扭曲

此外,还可以改变图像的颜色。比如对三个颜色通道都随机加一个偏移量。

数据增强有一些实现上的细节:数据的读取及增强是放在CPU上运行的,训练是放在CPU或GPU上运行的。这两步其实是独立的,可以并行完成。最常见的做法是,在CPU上用多进程(发挥多核的优势)读取数据并进行数据增强,之后把数据搬到GPU上训练。

计算机视觉的现状与相关建议

一般来说,算法从两个来源获取知识:标注的数据,人工设计的特征。这二者是互补的关系。对于人工智能任务来说,如果有足够的数据,设计一个很简单的网络就行了;而如果数据量不足,则需要去精心设计网络结构。

像语音识别这种任务就数据充足,用简单的网络就行了。而大部分计算机视觉任务都处于数据不足的状态。哪怕计算机视觉中比较基础的图像分类任务,都需要设计结构复杂的网络,更不用说目标检测这些更难的任务了。

如果你想用深度学习模型参加刷精度的比赛,可以使用以下几个小技巧:

  • 同时开始训练多个网络,算结果时取它们的平均值。
  • 对图像分类任务,可以把图像随机裁剪一部分并输入网络,多次执行这一步骤并取平均分类结果。

也就是说,只是为了提高精度的话,可以想办法对同一份输入执行多次条件不同的推理,并对结果求平均。当然,实际应用中是不可能用性能这么低的方法。

总结

这节课是CNN中最重要的一节课。通过学习一些经典的CNN架构,我们掌握了很多有关搭建CNN的知识。总结一下:

  • 早期CNN
    • 卷积、池化、全连接
    • 边长减小,通道数增加
  • ResNet
    • 为什么使用ResNet?
    • 梯度问题是怎么被解决的?
    • 残差块的一般结构
    • 输入输出通道数不同的残差块
    • 了解ResNet的结构(ResNet-18, ResNet-50)
  • Incpetion 网络
    • 1x1卷积
    • 用1x1卷积减少计算量
    • Inception网络的基本模块
  • MobileNet
    • 逐深度可分卷积
    • MobileNet v2中的瓶颈结构

这节课介绍的都是比较前沿的CNN架构。在深度学习技术日新月异的今天,最好的学习方式是读论文,尽快一步一步跟上最新的技术。这堂课中提及的比较新的几篇论文,都有很高的阅读价值。

我打算在学完CNN的四周课后,暂时不去学第五门课,而是去阅读这些经典CNN论文并分享一下笔记。

在这周的代码实战里,我会分享一下如何用TensorFlow和PyTorch编写ResNet,同时介绍两种框架的进阶用法。

在之前的文章中,我介绍了如何用NumPy实现卷积正向传播
在这篇文章里,我会继续介绍如何用NumPy复现二维卷积的反向传播,并用PyTorch来验证结果的正确性。通过阅读这篇文章,大家不仅能进一步理解卷积的实现原理,更能领悟到一般算子的反向传播实现是怎么推导、编写出来的。

项目网址:https://github.com/SingleZombie/DL-Demos/tree/master/dldemos/BasicCNN

本文代码在dldemos/BasicCNN/np_conv_backward.py这个文件里。

实现思路

回忆一下,在正向传播中,我们是这样做卷积运算的:

1
2
3
4
5
6
7
8
9
10
11
for i_h in range(h_o):
for i_w in range(w_o):
for i_c in range(c_o):
h_lower = i_h * stride
h_upper = i_h * stride + f
w_lower = i_w * stride
w_upper = i_w * stride + f
input_slice = input_pad[h_lower:h_upper, w_lower:w_upper, :]
kernel_slice = weight[i_c]
output[i_h, i_w, i_c] = np.sum(input_slice * kernel_slice)
output[i_h, i_w, i_c] += bias[i_c]

我们遍历输出图像的每一个位置,选择该位置对应的输入图像切片和卷积核,做一遍乘法,再加上bias。

其实,一轮运算写成数学公式的话,就是一个线性函数y=wx+b。对w, x, b求导非常简单:

1
2
3
dw_i = x * dy
dx_i = w * dy
db_i = dy

在反向传播中,我们只需要遍历所有这样的线性运算,计算这轮运算对各参数的导数的贡献即可。最后,累加所有的贡献,就能得到各参数的导数。当然,在用代码实现这段逻辑时,可以不用最后再把所有贡献加起来,而是一算出来就加上。

1
2
3
dw += x * dy
dx += w * dy
db += dy

这里要稍微补充一点。在前向传播的实现中,我加入了dilation, groups这两个参数。为了简化反向传播的实现代码,只展示反向传播中最精华的部分,我在这份卷积实现中没有使用这两个参数。

代码实现

在开始实现反向传播之前,我们先思考一个问题:反向传播的函数应该有哪些参数?从数学上来讲,反向传播和正向传播的参数是相反的。设正向传播的输入是A_prev, W, b(输入图像、卷积核组、偏差),则应该输出Z(输出图像)。那么,在反向传播中,应该输入dZ,输出dA_prev, dW, db。可是,在写代码时,我们还需要一些其他的输入参数。

我的反向传播函数的函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def conv2d_backward(dZ: np.ndarray, cache: Dict[str, np.ndarray], stride: int,
padding: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""2D Convolution Backward Implemented with NumPy

Args:
dZ: (np.ndarray): The derivative of the output of conv.
cache (Dict[str, np.ndarray]): Record output 'Z', weight 'W', bias 'b'
and input 'A_prev' of forward function.
stride (int): Stride for convolution.
padding (int): The count of zeros to pad on both sides.

Outputs:
Tuple[np.ndarray, np.ndarray, np.ndarray]: The derivative of W, b,
A_prev.
"""

虽然我这里把所有参数都写在了一起,但从逻辑上来看,这些参数应该分成三个类别。在编程框架中,这三类参数会储存在不同的地方。

  • dZ: 反向传播函数真正的输入。
  • cache: 正向传播中的一些中间变量Z, W, b。由于我们必须在一个独立的函数里完成反向传播,这些中间变量得以输入参数的形式供函数访问。
  • stride, padding: 这两个参数是卷积的属性。如果卷积层是用一个类表示的话,这些参数应该放在类属性里,而不应该放在反向传播的输入里。

给定这三类参数,就足以完成反向传播计算了。下面我来介绍conv2d_backward的具体实现。

首先,获取cache中的参数,并且新建储存梯度的张量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
W = cache['W']
b = cache['b']
A_prev = cache['A_prev']
dW = np.zeros(W.shape)
db = np.zeros(b.shape)
dA_prev = np.zeros(A_prev.shape)

_, _, c_i = A_prev.shape
c_o, f, f_2, c_k = W.shape
h_o, w_o, c_o_2 = dZ.shape

assert (f == f_2)
assert (c_i == c_k)
assert (c_o == c_o_2)

之后,为了实现填充操作,我们要把A_prevdA_prev都填充一下。注意,算完了所有梯度后,别忘了要重新把dA_prevdA_prev_pad里抠出来。

1
2
3
4
A_prev_pad = np.pad(A_prev, [(padding, padding), (padding, padding),
(0, 0)])
dA_prev_pad = np.pad(dA_prev, [(padding, padding), (padding, padding),
(0, 0)])

接下来,就是梯度的计算了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
for i_h in range(h_o):
for i_w in range(w_o):
for i_c in range(c_o):
h_lower = i_h * stride
h_upper = i_h * stride + f
w_lower = i_w * stride
w_upper = i_w * stride + f

input_slice = A_prev_pad[h_lower:h_upper, w_lower:w_upper, :]
# forward
# kernel_slice = W[i_c]
# Z[i_h, i_w, i_c] = np.sum(input_slice * kernel_slice)
# Z[i_h, i_w, i_c] += b[i_c]

# backward
dW[i_c] += input_slice * dZ[i_h, i_w, i_c]
dA_prev_pad[h_lower:h_upper,
w_lower:w_upper, :] += W[i_c] * dZ[i_h, i_w, i_c]
db[i_c] += dZ[i_h, i_w, i_c]

在算导数时,我们应该对照着正向传播的计算,算出每一条计算对导数的贡献。如前文所述,卷积操作只是一个简单的y=wx+b,把对应的w, x, b从变量里正确地取出来并做运算即可。

最后,要把这些导数返回。别忘了把填充后的dA_prev恢复一下。

1
2
3
4
5
if padding > 0:
dA_prev = dA_prev_pad[padding:-padding, padding:-padding, :]
else:
dA_prev = dA_prev_pad
return dW, db, dA_prev

这里有一个细节:如果padding==0,则在取切片时范围会变成[0:-0],这样会取出一个长度为0的切片,而不是我们期望的原长度的切片。因此,要特判一下padding<=0的情况。

单元测试

为了方便地进行单元测试,我使用了pytest这个单元测试库。可以直接pip一键安装:

1
pip install pytest

之后就可以用pytest执行我的这份代码,代码里所有以test_开头的函数会被认为是单元测试的主函数。

1
pytest dldemos/BasicCNN/np_conv_backward.py

单元测试函数的定义如下:

1
2
3
4
5
@pytest.mark.parametrize('c_i, c_o', [(3, 6), (2, 2)])
@pytest.mark.parametrize('kernel_size', [3, 5])
@pytest.mark.parametrize('stride', [1, 2])
@pytest.mark.parametrize('padding', [0, 1])
def test_conv(c_i: int, c_o: int, kernel_size: int, stride: int, padding: str):

@pytest.mark.parametrize用于设置单元测试参数的可选值。我设置了4组参数,每组参数有2个可选值,经过排列组合后可以生成2^4=16个单元测试,pytest会自动帮我们执行不同的测试。

在单元测试中,我打算测试conv2d在各种输入通道数、输出通道数、卷积核大小、步幅、填充数的情况。

测试函数是这样写的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def test_conv(c_i: int, c_o: int, kernel_size: int, stride: int, padding: str):

# Preprocess
input = np.random.randn(20, 20, c_i)
weight = np.random.randn(c_o, kernel_size, kernel_size, c_i)
bias = np.random.randn(c_o)

torch_input = torch.from_numpy(np.transpose(
input, (2, 0, 1))).unsqueeze(0).requires_grad_()
torch_weight = torch.from_numpy(np.transpose(
weight, (0, 3, 1, 2))).requires_grad_()
torch_bias = torch.from_numpy(bias).requires_grad_()

# forward
torch_output_tensor = torch.conv2d(torch_input, torch_weight, torch_bias,
stride, padding)
torch_output = np.transpose(
torch_output_tensor.detach().numpy().squeeze(0), (1, 2, 0))

cache = conv2d_forward(input, weight, bias, stride, padding)
numpy_output = cache['Z']

assert np.allclose(torch_output, numpy_output)

# backward
torch_sum = torch.sum(torch_output_tensor)
torch_sum.backward()
torch_dW = np.transpose(torch_weight.grad.numpy(), (0, 2, 3, 1))
torch_db = torch_bias.grad.numpy()
torch_dA_prev = np.transpose(torch_input.grad.numpy().squeeze(0),
(1, 2, 0))

dZ = np.ones(numpy_output.shape)
dW, db, dA_prev = conv2d_backward(dZ, cache, stride, padding)

assert np.allclose(dW, torch_dW)
assert np.allclose(db, torch_db)
assert np.allclose(dA_prev, torch_dA_prev)

整个测试函数可以分成三部分:变量预处理、前向传播、反向传播。在前向传播和反向传播中,我们要分别用刚编写的卷积核PyTorch中的卷积进行计算,并比较两个运算结果是否相同。

预处理时,我们要创建NumPy和PyTorch的输入。

1
2
3
4
5
6
7
8
9
10
# Preprocess
input = np.random.randn(20, 20, c_i)
weight = np.random.randn(c_o, kernel_size, kernel_size, c_i)
bias = np.random.randn(c_o)

torch_input = torch.from_numpy(np.transpose(
input, (2, 0, 1))).unsqueeze(0).requires_grad_()
torch_weight = torch.from_numpy(np.transpose(
weight, (0, 3, 1, 2))).requires_grad_()
torch_bias = torch.from_numpy(bias).requires_grad_()

之后是正向传播。计算结果和中间变量会被存入cache中。

1
2
3
4
5
6
7
8
9
10
# forward
torch_output_tensor = torch.conv2d(torch_input, torch_weight, torch_bias,
stride, padding)
torch_output = np.transpose(
torch_output_tensor.detach().numpy().squeeze(0), (1, 2, 0))

cache = conv2d_forward(input, weight, bias, stride, padding)
numpy_output = cache['Z']

assert np.allclose(torch_output, numpy_output)

最后是反向传播。在那之前,要补充说明一下如何在PyTorch里手动求一些数据的导数。在PyTorch中,各个张量默认是不可训练的。为了让框架知道我们想求哪几个参数的导数,我们要执行张量的required_grad_()方法,如:

1
2
torch_input = torch.from_numpy(np.transpose(
input, (2, 0, 1))).unsqueeze(0).requires_grad_()

这样,在正向传播时,PyTorch就会自动把对可训练参数的运算搭成计算图了。

正向传播后,对结果张量调用backward()即可执行反向传播。但是,PyTorch要求调用backward()的张量必须是一个标量,也就是它不能是矩阵,不能是任何长度大于1的数据。而这里PyTorch的卷积结果又是一个四维张量。因此,我把PyTorch卷积结果做了求和,得到了一个标量,用它来调用backward()

1
2
torch_sum = torch.sum(torch_output_tensor)
torch_sum.backward()

这样,就可以用tensor.grad获取tensor的导数了,如

1
2
3
torch_weight.grad
torch_bias.grad
torch_input.grad

整个反向传播测试的代码如下。

1
2
3
4
5
6
7
8
9
10
# backward
torch_sum = torch.sum(torch_output_tensor)
torch_sum.backward()
torch_dW = np.transpose(torch_weight.grad.numpy(), (0, 2, 3, 1))
torch_db = torch_bias.grad.numpy()
torch_dA_prev = np.transpose(torch_input.grad.numpy().squeeze(0),
(1, 2, 0))

dZ = np.ones(numpy_output.shape)
dW, db, dA_prev = conv2d_backward(dZ, cache, stride, padding)

再补充一下,在求导时,运算结果的导数是1。因此,新建dZ时,我用的是np.ones(全1张量)。同理,PyTorch也会默认运算结果的导数为1,即这里torch_sum.grad==1。而执行加法运算不会改变导数,所以torch_output_tensor.grad也是一个全是1的张量,和NumPy的dZ的值是一模一样的。

写完单元测试函数后,运行前面提到的单元测试命令,pytest就会输出很多测试的结果。

1
pytest dldemos/BasicCNN/np_conv_backward.py

如果看到了类似的输出,就说明我们的代码是正确的。

1
==== 16 passed in 1.04s ====

反向传播的编写思路

通过阅读上面的实现过程,相信大家已经明白如何编写卷积的反向传播了。接下来,我将总结一下实现一般算子的正向、反向传播的思路。无论是用NumPy,还是PyTorch等编程框架,甚至是纯C++,这种思路都是适用的。

一开始,我们要明白,一个算子总共会涉及到这些参数:

  • 输入与输出:算子的输入张量和输出张量。正向传播和反向传播的输入输出恰好是相反的。
  • 属性:算子的超参数。比如卷积的stride, padding
  • 中间变量:前向传播传递给反向传播的变量。

一般情况下,我们应该编写一个算子类。在初始化算子类时,算子的属性就以类属性的形式存储下来了。

在正向传播时,我们按照算子定义直接顺着写下去就行。这个时候,可以先准备好cache变量,但先不去管它,等写到反向传播的时候再处理。

接着,编写反向传播。由于反向传播和正向传播的运算步骤相似,我们可以直接把正向传播的代码复制一份。在这个基础上,思考每一步正向传播运算产生了哪些导数,对照着写出导数计算的代码即可。这时,我们会用到一些正向传播的中间结果,这下就可以去正向传播代码里填写cache,在反向传播里取出来了。

最后,写完了算子,一定要做单元测试。如果该算子有现成的实现,用现成的实现来对齐运算结果是最简单的一种实现单元测试的方式。

总结

在这篇文章中,我介绍了以下内容:

  • 卷积反向传播的NumPy实现
  • 如何用PyTorch手动求导
  • 如何编写完整的算子单元测试
  • 实现算子正向传播、反向传播的思路

如果你也想把代码基础打牢,一定一定要像这样自己动手从头写一份代码。在写代码,调bug的过程中,一定会有很多收获。

由于现在的编程框架都比较成熟,搞科研时基本不会碰到自己动手写底层算子的情况。但是,如果你想出了一个特别棒的idea,想出了一个全新的神经网络模块,却在写代码时碰到了阻碍,那可就太可惜了。学一学反向传播的实现还是很有用的。

在模型部署中,反向传播可能完全派不上用场。但是,一般框架在实现算子的正向传播时,是会照顾反向传播的。也就是说,如果抛掉反向传播,正向传播的实现或许可以写得更加高效。这样看来,了解反向传播的实现也是很有帮助的。我们可以用这些知识看懂别人的正向传播、反向传播的实现,进而优化代码的性能。

附录:完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from typing import Dict, Tuple

import numpy as np
import pytest
import torch


def conv2d_forward(input: np.ndarray, weight: np.ndarray, bias: np.ndarray,
stride: int, padding: int) -> Dict[str, np.ndarray]:
"""2D Convolution Forward Implemented with NumPy

Args:
input (np.ndarray): The input NumPy array of shape (H, W, C).
weight (np.ndarray): The weight NumPy array of shape
(C', F, F, C).
bias (np.ndarray | None): The bias NumPy array of shape (C').
Default: None.
stride (int): Stride for convolution.
padding (int): The count of zeros to pad on both sides.

Outputs:
Dict[str, np.ndarray]: Cached data for backward prop.
"""
h_i, w_i, c_i = input.shape
c_o, f, f_2, c_k = weight.shape

assert (f == f_2)
assert (c_i == c_k)
assert (bias.shape[0] == c_o)

input_pad = np.pad(input, [(padding, padding), (padding, padding), (0, 0)])

def cal_new_sidelngth(sl, s, f, p):
return (sl + 2 * p - f) // s + 1

h_o = cal_new_sidelngth(h_i, stride, f, padding)
w_o = cal_new_sidelngth(w_i, stride, f, padding)

output = np.empty((h_o, w_o, c_o), dtype=input.dtype)

for i_h in range(h_o):
for i_w in range(w_o):
for i_c in range(c_o):
h_lower = i_h * stride
h_upper = i_h * stride + f
w_lower = i_w * stride
w_upper = i_w * stride + f
input_slice = input_pad[h_lower:h_upper, w_lower:w_upper, :]
kernel_slice = weight[i_c]
output[i_h, i_w, i_c] = np.sum(input_slice * kernel_slice)
output[i_h, i_w, i_c] += bias[i_c]

cache = dict()
cache['Z'] = output
cache['W'] = weight
cache['b'] = bias
cache['A_prev'] = input
return cache


def conv2d_backward(dZ: np.ndarray, cache: Dict[str, np.ndarray], stride: int,
padding: int) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""2D Convolution Backward Implemented with NumPy

Args:
dZ: (np.ndarray): The derivative of the output of conv.
cache (Dict[str, np.ndarray]): Record output 'Z', weight 'W', bias 'b'
and input 'A_prev' of forward function.
stride (int): Stride for convolution.
padding (int): The count of zeros to pad on both sides.

Outputs:
Tuple[np.ndarray, np.ndarray, np.ndarray]: The derivative of W, b,
A_prev.
"""
W = cache['W']
b = cache['b']
A_prev = cache['A_prev']
dW = np.zeros(W.shape)
db = np.zeros(b.shape)
dA_prev = np.zeros(A_prev.shape)

_, _, c_i = A_prev.shape
c_o, f, f_2, c_k = W.shape
h_o, w_o, c_o_2 = dZ.shape

assert (f == f_2)
assert (c_i == c_k)
assert (c_o == c_o_2)

A_prev_pad = np.pad(A_prev, [(padding, padding), (padding, padding),
(0, 0)])
dA_prev_pad = np.pad(dA_prev, [(padding, padding), (padding, padding),
(0, 0)])

for i_h in range(h_o):
for i_w in range(w_o):
for i_c in range(c_o):
h_lower = i_h * stride
h_upper = i_h * stride + f
w_lower = i_w * stride
w_upper = i_w * stride + f

input_slice = A_prev_pad[h_lower:h_upper, w_lower:w_upper, :]
# forward
# kernel_slice = W[i_c]
# Z[i_h, i_w, i_c] = np.sum(input_slice * kernel_slice)
# Z[i_h, i_w, i_c] += b[i_c]

# backward
dW[i_c] += input_slice * dZ[i_h, i_w, i_c]
dA_prev_pad[h_lower:h_upper,
w_lower:w_upper, :] += W[i_c] * dZ[i_h, i_w, i_c]
db[i_c] += dZ[i_h, i_w, i_c]

if padding > 0:
dA_prev = dA_prev_pad[padding:-padding, padding:-padding, :]
else:
dA_prev = dA_prev_pad
return dW, db, dA_prev


@pytest.mark.parametrize('c_i, c_o', [(3, 6), (2, 2)])
@pytest.mark.parametrize('kernel_size', [3, 5])
@pytest.mark.parametrize('stride', [1, 2])
@pytest.mark.parametrize('padding', [0, 1])
def test_conv(c_i: int, c_o: int, kernel_size: int, stride: int, padding: str):

# Preprocess
input = np.random.randn(20, 20, c_i)
weight = np.random.randn(c_o, kernel_size, kernel_size, c_i)
bias = np.random.randn(c_o)

torch_input = torch.from_numpy(np.transpose(
input, (2, 0, 1))).unsqueeze(0).requires_grad_()
torch_weight = torch.from_numpy(np.transpose(
weight, (0, 3, 1, 2))).requires_grad_()
torch_bias = torch.from_numpy(bias).requires_grad_()

# forward
torch_output_tensor = torch.conv2d(torch_input, torch_weight, torch_bias,
stride, padding)
torch_output = np.transpose(
torch_output_tensor.detach().numpy().squeeze(0), (1, 2, 0))

cache = conv2d_forward(input, weight, bias, stride, padding)
numpy_output = cache['Z']

assert np.allclose(torch_output, numpy_output)

# backward
torch_sum = torch.sum(torch_output_tensor)
torch_sum.backward()
torch_dW = np.transpose(torch_weight.grad.numpy(), (0, 2, 3, 1))
torch_db = torch_bias.grad.numpy()
torch_dA_prev = np.transpose(torch_input.grad.numpy().squeeze(0),
(1, 2, 0))

dZ = np.ones(numpy_output.shape)
dW, db, dA_prev = conv2d_backward(dZ, cache, stride, padding)

assert np.allclose(dW, torch_dW)
assert np.allclose(db, torch_db)
assert np.allclose(dA_prev, torch_dA_prev)