0%

词嵌入简介

基础概念

之前,我们是用one-hot编码来表示单词的:假设一个单词在词汇表里的序号是$t$,词汇表大小是$T$,则这个单词的编码是一个长度为$T$的向量,向量只有第$t$维是1,其他维都是0。我们用$O_t$来表示这个单词的one-hot编码。如下图所示。

这种表示法能区分每个词,但是,它有一个缺陷:one-hot向量两两之间的乘积为0,不能通过向量的相似度推理出单词的相似度。因此,在NLP中,一个重要的任务就是找到一种合理的词表示方法,使得我们能够利用向量的某些性质来表示单词之间的某些性质。

我们来看一种新的词表示方法。假设有Man Woman King Queen Apple Orange这6个单词,我们从性别(Gender),皇家的(Royal),年龄(Age),是否是食物(Food)这几个角度来描述这几个单词,可以填写出这样一份表格:

在这份表格里,每一列的数字可以看成属于某一单词的向量,这个向量就是一种词表示方法。我们用$e_t$表示词汇表里序号$t$的这种有意义的向量。比如$e_{5391}$就是Man的向量。观察每列的向量,我们能发现Woman和Man很相似,King Queen很相似,Apple和Orange很相似。这样,使用这种词表示,单词的相似度由向量的相似度表示了出来,符合我们对词表示的期望。

当然,使用算法生成的词表示中,向量的每一维不可能像这样可解释性这么强。

这种用向量描述单词的方法称为词嵌入(word embedding)。使用高维向量描述单词,就好像是把一个抽象的概念嵌进了一个向量一样。

词嵌入使用示例

看完了词嵌入的基础概念,我们来看看使用词嵌入有什么好处。

还是以命名实体识别任务为例。假设有这么一句话”Sally Johnson is an orange farmer”,我们能够推断出”Sally Johnson”是一个人名,这是因为我们看到了后面的”orange farmer”。橙子农民很可能与人名对应。

假设从刚刚那句话中,模型已经学会了橙子农民与人名之间的关系。现在,又有了一条新的训练样本”Robert Lin is an apple farmer”。使用了词嵌入的话,模型虽然不知道“苹果农民”是什么,但它知道”apple”和”orange”是很相似的东西,能够很快学会这句话的”Robert Lin”也是一个人名。

也就是说,通过使用词嵌入,模型能够利用单词之间的关系更快地完成学习。实际上,不仅是训练,使用了词嵌入后,哪怕出现了训练集中没出现过的单词,模型也能根据单词间的关系做出正确的推理。我们再来看一个例子。

假设又有一条测试样本”Robert Lin is a durian cultivator”。模型可能从来在训练集里没有见过”durian”和”cultivator”这两个单词。但是,通过词嵌入,模型知道”durian(榴莲)”是一种和”apple”相近的东西,”cultivator(培育者)”是一种和”farmer”类似的东西。通过这层关系,模型还是能够推理出”Robert Lin”是一个人名。

这就是词嵌入之所以那么重要的原因。词嵌入本质上是一种迁移学习,它隐含了在其他数据中学习到的单词之间的关系。利用这些知识,另一种任务能够在词嵌入的帮助下更快地完成学习。一般使用词嵌入的步骤如下:

  1. 用大量数据(千亿级)训练出词嵌入。
  2. 把词嵌入迁移到新任务上,使用一个较小的数据集(比如,万级)。
  3. 可选:继续finetune词嵌入(仅当新任务的数据量足够多时)。

除了加速训练外,词嵌入还有一个好处:词嵌入的向量长度往往较少。比如一个长度为10000的字典训练出来的词嵌入可能长度只有300。

结束这节前,顺便提一下词嵌入与上门课讲到的人脸识别中的编码(encoding)之间的关系。不管是嵌入还是编码,其实都是指向量,两种描述不少时候可以通用。但是,人脸识别中的编码主要指对任何一张数据算出一个编码,而嵌入指把一个已知的单词集合的每一个单词嵌进一个向量空间里。二者的主要区别在于输入集合是否固定。

词嵌入的性质

掌握了词嵌入的基本应用,让我们在另一种任务里进一步认识词嵌入的性质。

NLP里有一种任务叫做类比推理。比如,Man和Woman的关系,相当于King和谁的关系?有了词嵌入,这一问题就很好回答了。

还是以刚刚那张人工构造出来的词嵌入表为例。这里我们用$e_{man}$来表示Man的词嵌入,以此类推。

单词间的关系不好描述,而向量间的关系却很容易算。我们可以用向量的差来表示向量直接的关系。计算一下,$e_{man}-e_{woman} \approx [2, 0, 0, 0]^T$, $e_{king}-e_{queen} \approx [2, 0, 0, 0]^T$。通过猜测,我们发现King和Queen的关系与Man和Woman的关系类似。进而可以得出,Man之于Woman,相当于King之于Queen。

准确来说,刚刚这个问题相当于求解一个单词$w$,其嵌入$e_w$满足

,移项,$w$的计算方法就是:

,其中$sim$表示某种相似度,比如cosine相似度。只要我们计算出了右边的$e_{king}-e_{man}+e_{woman}$,再用某种算法就可以求出最合适的$e_w$了。

通过这些观察,我们可以发现,词嵌入蕴含了语义信息。词嵌入间的差别很有可能就是语义上的差别。

词嵌入矩阵

假设词嵌入向量的长度是300,有10000个单词。词嵌入的过程,其实就算把一个长度为10000的向量映射成长度为300的向量的过程。这个过程可以用一个$300 \times 10000$的矩阵$E$表示,$E$就是词嵌入向量的数组。每一个词嵌入列向量$e_t$可以由one-hot编码$o_t$和$E$计算得到:$e_t=Eo_t$。

词嵌入的学习

认识了词嵌入的基本概念后,我们来看看如何用学习算法得到一个词嵌入矩阵。

学习词嵌入和学习神经网络的参数是一样的。只要我们是在根据词嵌入算一个损失函数,就可以使用梯度下降法优化词嵌入。因此,问题的关键在于如何建模一个使用到词嵌入的优化任务。

语言模型

早期的词嵌入是通过语言模型任务学习到的。回想上周课的内容,语言模型就是预测一句话在这种语言中出现的概率。比如 “I want a glass of orange ___ .”,我们会自然地觉得空格里的单词是”juice”,这是因为填juice后整句话的出现概率比填其他单词更高一点。

暂时抛开上周讲的RNN,我们可以用一个使用词嵌入的神经网络来学习语言模型,如下图所示:

这个任务的输入是一句话中连续的6个单词,输出是第7个单词的预测。在用神经网络建模时,要先根据各个单词的one-hot编码$o_t$从词嵌入矩阵$E$中选出其嵌入$e_t$,再把各个单词的嵌入堆叠成一个向量,输入进标准神经网络里,最后用一个softmax预测下一个单词。

在这个模型中,可见的单词数是固定的。假设词嵌入的维度是300,那么根据6个单词进行预测的神经网络的输入向量长度就必须是1800。为了遍历整句话,可以拿一个长度为7(算上预测词)的滑动窗口在整句话上滑一遍。

可见前6个单词,其实算是网络的一个超参数。除了选取前6个单词外,还有其他的选取上下文的方式。常见的上下文选取方式如下:

  • 前4个单词
  • 前4个单词和后4个单词
  • 前1个单词
  • 往前数的第2个单词

经研究,如果只是要学习一个语言模型,使用前4个单词可能更好。而如果要学词嵌入的话,后几个方式也不错。

Word2Vec

Word2Vec是一种比语言模型更高效的词嵌入学习算法。与语言模型任务的思想类似,Word2Vec也要完成一个单词预测任务:给定一个上下文(context)单词,要求模型预测一个目标(target)单词。但是,这个目标单词不只是上下文单词的后一个单词,而是上下文单词前后10个单词中任意一个单词。比如在句子”I want a glass of orange juice to go along with my cereal”中,对于上下文单词glass,目标单词可以是juice, glass, my。

具体来说,每一条训练样本是一个上下文单词和目标单词的词对。比如(orange, juice), (orange, glass)。为了生成这些训练数据,我们要从语料库里每一个句子里采样出训练词对。在采样时,要先对上下文单词采样,再对目标单词采样。

假设有了上下文单词,对目标单词采样很简单,只需要从上下文单词的前后10个单词中均匀采样单词即可。而采样上下文单词就需要一些设计了。在英文中,大部分单词都是a, the, of这些没什么含义的词,如果在句子里均匀采样的话,大部分时候得到的都是这些词。因此,在Word2Vec论文中,有一些对上下文单词采样的设计,各单词的出现概率会更平均一点。

看完了训练数据的采样,再看一看Word2Vec的模型。Word2Vec模型非常简单,它是只有一个softmax层的神经网络,我们可以直接写出这个模型的公式:

$p(t|c)$即目标单词$t$在上下文单词$c$前后的出现概率。$e_c$是$c$的嵌入。$\theta$是softmax层的线性计算的参数,这里我们省略掉了bias。求和里的10000是整个词汇表的大小,也就是softmax输出向量的大小。

和其他多分类任务一样,这个任务的损失函数也是交叉熵函数。

Word2Vec的模型结构十分简单,因此,整个模型的计算量全部落在了softmax的分母求和上。假设词汇表有n个单词,整个模型的时间复杂度就是$O(n)$。在词汇表很大时,求和的开销也是很大的。

为了优化这个求和,Word2Vec使用了H-Softmax(Hierachical Softmax)这种优化方式。一个多分类任务,其实可以拆成多个二分类任务。比如有“猫、狗、树、草”这四种类别,我们可以先做是动物还是植物的二分类,再做一次更具体的二分类,最后把两次次二分类的概率乘起来。H-Softmax就是用这种思想优化了softmax的求和。

使用H-Softmax前,要先对所有单词建立一颗二叉树,比如对A, B, C, D四个单词,可以这样建树:”ABCD-(AB,CD)”, “AB-(A,B)”, “CD-(C,D)”。这样,把一个多分类问题拆成多个二分类问题,就等价于从树的根部开始,经过多个节点,达到单词所在的叶节点。使用H-Softmax时,只要把访问该单词的路径上所有节点的概率乘起来就行了。比如要求单词$x$是$C$的概率,可以先算$x$属于$CD$的概率$P(x \in CD)$,再算已知$x$属于$CD$时$x$是C的概率$P(x \in C | x \in CD)$,二者一乘就是我们要的$P(x \in C)$。

二分类的复杂度是$O(1)$,要做$O(logn)$次二分类。因此,经优化后,H-Softmax的复杂度$O(logn)$。实际上,这个算法还有一些优化空间。词汇表里的词汇是固定的,我们可以巧妙地修改建立二叉树的方法,进一步减少运算量。“给定各元素的访问概率(在这个问题里是单词在语言里的出现概率),对所有元素建立一颗二叉树,以最小化访问叶节点的路径长度的期望”是一个经典的问题,这个问题的解法叫做哈夫曼编码。这是离散数学的知识,和本课的关系就不大了。

H-Softmax的核心思想是把多分类拆成二分类,搞懂这个就行了。至于使用二叉树,怎么建立更好的二叉树,这是一个独立的子问题,理解它和理解H-Softmax无关。在学NLP时,可以先把这个子问题放一放,理解H-Softmax的用意就行。

Word2Vec的目标任务还有其他的形式。除了找上下文单词前后10个单词中的某个目标单词外,还可以用前后的1个单词预测中间的目标单词,这种方法叫做CBow。两种方法各有千秋。Word2Vec的主要思想是那个单层softmax模型,具体的任务倒不是最重要的。

负采样

通过前面几个小节的学习,我们能够总结词嵌入学习的一些经验:词嵌入的根本目的是学习词嵌入矩阵,使用词嵌入的任务倒没有那么重要。因此,我们可以放心大胆地去简化每轮任务的计算量,加快词嵌入的学习效率。

基于这种思想,我们可以进一步去优化Word2Vec里的多分类任务。实际上,一个N分类任务,可以“复杂化”成N个二分类任务——逐个判断输入是否是N个类别中的一种。顺着这个思路,我们不用去求给定上下文单词时目标单词的概率分布,只需要判断给定上下文单词和目标单词,判断二者是否相关即可。

这样,在每一轮任务中,我们不用去计算多分类的softmax,只要计算一个二分类的sigmoid就行了。这样一种算法叫做负采样(Negative Sampling)。

负采样使用的模型和Word2Vec一样简单:输入一个上下文单词的嵌入,经过一个sigmoid层,输出那个上下文单词和某个目标单词是否相关。

负采样算法中真正的难点是训练数据的生成。在看数据生成算法之前,我们先看一下训练样本的格式。负采样的每一条样本是一个三元组(context, word, target),分别表示上下文单词、目标单词、用01表示的两个单词是否相关。比如,我们可能会得到这样的正负样本:

context word target
orange juice 1
orange king 0

接下来,我们来看如何生成这些样本。使用Word2Vec的采样方法,我们会对语料库里的每一句话采样出一些词对。这样,每一个词对能构成一个正样本,它的target值为1。

正样本很好生成,可负样本就不是很好采样了。负采样算法使用了一种巧妙的采样方法(这也是其名称的由来):在生成一个正样本的同时,算法还会对同一个上下文单词context生成$k$个target为0的负样本。这些样本里的目标单词word是随机挑选的。

举个例子,设$k=4$,在”I want a glass of orange juice to go along with my cereal”这句话中,假如我们采样到了(orange, juice)这个词对,我们可能会随机选4个单词,得到下面这些训练样本:

context word target
orange juice 1
orange king 0
orange book 0
orange the 0
orange of 0

注意,每$k$个负样本是针对一条正样本而言的。尽管orange, of都出现在了这句话里,但我们在考虑(orange, juice)这个正样本词对时,会把其他所有词对都当做负样本。

刚刚讲到,负样本里的word是随机挑选的。其实,这种“随机”有一些讲究。如果对所有单词均匀采样,那么不常用的词会被过度学习;如果按照单词的出现频率采样,of, the这些助词又会被过度学习。因此,在采样负样本时,这个负采样算法的论文使用了这样一种折中的方法:

这里$p(w_i)$表示第$i$个单词$w_i$被采样到的概率,$f(w_i)$是单词在这个语言中的出现频率。公式里的$\frac{3}{4}$是根据经验试出来的。

GloVe

GloVe(global vectors for word representation)是一种更加简单的求词嵌入的算法。刚才学习的几种方法都需要进行复杂的采样,而GloVe使用了一种更简洁的学习目标$x_{ij}$,以代替多分类任务或者二分类任务。

$x_{ij}$为给定上下文单词$i$时单词$j$出现的次数。和前面一样,这里的“上下文”可以有多种定义。比如,如果上下文的定义是“前后5个单词”,那么这就是一个对称的上下文定义,$x_{ij}=x_{ji}$;如果上下文的定义是“后1个单词”,则$x_{ij}\neq x_{ji}$。我们可以简单地把$x_{ij}$理解成$j$出现在$i$附近的次数。

比如我们把上下文定义为前后2个单词。在句子”a b c b d e”中,$x_{ca}=1, x_{cb}=2, x_{cd}=1, x_{ce}=0$。

有了$x_{ij}$,我们就能直接知道给定上下文$i$时各个单词$j$的出现频率,而不需要再构建一个分类任务去学习单词$j$出现的条件概率。这样一个新的误差函数是:

视频里的公式把i, j写反了。

和之前几个任务一样,$e_i$是上下文单词的词嵌入,$\theta$是线性计算的参数。$\theta^{T}_je_i$其实再就是拟合某单词$j$和上下文单词$i$的相关程度。而$logx_{ij}$恰好能反映某单词$j$和上下文单词$i$的相关程度。

刚刚那个误差函数有几个需要改进的地方:

  • log 里面可能出现0。对于$x_{ij}=0$的地方,我们要想办法让$logx_{ij}=0$。
  • 不同单词的出现频率不同。对于出现频率较少的单词,我们可以限制它对优化目标的影响。
  • 可以像普通的线性层一样,加入偏差项bias。

因此,最终的损失函数为(假设词汇表大小10000):

其中,$f$是权重项,既用于防止$x_{ij}=0$($f(0)=0$),也用于调节低频率单词的影响。$b_i, b’_j$分别是上下文单词和目标单词的偏差项。

有趣的是,当$x_{ij}$是对称矩阵的时候,$\theta, e$也是对称的,它们在式子里的作用是等价的。我们可以让最终的词嵌入为$\theta, e$的平均值。

在结束词嵌入的学习前,我们还要补充学习一下词嵌入的一些性质。

上图是我们在这节课的开头学习到的“人造词嵌入”。在这个词嵌入中,向量的每一个维度都有一个意义。而在实际情况中,算法学习出来的词嵌入不能保证每个维度都只有一个意义。根据线性代数的知识,要表示同一个空间,有无数组选择坐标轴的方法。很可能0.3x+0.7y这个方向表示一个意思,0.4y+0.6z这个方向又表示一个意思,而不是每个坐标轴的方向恰好表示一个意思。当然,不管怎么选取坐标轴,两个向量的相对关系不会变,对词嵌入做减法以判断两个单词的关系的做法依然适用。

词嵌入的应用

情感分析

词嵌入可以应用于情感分析(Sentiment Classification)任务。在情感分析任务中,算法的输入是一段文字(比如影评),输出是用户表达出来的喜恶程度(比如1-5星)。

有了词嵌入,我们可以轻松地构筑一个简单的模型。

如上图所示,只要简单地对所有输入单词的词嵌入取平均值,放入softmax即可。

这种算法确实能够生效。但是,它只考虑了每个单词的含义,而忽略了整体的意思。如果句子里有”not”这种否定词,这个模型就不太有效了。为此,我们可以构建更精巧的RNN模型。

如第一周所学,RNN是一个“多对一”任务。我们可以让RNN最后一轮输出一个分类结果。只不过,这次输入RNN的不是one-hot向量,而是更有意义的词嵌入。

消除歧视

词嵌入会自动从大量的本文中学习知识。但是,数据中的知识可能本身带有偏见。比如,在自动学到的词嵌入看来,男人之于程序员,就像女人之于家庭主妇;父亲之于医生,就像母亲之于护士。类似的歧视不仅存在于性别这一维度,还存在于种族、年龄、贫富等维度。我们希望消除词嵌入里面的这些歧视。

本节仅对消除歧视的方法做一个简介,很多实现细节都被省略了。详情请见原论文。

词嵌入本身是向量,歧视其实就是某些本应该对称的向量不太对称了。我们的目的就是在带有偏见的维度上令向量对称。

第一步,我们要找到带有偏见的维度。比如,对于性别维度,我们可以算$e_{he}-e_{she}, e_{male}-e_{female}$,对这些表示同一意义的方向取一个平均向量,得到偏见的方向。得到方向后,我们可以用一个平面图来可视化和偏见相关的向量。假设词嵌入的长度是300,那么x轴表示带有偏见的那个维度,y轴表示剩余的299个维度。

所有的单词可以分成两类:和性别相关的明确(definitional)单词和剩余不明确的单词(明确单词需要手动找出来)。第二步,我们要让所有不明确单词都恰好回到y轴上。这样,任何其他单词都不会偏向某一性别了。

最后,有些和性别相关还不够对称。我们要想办法让每对和性别相关的词恰好按y轴对称。

总结

在这堂课中,我们系统地学习了词嵌入这个概念,并大致了解了如何在NLP任务中使用词嵌入。相关的知识有:

  • 词嵌入简介
    • 从one-hot到词嵌入
    • 词嵌入向量的意义
  • 词嵌入学习算法
    • 语音模型
    • Word2Vec
    • 负采样
    • GloVe
  • 如何应用词嵌入

词嵌入是专属于NLP的概念,且是NLP任务的基石。如果要开展NLP相关研究,词嵌入是一个绕不过去的知识;反过来说,如果不搞NLP,只是想广泛地学习深度学习,那么词嵌入本身可能不是那么重要,对词嵌入问题的建模方法会更重要一点。

只从实用的角度来看的话,这堂课介绍的知识并没有那么重要,网上能够轻松找到别人预训练好的词嵌入权重。真正重要的是词嵌入在框架中的使用方法,以及如何在一般任务中使用词嵌入。在这周的代码实战中,我会分享一下如何用预训练的词嵌入完成某些NLP任务。

词嵌入能够用更加有意义的向量表示单词。在NLP任务中使用预训练的词嵌入,往往能极大地加快训练效率。在这篇文章中,我将面向NLP初学者,分享一下如何在PyTorch中使用预训练的GloVe词嵌入,并借助它完成一个简单的NLP任务——情感分析。

相关的背景知识可以参考我之前有关词嵌入的文章:词嵌入 (Word2Vec, GloVe)

项目网址:https://github.com/SingleZombie/DL-Demos/tree/master/dldemos/SentimentAnalysis

GloVe 词嵌入

GloVe是一种学习词嵌入的方法,它希望拟合给定上下文单词$i$时单词$j$出现的次数$x_{ij}$。使用的误差函数为:

其中,$N$是词汇表大小,$\theta, b$是线性层参数,$e_i$是词嵌入。$f(x)$是权重项,用于平衡不同频率的单词对误差的影响,并消除$log0$时式子不成立的情况。

GloVe作者提供了官方的预训练词嵌入(https://nlp.stanford.edu/projects/glove/ )。预训练的GloVe有好几个版本,按数据来源,可以分成:

  • 维基百科+gigaword(6B)
  • 爬虫(42B)
  • 爬虫(840B)
  • 推特(27B)

其中,括号里的数字表示数据集的token数。

按照词嵌入向量的大小分,又可以分成50维、100维、200维等不同维度。

预训练GloVe的文件格式非常简明。一行表示一个单词向量,每行先是一个单词,再是若干个浮点数,表示该单词向量的每一个元素。

当然,在使用PyTorch时,我们不必自己去下载解析GloVe,而是可以直接调用PyTorch的库自动下载解析GloVe。首先,我们要安装PyTorch的NLP库——torchtext。

conda可以用下面的命令安装:

1
conda install -c pytorch torchtext

pip可以直接安装:

1
pip install torchtext

之后,在Python里运行下面的代码,就可以获取GloVe的类了。

1
2
3
4
import torch
from torchtext.vocab import GloVe

glove = GloVe(name='6B', dim=100)

如前文所述,GloVe的版本可以由其数据来源和向量维数确定。在构建GloVe类时,要提供这两个参数。最好是去GloVe的官网查好一个确定的版本,用该版本的参数构建这个GloVe类。我在这个项目中使用的是6B token,维度数100的GloVe。

调用glove.get_vecs_by_tokens,我们能够把token转换成GloVe里的向量。

1
2
3
# Get vectors
tensor = glove.get_vecs_by_tokens(['', '1998', '199999998', ',', 'cat'], True)
print(tensor)

PyTorch提供的这个函数非常方便。如果token不在GloVe里的话,该函数会返回一个全0向量。如果你运行上面的代码,可以观察到一些有趣的事:空字符串和199999998这样的不常见数字不在词汇表里,而1998这种常见的数字以及标点符号都在词汇表里。

GloVe类内部维护了一个矩阵,即每个单词向量的数组。因此,GloVe需要一个映射表来把单词映射成向量数组的下标。glove.itosglove.stoi完成了下标与单词字符串的相互映射。比如用下面的代码,我们可以知道词汇表的大小,并访问词汇表的前几个单词:

1
2
3
myvocab = glove.itos
print(len(myvocab))
print(myvocab[0], myvocab[1], myvocab[2], myvocab[3])

最后,我们来通过一个实际的例子认识一下词嵌入的意义。词嵌入就是向量,向量的关系常常与语义关系对应。利用词嵌入的相对关系,我们能够回答“x1之于y1,相当于x2之于谁?”这种问题。比如,男人之于女人,相当于国王之于王后。设我们要找的向量为y2,我们想让x1-y1=x2-y2,即找出一个和x2-(x1-y1)最相近的向量y2出来。这一过程可以用如下的代码描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def get_counterpart(x1, y1, x2):
"""Find y2 that makes x1-y1=x2-y2"""
x1_id = glove.stoi[x1]
y1_id = glove.stoi[y1]
x2_id = glove.stoi[x2]
x1, y1, x2 = glove.get_vecs_by_tokens([x1, y1, x2], True)
target = x2 - x1 + y1
max_sim = 0
max_id = -1
for i in range(len(myvocab)):
vector = glove.get_vecs_by_tokens([myvocab[i]], True)[0]
cossim = torch.dot(target, vector)
if cossim > max_sim and i not in {x1_id, y1_id, x2_id}:
max_sim = cossim
max_id = i
return myvocab[max_id]


print(get_counterpart('man', 'woman', 'king'))
print(get_counterpart('more', 'less', 'long'))
print(get_counterpart('apple', 'red', 'banana'))

在函数get_counterpart中,我们遍历所有向量,根据cosine相似度,找一个和x2-x1+y1最相近的向量(除三个输入向量之外)。使用这个函数,我们可以回答以下三组问题:

  • man-woman, king-queen
  • more-less, long-short
  • apple-red, banana-yellow

词嵌入确实非常神奇,连反义词、水果的颜色这种抽象关系都能记录。当然,这里我只挑选了几组成功的例子。这种算法并不能认出单词的比较级(good-better, bad-worse)等更抽象的关系。

通过这一节的实践,我们认识了GloVe的基本用法。接下来,我们来看看怎么用词嵌入完成情感分析任务。

基于GloVe的情感分析

情感分析任务与数据集

和猫狗分类类似,情感分析任务是一种比较简单的二分类NLP任务:给定一段话,输出这段话的情感是积极的还是消极的。

比如下面这段话:

I went and saw this movie last night after being coaxed to by a few friends of mine. I’ll admit that I was reluctant to see it because from what I knew of Ashton Kutcher he was only able to do comedy. I was wrong. Kutcher played the character of Jake Fischer very well, and Kevin Costner played Ben Randall with such professionalism. ……

这是一段影评,大意说,这个观众本来不太想去看电影,因为他认为演员Kutcher只能演好喜剧。但是,看完后,他发现他错了,所有演员都演得非常好。这是一段积极的评论。

再比如这段话:

This is a pale imitation of ‘Officer and a Gentleman.’ There is NO chemistry between Kutcher and the unknown woman who plays his love interest. The dialog is wooden, the situations hackneyed.

这段影评说,这部剧是对《军官与绅士》的一个拙劣的模仿。Kutcher和那个成为他心上人的路人女性之间没有产生任何“化学反应”。对话太死板,场景太陈腐了。这是一段消极的评论。

这些评论都选自斯坦福大学的大型电影数据集。它收录了IMDb上的电影评论,正面评论和负面评论各25000条。这个数据集是情感分析中最为常用的数据集,多数新手在学习NLP时都会用它训练一个情感分析模型。我们这个项目也会使用这个数据集。

这个数据集的文件结构大致如下:

text
1
2
3
4
5
6
7
8
9
10
├─test
│ ├─neg
│ │ ├ 0_2.txt
│ │ ├ 1_3.txt
│ │ └ ...
│ └─pos
├─train
│ ├─neg
│ └─pos
└─imdb.vocab

其中,imdb.vocab记录了数据集中的所有单词,一行一个。testtrain是测试集和训练集,它们的negpos子文件夹分别记录了负面评论和正面评论。每一条评论都是一句话,存在一个txt文件里。

使用下面这个函数,我们就可以读取一个子文件夹里的所有评论:

1
2
3
4
5
6
7
8
9
10
11
import os

def read_imdb(dir='data/aclImdb', split='pos', is_train=True):
subdir = 'train' if is_train else 'test'
dir = os.path.join(dir, subdir, split)
lines = []
for file in os.listdir(dir):
with open(os.path.join(dir, file), 'rb') as f:
line = f.read().decode('utf-8')
lines.append(line)
return lines

这里,顺便介绍一下torchtext提供的分词工具。在NLP中,我们在得到一段文本时,一般需要对文本做一步预处理操作,把一段话变成“单词”的数组。这里的“单词”即可以是英文单词,也可以是数字序列、标点符号。在NLP中,这步预处理操作称为分词,“单词”叫做token(中文直译是“符号,记号”)。

使用torchtext把一段话转换成token数组的方式如下:

1
2
3
4
5
from torchtext.data import get_tokenizer

tokenizer = get_tokenizer('basic_english')
print(tokenizer('a, b'))
# >> ['a', ',', 'b']

有了它,我们可以验证读取IMDb数据集和分词的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
from torchtext.data import get_tokenizer

def main():
lines = read_imdb()
print('Length of the file:', len(lines))
print('lines[0]:', lines[0])
tokenizer = get_tokenizer('basic_english')
tokens = tokenizer(lines[0])
print('lines[0] tokens:', tokens)


if __name__ == '__main__':
main()

输出:

text
1
2
3
Length of the file: 12500
lines[0]: This is a very light headed comedy about a wonderful ...
lines[0] tokens: ['this', 'is', 'a', 'very', 'light', 'headed', 'comedy', 'about', 'a', 'wonderful', ...

获取经GloVe预处理的数据

在这个项目中,我们的模型结构十分简单:输入序列经过词嵌入,送入单层RNN,之后输出结果。整个项目最难的部分是如何把token转换成GloVe词嵌入。在这一节里,我将介绍一种非常简单的实现方法。

torchtext其实还提供了一些更方便的NLP工具类(Field, Vectors等),用于管理词向量。但是,这些工具需要一定的学习成本。由于本文的主旨是介绍深度学习技术而非PyTorch使用技巧,本项目不会用到这些更高级的类。如果你以后要用PyTorch完成NLP任务,建议看完本文后参考相关文章进一步学习torchtext的用法。

PyTorch通常用nn.Embedding来表示词嵌入层。nn.Embedding其实就是一个矩阵,每一行都是一个词嵌入。每一个token都是整型索引,表示该token在词汇表里的序号。有了索引,有了矩阵,就可以得到token的词嵌入了。

但是,有些token在词汇表中并不存在。我们得对输入做处理,把词汇表里没有的token转换成<unk>这个表示未知字符的特殊token。同时,为了对齐序列的长度,我们还得添加<pad>这个特殊字符。而用GloVe直接生成的nn.Embedding里没有<unk><pad>字符。如果使用nn.Embedding的话,我们要编写非常复杂的预处理逻辑。

为此,我们可以用GloVe类的get_vecs_by_tokens直接获取token的词嵌入,以代替nn.Embedding。回忆一下前文提到的get_vecs_by_tokens的使用结果,所有没有出现的token都会被转换成零向量。这样,我们就不必操心数据预处理的事情了。

get_vecs_by_tokens应该发生在数据读取之后,它可以直接被写在Dataset的读取逻辑里。我为此项目编写的Dataset如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from torch.utils.data import DataLoader, Dataset
from torchtext.data import get_tokenizer
from torchtext.vocab import GloVe

from dldemos.SentimentAnalysis.read_imdb import read_imdb

GLOVE_DIM = 100
GLOVE = GloVe(name='6B', dim=GLOVE_DIM)


class IMDBDataset(Dataset):
def __init__(self, is_train=True, dir='data/aclImdb'):
super().__init__()
self.tokenizer = get_tokenizer('basic_english')
pos_lines = read_imdb(dir, 'pos', is_train)
neg_lines = read_imdb(dir, 'neg', is_train)
self.lines = pos_lines + neg_lines
self.pos_length = len(pos_lines)
self.neg_length = len(neg_lines)

def __len__(self):
return self.pos_length + self.neg_length

def __getitem__(self, index):
sentence = self.tokenizer(self.lines[index])
x = GLOVE.get_vecs_by_tokens(sentence)
label = 1 if index < self.pos_length else 0
return x, label

数据预处理的逻辑都在__getitem__里。每一段字符串会先被token化,之后由GLOVE.get_vecs_by_tokens得到词嵌入数组。

对齐输入

使用一个batch的序列数据时常常会碰到序列不等长的问题。在我的上篇RNN代码实战文章中,我曾计算了序列的最大长度,并手动为每个序列都创建了一个最大长度的向量。实际上,利用PyTorch DataLoadercollate_fn机制,还有一些更简洁的实现方法。

在这个项目中,我们可以这样创建DataLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import torch
from torch.nn.utils.rnn import pad_sequence

def get_dataloader(dir='data/aclImdb'):
def collate_fn(batch):
x, y = zip(*batch)
x_pad = pad_sequence(x, batch_first=True)
y = torch.Tensor(y)
return x_pad, y

train_dataloader = DataLoader(IMDBDataset(True, dir),
batch_size=32,
shuffle=True,
collate_fn=collate_fn)
test_dataloader = DataLoader(IMDBDataset(False, dir),
batch_size=32,
shuffle=True,
collate_fn=collate_fn)
return train_dataloader, test_dataloader

PyTorch DataLoader在获取Dataset的一个batch的数据时,实际上会先调用Dataset.__getitem__,获取若干个样本,再把所有样本拼接成一个batch。比如用__getitem__获取4个[3, 10, 10]的图片张量,再拼接成[4, 3, 10, 10]这一个batch。可是,序列数据通常长度不等,__getitem__可能会获得[10, 100], [15, 100]这样不等长的词嵌入数组。

为了解决这个问题,我们要手动编写把所有张量拼成一个batch的函数。这个函数就是DataLoadercollate_fn函数。我们的collate_fn应该这样编写:

1
2
3
4
5
def collate_fn(batch):
x, y = zip(*batch)
x_pad = pad_sequence(x, batch_first=True)
y = torch.Tensor(y)
return x_pad, y

collate_fn的输入batch是每次__getitem__的结果的数组。比如在我们这个项目中,第一次获取了一个长度为10的积极的句子,__getitem__返回(Tensor[10, 100], 1);第二次获取了一个长度为15的消极的句子,__getitem__返回(Tensor[15, 100], 0)。那么,输入batch的内容就是:

1
[(Tensor[10, 100], 1), (Tensor[15, 100], 0)]

我们可以用x, y = zip(*batch)把它巧妙地转换成两个元组:

1
2
x = (Tensor[10, 100], Tensor[15, 100])
y = (1, 0)

之后,PyTorch的pad_sequence可以把不等长序列的数组按最大长度填充成一整个batch张量。也就是说,经过这个函数后,x_pad变成了:

1
x_pad = Tensor[2, 15, 100]

pad_sequencebatch_first决定了batch是否在第一维。如果它为False,则结果张量的形状是[15, 2, 100]

pad_sequence还可以决定填充内容,默认填充0。在我们这个项目中,被填充的序列已经是词嵌入了,直接用全零向量表示<pad>没问题。

有了collate_fn,构建DataLoader就很轻松了:

1
2
3
4
DataLoader(IMDBDataset(True, dir),
batch_size=32,
shuffle=True,
collate_fn=collate_fn)

注意,使用shuffle=True可以令DataLoader随机取数据构成batch。由于我们的Dataset十分工整,前一半的标签是1,后一半是0,必须得用随机的方式去取数据以提高训练效率。

模型

模型非常简单,就是单层RNN:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class RNN(torch.nn.Module):
def __init__(self, hidden_units=64, dropout_rate=0.5):
super().__init__()
self.drop = nn.Dropout(dropout_rate)
self.rnn = nn.GRU(GLOVE_DIM, hidden_units, 1, batch_first=True)
self.linear = nn.Linear(hidden_units, 1)
self.sigmoid = nn.Sigmoid()

def forward(self, x: torch.Tensor):
# x shape: [batch, max_word_length, embedding_length]
emb = self.drop(x)
output, _ = self.rnn(emb)
output = output[:, -1]
output = self.linear(output)
output = self.sigmoid(output)

return output

这里要注意一下,PyTorch的RNN会返回整个序列的输出。而在预测分类概率时,我们只需要用到最后一轮RNN计算的输出。因此,要用output[:, -1]取最后一次的输出。

训练、测试、推理

项目的其他地方都比较简单,我把剩下的所有逻辑都写到main函数里了。

先准备好模型。

1
2
3
4
def main():
device = 'cuda:0'
train_dataloader, test_dataloader = get_dataloader()
model = RNN().to(device)

第一步是训练。训练照着普通RNN的训练模板写就行,没什么特别的。注意,在PyTorch中,使用二分类误差时,要在模型里用nn.Sigmoid,并使用nn.BCELoss作为误差函数。算误差前,得把序列长度那一维去掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# train

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
citerion = torch.nn.BCELoss()
for epoch in range(100):

loss_sum = 0
dataset_len = len(train_dataloader.dataset)

for x, y in train_dataloader:
batchsize = y.shape[0]
x = x.to(device)
y = y.to(device)
hat_y = model(x)
hat_y = hat_y.squeeze(-1)
loss = citerion(hat_y, y)

optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
optimizer.step()

loss_sum += loss * batchsize

print(f'Epoch {epoch}. loss: {loss_sum / dataset_len}')

torch.save(model.state_dict(), 'dldemos/SentimentAnalysis/rnn.pth')

训练几十个epoch,模型就差不多收敛了。词嵌入对于训练还是有很大帮助的。

训练完了,接下来要测试精度。这些代码也很简单,跑完了模型和0.5比较得到预测结果,再和正确标签比较算一个准确度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# test

# model.load_state_dict(
# torch.load('dldemos/SentimentAnalysis/rnn.pth', 'cuda:0'))

accuracy = 0
dataset_len = len(test_dataloader.dataset)
model.eval()
for x, y in test_dataloader:
x = x.to(device)
y = y.to(device)
with torch.no_grad():
hat_y = model(x)
hat_y.squeeze_(1)
predictions = torch.where(hat_y > 0.5, 1, 0)
score = torch.sum(torch.where(predictions == y, 1, 0))
accuracy += score.item()
accuracy /= dataset_len

print(f'Accuracy: {accuracy}')

我的精度达到了90%多。考虑到模型并不复杂,且并没有用验证集进行调参,这个精度已经非常棒了。

训练完了模型,我们来看看模型能不能在实际应用中排上用场。我去最近的财经新闻里摘抄了几句对美股的评论:

U.S. stock indexes fell Tuesday, driven by expectations for tighter Federal Reserve policy and an energy crisis in Europe. Stocks around the globe have come under pressure in recent weeks as worries about tighter monetary policy in the U.S. and a darkening economic outlook in Europe have led investors to sell riskier assets.

1
2
3
4
5
6
7
8
9
10
# Inference
tokenizer = get_tokenizer('basic_english')
article = ...

x = GLOVE.get_vecs_by_tokens(tokenizer(article)).unsqueeze(0).to(device)
with torch.no_grad():
hat_y = model(x)
hat_y = hat_y.squeeze_().item()
result = 'positive' if hat_y > 0.5 else 'negative'
print(result)

评论说,受到联邦政府更紧缩的保守经济政策和欧洲能源危机的影响,美国股市指数在周二下跌。近几周,全球股市都笼罩在对美国更紧缩的经济政策的担忧压力之下,欧洲灰暗的经济前景令投资者选择抛售高风险的资产。这显然是一段消极的评论。我的模型也很明智地输出了”negative”。看来,情感分析模型还是能在实际应用中发挥用场的。

总结

在这篇文章中,我介绍了GloVe词嵌入在PyTorch的一个应用。如果你只是想学习深度学习,建议多关注一下词嵌入的意义,不需要学习过多的API。如果你正在入门NLP,建议从这个简单的项目入手,体会一下词嵌入的作用。

在上一门课中,我们学习了如何用CNN处理网格状的数据。由于最常见的网格状数据是图像,我们主要学习了如何用CNN完成和图像相关的任务。而在这一门课中,我们要学习如何用循环神经网络(RNN)等序列模型处理序列数据。序列数据的种类就比较丰富多彩了:

如上图所示,对于语音识别、音乐生成、情绪分类、DNA序列分析、机器翻译、视频动作识别、命名实体识别这些任务,它们的输入和输出至少有一个是某类序列数据,它们都可以用序列模型来建模。

计算机科学中的自然语言处理(NLP)任务常常需要使用序列模型。我们在学这门课时,主要会围绕NLP问题进行讨论。

符号标记

序列数据需要用到一些新的符号标记。在开始正式学习之前,我们先以命名实体识别任务为例,认识一下这些新的符号标记。

命名实体识别任务要求找出句子中的有意义的人名、地名等特殊名词。以这个任务的一个训练样本为例,我们来看一看序列数据的符号标记。

设输入是 Harry Potter and Hermione Granger invented a new spell。

命名实体识别任务的输出$y$也是一个序列。序列的每一个元素是1或0,表示输入中对应的单词是否为命名实体。刚刚那个输入对应的输出应该是1 1 0 1 1 0 0 0 0。

输入输出都由单词或数字组成的序列。设输入为$x$,序列中的第$i$个单词记作$x^{< i >}$。比如$x^{< 2 >}$ = Potter。整个序列的长度$T_x=9$。同理,输出序列中的第$i$个元素记作$y^{< i >}$,整个序列的长度$T_y=9$。

对于第$i$个样本而言,其输入的长度为$T_x^{(i)}$,第$t$个单词为$x^{(i)< t >}$。值得注意的是,每个样本的长度可能不一致。

在表示这些数据时,还有一个问题:怎么表示一个单词?计算机可不认识一大堆字母。为了让只懂数字的计算机能够分清不同的单词,我们要先准备一个词汇表,并用单词在词汇表里的one-hot编码作为单词的表示。比如Harry是长度为10000的词汇表里第4075个单词,则Harry的表示是$[0, 0, 0…,0, 1, 0, …, 0]$,这个向量的长度是10000,只有第4075个元素是1,其他地方都是0。

在大型的模型中,词汇表的大小会是30000至50000,甚至有100000的。

有了这些符号标记,和序列数据有关的任务就可以完全用数学语言表示了。比如对于命名实体识别任务,每一条样本输入是一个向量序列,输出是一个01的数字序列。

循环神经网络模型

在序列问题上使用标准神经网络(全连接网络)会有几个问题:

  • 每个样本的长度可能不一致。尽管我们可以找到一个最大的长度,并对长度不足的样本进行零填充,但这种做法看上去就不太好。
  • 同一个单词在不同的位置时应该算出类似的特征。而标准神经网络会把每个位置的输入区别对待,无法共享各个位置的知识。
  • 和图像数据类似,序列数据的输入长度也很大。假如一个句子有10个单词,词汇表的大小是10000,则输入向量的大小就是100000。这样,神经网络第一层的参数量会很大,网络会过拟合。

因此,我们要用一些其他的架构来处理序列数据以规避这些问题。其中一种可行的架构就是循环神经网络(Recurrent Neural Network, RNN)。

RNN运算过程如下图所示。在RNN中,对于一个样本,我们每次只输入一个单词$x^{< t >}$,得到一个输出$y^{< t >}$。除了输出$y^{< t >}$外,神经网络还会把中间激活输出$a^{< t >}$传递给下一轮计算,这个$a^{< t >}$记录了之前单词的某些信息。所有的输出按照这种方法依次计算。当然,第一轮计算时也会用到激活输出$a^{< 0 >}$,简单地令$a^{< 0 >}$为零张量即可。注意,所有的计算都是用同一个权重一样的神经网络。

有些文章会把一行RNN计算折叠成一个带循环箭头的神经网络,这只是另一种画图的方法:

看完了示意图,让我们看看怎样用数学语言表示RNN。在第$t$轮计算中:

其中,$W_{ij}$表示用于计算$i$的,乘在$j$上的矩阵。$g$是激活函数。中间层激活函数多数情况下用tanh,也可以用ReLU。输出的激活函数视任务而定,在二分类的命名实体识别中,输出激活函数是sigmoid。

为了简化表示,可以把$W_{ax}, W_{aa}$拼一下,$x^{< t >}, a^{< t - 1 >}$拼一下:

这样,原来的式子就可以化简了:

RNN也有一些问题。首先,显而易见,每个RNN的输出只能看到它之前的单词。这样是不太好的。比如一句话第一个单词是Teddy,你不知道这是一个人名,还是Teddy bear这样一个普通的物体。稍后我们会学习双向的RNN以解决此问题。

另外,RNN也会面临标准神经网络的梯度爆炸问题。后几节会介绍一些更高级的RNN架构。

「穿越时空之反向传播」

看完了正向传播,我们稍微看一下RNN反向传播的过程。

反向传播前,先要定义一个误差函数。对于命名实体识别这种结果是01的问题,可以继续采用交叉熵误差,即对于序列中每一个元素:

对于一个样本:

接下来是反向传播的过程。RNN的计算虽然复杂了一些,但它本质上还是一个计算图。如下图所示,按照红色箭头的方向对变量反向求导即可:

使用了编程框架后,反向传播可以自动由框架完成,可以不需要关心里面的细节了。

由于序列数据有先后的概念,而RNN的反向传播又是从后面的数据向前面的数据进行,因此这样的反向传播有着「穿越时空之反向传播」的称呼。

不同输入输出格式的RNN

刚刚学习的那种RNN只能描述输入输出长度一致的任务。在那种架构的基础上稍作修改,我们就能得到描述各种输入输出格式的任务。

  • 一对一:其实一对一问题就是标准神经网络,可以不要那个激活输入$a$。
  • 一对多:只把输入放入第一轮计算中,后续计算的输入是上一轮的输出。
  • 多对一:只输出最后一轮计算的结果。
  • 等长多对多:输入一个元素就输出一个元素。
  • 不等长多对多:先做几轮不产生输出的计算(编码器),再做几轮只产生输出的计算(解码器)。解码器的输入也可以和一对多一样,来自于上一轮的输出(图中没有画出)。

RNN应用:语言模型

为了加深对RNN的理解,我们来看一个基于RNN的应用——语言模型。

语言模型是NLP中的一个基础任务。一个语言模型能够输出某种语言某句话的出现概率。通过比较不同句子的出现概率,我们能够开发出很多应用。比如在英语里,同音的”apple and pear”比”apple and pair”的出现概率高(更可能是一个合理的句子)。当一个语音识别软件听到这句话时,可以分别写下这两句发音相近的句子,再根据语言模型断定这句话应该写成前者。

也就是说,对于一句话$x^{< 1 >}…x^{< T_x >}$,语言模型的输出是$P(x^{< 1 >},…, x^{< T_x >})$。这个式子也可以写成$P(x^{< 1 >}) \times P(x^{< 2 >} |x^{< 1 >}) \times P(x^{< 3 >} |x^{< 1 >}, x^{< 2 >}) … \times P(x^{< T_x >} |x^{< 1 >}, x^{< 2 >}, …, x^{< T_x-1 >})$,即一句话的出现概率,等于第一个单词出现在句首的概率,乘上第二个单词在第一个单词之后的概率,乘上第三个单词再第一、二个单词之后的概率,这样一直乘下去。

在训练语言模型时,我们一般要用到语料库(corpus)。语料库包含了某种语言大量的通顺的句子。我们希望一个模型能够学习到这些句子中的规律,知道一句话在这种语言中的出现概率是多少。

语言模型可以用RNN巧妙地实现。整个实现分两步:数据预处理和模型训练。

由于语料库中包含的是自然语言,而RNN的输入是one-hot编码,所以这中间要经过一个预处理的步骤。在NLP中,这一步骤叫做符号化(tokenize)。如我们在「符号标记」一节所学的,我们可以找来一个大小为10000的词汇表,根据每个单词在词汇表中的位置,生成一个one-hot编码。除了普通的词汇外,NLP中还有一些特殊的符号,比如表示句尾的<EOS> (End Of Sentence),表示词汇表里没有的词的<UNK> (Unknown)。

经过预处理后,语料库里的每一句自然语言就变成了训练样本$x^{< 1 >} … x^{< T_x >}$。我们可以把每一句话输入RNN,巧妙地训练一个语言模型:

这个计算过程初次接触时有些令人费解,我们慢慢来看懂它。先竖着看一轮计算是怎么完成的。对于每一轮计算,都会给定一个单词编码$x^{< i >}$,输出一个softmax后的概率分布$\hat{y}^{< i >}$,它要对齐的训练标签是训练集某一句话的某个单词$y^{< i >}$。$\hat{y}$表示接收之前所有的输入单词后,此时刻应该输出某单词的概率分布,这个输出的含义和多分类中的类似。

算出这样的$\hat{y}$有什么用呢?别急,再横着看一遍。回忆一下,语言模型要求的概率可以写成$P(y^{< 1 >}) \times P(y^{< 2 >} |y^{< 1 >}) \times P(y^{< 3 >} |y^{< 1 >}, y^{< 2 >}) …$。RNN每一轮的输出,其实就是要拟合$P(y^{< 1 >})$, $P(y^{< 2 >} |y^{< 1 >})$, $P(y^{< 3 >} |y^{< 1 >}, y^{< 2 >})$, …。每一个条件概率的条件,就是每一轮RNN的输入;每一个条件概率的待求事件,就是每一轮RNN的训练标签。比如$P(y^{< 3 >} |y^{< 1 >}, y^{< 2 >})$这个条件概率,它的条件是$y^{< 1 >}, y^{< 2 >}$,待求事件是$y^{< 3 >}$,所以第三轮RNN的标签是$y^{< 3 >}$,输入是$y^{< 1 >}, y^{< 2 >}$(别忘了,在RNN中,前几轮的输入其实也影响了后续的计算)。当然,第一个概率$P(y^{< 1 >})$没有条件,所以第一轮的输入$x^{< 1 >}=0$。对softmax的结果依然使用的是交叉熵误差,一个序列的误差等于所有元素的误差之和。

刚刚介绍的是训练过程。在用这个模型计算某句子的概率时,只要把一个句子输入进这个RNN,再去softmax的概率分布里取出需要的概率,一乘,就能算出语言模型要求的整句话的概率了。比如Cats average 15 hours of sleep a day. < EOS >这个句子,我们要令$x^{< 1 >}=0$, $x^{< 2 >}=one_ hot(cats)$, $x^{< 3 >}=one_ hot(average)$,……。然后,从第一个输出概率分布$\hat{y}^{< 1 >}$里找出cats对应的概率,去$\hat{y}^{< 2 >}$里找到average对应的概率,去$\hat{y}^{< 3 >}$里找到15对应的概率,以此类推。最后把所有的概率乘起来。

通过这一节,我们学到了RNN的一种应用。是否真正理解语言模型这一任务,并不重要。重要的是,我们学到了RNN是怎么巧妙去完成一项任务的。在完成和序列数据有关的任务时,我们要精心定义RNN的输入序列和输出序列。一旦这两个序列定义好了,训练模型并解决任务就是很轻松的事情。

用语言模型采样出全新的序列

给定一个别人训练好的RNN语言模型,我们可以弄出一个很好玩的应用:生成一个训练集里没有的句子。

我们刚刚学过,在计算一句话的概率时,RNN会把句子里的每一个单词输入,输出单词出现在前几个单词之后的概率分布$\hat{y}$。反过来想,我们可以根据RNN输出的概率分布,随机采样出某一个单词的下一个单词出来。具体来说,我们先随机生成句子里的第一个单词,把它输入RNN。再用RNN生成概率分布,对概率分布采样出下一个单词,采样出一个单词就输入一个单词,直到采样出< EOS >。这个过程就好像是在让AI生成句子一样。

对概率分布采样,其实就是以某种概率随机挑选。比如我有两个骰子,我要计算两个骰子点数之和。这个点数之和就是一个概率分布,掷一轮骰子就是去分布里采样。我们可以快速地算出,点数之和为2的概率是$\frac{1}{36}$, 点数之和为3的概率是$\frac{2}{36}$。也就是说,我们在采样时,有$\frac{1}{36}$的概率取到2,$\frac{2}{36}$的概率取到3。

如果把语料库的最小单元从单词换成字母,句子生成就变成了单词生成,我们可以让AI生成出从没出现过却看上去很合理的单词。

RNN 的梯度问题

在前几门课中,我们曾学过,过深的神经网络会有梯度过大/过小的问题。这些问题在RNN中也存在,毕竟RNN一般都是用来处理很长的序列数据的。

梯度过大的问题倒是有办法解决:设置一个梯度最大值,让所有梯度都不能超过这个值。

梯度过小的问题比较麻烦。想象一个很长的句子:The cat, which ate apples, pears, …., was full.这个cat和was存在着依赖关系。一旦梯度过小,一个句子前后的依赖关系就不是那么好传递了。

下面几节我们会学一些解决梯度问题的架构。

GRU (Gated Recurrent Unit)

我们可以用GRU (Gated Recurrent Unit)来替代标准RNN中的计算单元,以解决梯度问题。

为了明确标准RNN的哪些模块被替换了,我们先回顾一下RNN原来的计算单元。

在标准的RNN单元中,$x^{< t >}$和$a^{< t-1 >}$一起决定了$a^{< t >}$,$a^{< t >}$又决定了$\hat{y}^{< t >}$。

先在脑中对这张图有个印象,稍后我们会看到GRU是怎样改进这个计算单元的。

上一节里,我们分析过,梯度消失会导致一句话后面的单词忘掉了前面的单词。那么,可不可以让网络的“记性”更好一点呢?我们可以参考一下人类的记忆行为。比如,当我们在接到了验证码短信后,要把验证码在脑子里记一段时间:读完了短信,要记住验证码;关闭短信应用,要记住验证码;打开需要验证码的应用,要记住验证码;输入验证吗时,要记住验证码;输完了验证码,总算可以忘记验证码了。这个过程中,我们一直在维护“验证码”这个信息,决定是记住它还是忘记它。我们可以让神经网络模型也按照这种思路记忆之前的信息。

在每轮计算更新中间变量$a$时,GRU还要使用到一个新的变量,表示中间变量$a$该不该忘记。这个变量越靠近0,就说明越应该保持之前的中间变量;越靠近1,就越靠近新的$a$。GRU的这个变量起到了电路中逻辑门的效果(GRU的第一个单词是gated)。

具体来说,一个简化版GRU的计算过程用数学符号表示如下:

我们把中间变量$a$临时更名为$c$,表示记忆单元memory cell。和之前的$a$一样,我们每轮要算一个新的$\tilde{c}$:

而同时,我们还要算一个决定是不是要用$\tilde{c}$去更新过去的$c$的“逻辑门” $\Gamma_u$:

注意,$\Gamma_u$和逻辑门不同,不是真的只能取0或1,而是取0~1中一个中间的值,表示更新的程度。

之后,每一轮的$c^{< t >}$是这样更新的:

它的图示如下,其中紫色部分是更新操作。

在完整的GRU中,计算公式稍微复杂一点:

唯一的区别是$c^{< t - 1 >}$多过了一道$\Gamma_r$。这种设计的好处很难从理论上解释。当时的研究者试了很多类似的GRU架构,最后发现这样的GRU是效果最好的。

LSTM (long short term memory) 单元

LSTM单元是另一种改进版的RNN单元。LSTM的核心思想和GRU一模一样,也是使用门来控制记忆变量的更新幅度,只是公式更复杂了一点。在LSTM中,要传递的中间变量有两个:$c$和$a$,使用的输出门也从2个增加到了3个。

我觉得看图不如看公式看得清楚。

我们不需要刻意去记LSTM的结构,也不要纠结为什么要在哪个地方用哪个门,只需要知道LSTM和GRU的区别,会用它们就行了。

虽然LSTM比GRU更复杂,但实际上LSTM很早(1997年)就有了,GRU是近几年才有的。二者的效果并没有显著的差别,一般认为LSTM功能更强大,GRU计算速度更快。碰到新任务无脑用LSTM即可,而如果要构建较大的网络则可以考虑使用性能更好的GRU。

双向RNN

之前提过,标准的RNN有一个问题:先出现的单词无法获取后续单词的信息。比如句首单词是Teddy,你不知道这是“泰迪熊”还是“泰迪”这个人名。为了解决这个问题,我们可以升级一下RNN的基础架构,使用双向RNN(BRNN)。在这个新架构下,GRU和LSTM的单元可以照用,不受影响。

BRNN的示意图如下:

假设一句话有4个单词。在BRNN中,除了会先从1-4正着输入一遍序列外,还会从4-1倒着输入一遍序列。正着传的中间变量叫$\overrightarrow{a}$,倒着传的中间变量叫做$\overleftarrow{a}$。每一轮输出满足$\hat{y}=g(W_y[ \overrightarrow{a}, \overleftarrow{a} ] + b_y)$。

BRNN+LSTM通常是一个新序列任务的标配。当然,BRNN也有一个缺点:必须等一个序列输入完了才能返回结果,而不能实时返回结果。在语音识别等实时性较强的任务里,可能普通RNN更合适一点。

深层RNN

到目前为止,我们学的RNN都是由几个简单的矩阵运算构成的,似乎和这套课的标题“深度学习”不沾边。实际上,也可以给基础的RNN多加一些参数,变成一个深层的RNN。

正如堆叠标准神经网络的隐藏层一样,我们可以堆叠RNN的基础模块,并传递多个中间变量。由于时序计算的计算成本很高,堆3层的计算量就已经很大了。

如果想进一步提升网络的拟合能力,可以修改计算输出$y$的结构,堆叠一些非时序的神经网络隐藏层。

时序模块之所以计算缓慢,一大原因是无法并行。靠后的变量必须等之前的变量算好了才能计算。而在输出$y$的路径中添加一些隐藏层的运算代价没有那么大,因为这些运算是可以并行的。

同样,使用深层RNN时,双向RNN,还有LSTM, GRU都是可以用的。

总结

在这一课里,我们初次认识了序列数据,并学习了处理序列数据的RNN。利用RNN,我们可以开发出许多和序列数据相关的应用。基础的RNN存在不少问题,所以RNN存在着许多改进方法。让我们看一看这一课的具体知识点:

  • 序列数据及相关任务的示例

    • 语音识别:输入语音,输出文字
    • 音乐生成:无输入,输出语音
    • 机器翻译:输入某种文字,输出另一种文字
    • 情绪分类:输入文字,输出1-5的分数
    • 命名实体识别:输入文字,输出每个单词是否是命名实体
  • 单词的表示

    • 先准备好一个词汇表。比如大小为10000的词汇表(要包括<UNK>, <EOS>)。
    • 每一个单词是一个长度10000的one-hot向量。单词在词汇表中的序号,就是one-hot向量中值为1的下标。
  • 循环神经网络(RNN)

    • 基本思想:用$a$表示上文信息
    • 计算流程:循环输入序列元素,维护$a$
    • 计算公式
    • 反向传播的大概流程
  • 防止RNN梯度消失:GRU, LSTM

    • 基本思想:选择性更新$a$
    • 大概了解GRU, LSTM的公式
    • GRU, LSTM的使用场景
  • 获取下文信息:BRNN

    • 基本思想与结构
    • 使用场景
  • 增强表达能力:深层RNN

    • 怎么添加更多层RNN
    • 怎么更好地拟合输出
  • RNN应用:语言模型

    • 语言模型的定义
    • 语言模型的训练与推理
    • 对语言模型采样

想要入门一项新技术,最快的方法就是写一个”Hello World”程序。入门CNN,大家一般会写一个简单的图片分类项目。可是,RNN的入门项目就比较少见了。自然语言处理任务要求的数据量都比较大,不是那么好设计一个入门项目。

在这篇文章中,我将展示一个入门级的RNN项目——字母级语言模型。这个项目的逻辑比较简单,要求的数据量不大,几分钟就可以训练完,非常适合新手入门。

这个项目使用的框架是PyTorch。首先,我会抛弃PyTorch的高级组件,仅使用线性层、自动求导机制来从头实现一个简单的RNN。之后,我还会用PyTorch的高级组件搭一个更通用的RNN。相信通过阅读这篇教程,大家不仅能够理解RNN的底层原理,还能够学到PyTorch中RNN组件的用法,能够自己搭建出各种各样的NLP任务模型。

知识背景

详细的知识介绍可以参考我的上篇文章:循环神经网络基础

RNN

RNN 适用于处理序列数据。令$x^{< i >}$是序列的第$i$个元素,那么$x^{< 1 >} x^{< 2 >}…x^{< T_x >}$就是一个长度为$T_x$的序列。NLP中最常见的元素是单词,对应的序列是句子。

RNN使用同一个神经网络处理序列中的每一个元素。同时,为了表示序列的先后关系,RNN还有表示记忆的隐变量$a$,它记录了前几个元素的信息。对第$t$个元素的运算如下:

其中,$W, b$都是线性运算的参数,$g$是激活函数。隐藏层的激活函数一般用tanh,输出层的激活函数根据实际情况选用。另外,$a$得有一个初始值$a^{< 1 >}$,一般令$a^{< 1 >}=\vec0$。

语言模型

语言模型是NLP中的一个基础任务。假设我们以单词为基本元素,句子为序列,那么一个语言模型能够输出某句话的出现概率。通过比较不同句子的出现概率,我们能够开发出很多应用。比如在英语里,同音的”apple and pear”比”apple and pair”的出现概率高(更可能是一个合理的句子)。当一个语音识别软件听到这句话时,可以分别写下这两句发音相近的句子,再根据语言模型断定这句话应该写成前者。

规范地说,对于序列$x^{< 1 >}…x^{< T_x >}$,语言模型的输出是$P(x^{< 1 >},…, x^{< T_x >})$。这个式子也可以写成$P(x^{< 1 >}) \times P(x^{< 2 >} |x^{< 1 >}) \times P(x^{< 3 >} |x^{< 1 >}, x^{< 2 >}) … \times P(x^{< T_x >} |x^{< 1 >}, x^{< 2 >}, …, x^{< T_x-1 >})$,即一句话的出现概率,等于第一个单词出现在句首的概率,乘上第二个单词在第一个单词之后的概率,乘上第三个单词再第一、二个单词之后的概率,这样一直乘下去。

单词级的语言模型需要的数据量比较大,在这个项目中,我们将搭建一个字母级语言模型。即我们以字母为基本元素,单词为序列。语言模型会输出每个单词的概率。比如我们输入”apple”和”appll”,语言模型会告诉我们单词”apple”的概率更高,这个单词更可能是一个正确的英文单词。

RNN 语言模型

为了计算语言模型的概率,我们可以用RNN分别输出$P(x^{< 1 >})$, $P(x^{< 2 >} |x^{< 1 >})$, …,最后把这些概率乘起来。

$P(x^{< t >} |x^{< 1 >}, x^{< 2 >}, …, x^{< t-1 >})$这个式子,说白了就是给定前$t-1$个字母,猜一猜第$t$个字母最可能是哪个。比如给定了前四个字母”appl”,第五个单词构成”apply”, “apple”的概率比较大,构成”appll”, “appla”的概率较小。

为了让神经网络学会这个概率,我们可以令RNN的输入为<sos> x_1, x_2, ..., x_T,RNN的标签为x_1, x_2, ..., x_T, <eos><sos><eos>是句子开始和结束的特殊字符,实际实现中可以都用空格' '表示。<sos>也可以粗暴地用全零向量表示),即输入和标签都是同一个单词,只是它们的位置差了一格。模型每次要输出一个softmax的多分类概率,预测给定前几个字母时下一个字母的概率。这样,这个模型就能学习到前面那个条件概率了。

代码讲解

项目地址:https://github.com/SingleZombie/DL-Demos/tree/master/dldemos/BasicRNN。

数据集获取

为了搭建字母级语言模型,我们只需要随便找一个有很多单词的数据集。这里我选择了斯坦福大学的大型电影数据集,它收录了IMDb上的电影评论,正面评论和负面评论各25000条。这个数据集本来是用于情感分类这一比较简单的NLP任务,拿来搭字母级语言模型肯定是没问题的。

这个数据集的文件结构大致如下:

text
1
2
3
4
5
6
7
8
9
10
├─test
│ ├─neg
│ │ ├ 0_2.txt
│ │ ├ 1_3.txt
│ │ └ ...
│ └─pos
├─train
│ ├─neg
│ └─pos
└─imdb.vocab

其中,imdb.vocab记录了数据集中的所有单词,一行一个。testtrain是测试集和训练集,它们的negpos子文件夹分别记录了负面评论和正面评论。每一条评论都是一句话,存在一个txt文件里。

训练字母级语言模型时,直接拿词汇表来训练也行,从评论中截取一个个单词也行。我已经写好了这些读取数据集的代码,在dldemos/BasicRNN/read_imdb.py文件中。

在读取单词时,我们只需要26个字母和空格这一共27个字符。其他的字符全可以过滤掉。为了方便,我使用了正则表达式过滤出这27个字符:

1
words = re.sub(u'([^\u0020\u0061-\u007a])', '', words)

这样,一个读取词汇表文件的函数就长这样:

1
2
3
4
5
6
7
8
9
def read_imdb_vocab(dir='data/aclImdb'):
fn = os.path.join(dir, 'imdb.vocab')
with open(fn, 'rb') as f:
word = f.read().decode('utf-8').replace('\n', ' ')
words = re.sub(u'([^\u0020\u0061-\u007a])', '',
word.lower()).split(' ')
filtered_words = [w for w in words if len(w) > 0]

return filtered_words

我写好了读取词汇表的函数read_imdb_vocabread_imdb_words,它们都会返回一个单词的列表。我还写了一个读数据集整个句子的函数read_imdb。它们的用法和输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
def main():
vocab = read_imdb_vocab()
print(vocab[0])
print(vocab[1])

lines = read_imdb()
print('Length of the file:', len(lines))
print('lines[0]:', lines[0])
words = read_imdb_words(n_files=100)
print('Length of the words:', len(words))
for i in range(5):
print(words[i])
text
1
2
3
4
5
6
7
8
9
10
the
and
Length of the file: 12500
lines[0]: Bromwell High is a cartoon ...
Length of the words: 23425
bromwell
high
is
a
cartoon

数据集读取

RNN的输入不是字母,而是表示字母的向量。最简单的字母表示方式是one-hot编码,每一个字母用一个某一维度为1,其他维度为0的向量表示。比如我有a, b, c三个字母,它们的one-hot编码分别为:

1
2
3
a: [1, 0, 0]
b: [0, 1, 0]
c: [0, 0, 1]

现在,我们只有单词数组。我们要把每个单词转换成这种one-hot编码的形式。

在转换之前,我准备了一些常量(dldemos/BasicRNN/constant.py):

1
2
3
4
5
6
7
EMBEDDING_LENGTH = 27
LETTER_MAP = {' ': 0}
ENCODING_MAP = [' ']
for i in range(26):
LETTER_MAP[chr(ord('a') + i)] = i + 1
ENCODING_MAP.append(chr(ord('a') + i))
LETTER_LIST = list(LETTER_MAP.keys())

我们一共有27个字符,0号字符是空格,剩余字母按照字母表顺序排列。LETTER_MAPENCODING_MAP分别完成了字母到数字的正向和反向映射。LETTER_LIST是所有字母的列表。

PyTorch提供了用于管理数据集读取的Dataset类。Dataset一般只会存储获取数据的信息,而非原始数据,比如存储图片路径。而每次读取时,Dataset才会去实际读取数据。在这个项目里,我们用Dataset存储原始的单词数组,实际读取时,每次返回一个one-hot编码的向量。

使用Dataset时,要继承这个类,实现__len____getitem__方法。前者表示获取数据集的长度,后者表示获取某项数据。我们的单词数据集WordDataset应该这样写(dldemos/BasicRNN/main.py):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import torch
from torch.utils.data import DataLoader, Dataset

from dldemos.BasicRNN.constant import EMBEDDING_LENGTH, LETTER_MAP
from dldemos.BasicRNN.read_imdb import read_imdb_vocab, read_imdb_words

class WordDataset(Dataset):
def __init__(self, words, max_length, is_onehot=True)
super().__init__()
n_words = len(words)
self.words = words
self.n_words = n_words
self.max_length = max_length
self.is_onehot = is_onehot

def __len__(self):
return self.n_words

def __getitem__(self, index):
"""return the (one-hot) encoding vector of a word"""
word = self.words[index] + ' '
word_length = len(word)
if self.is_onehot:
tensor = torch.zeros(self.max_length, EMBEDDING_LENGTH)
for i in range(self.max_length):
if i < word_length:
tensor[i][LETTER_MAP[word[i]]] = 1
else:
tensor[i][0] = 1
else:
tensor = torch.zeros(self.max_length, dtype=torch.long)
for i in range(word_length):
tensor[i] = LETTER_MAP[word[i]]

return tensor

构造数据集的参数是words, max_length, is_onehotwords是单词数组。max_length表示单词的最大长度。在训练时,我们一般要传入一个batch的单词。可是,单词有长有短,我们不可能拿一个动态长度的数组去表示单词。为了统一地表达所有单词,我们可以记录单词的最大长度,把较短的单词填充空字符,直到最大长度。is_onehot表示是不是one-hot编码,我设计的这个数据集既能输出用数字标签表示的单词(比如abc表示成[0, 1, 2]),也能输出one-hoe编码表示的单词(比如abc表示成[[1, 0, 0], [0, 1, 0], [0, 0, 1]])。

1
2
3
4
5
6
7
def __init__(self, words, max_length, is_onehot=True)
super().__init__()
n_words = len(words)
self.words = words
self.n_words = n_words
self.max_length = max_length
self.is_onehot = is_onehot

在获取数据集时,我们要根据是不是one-hot编码,先准备好一个全是0的输出张量。如果存的是one-hot编码,张量的形状是[MAX_LENGTH, EMBEDDING_LENGTH],第一维是单词的最大长度,第二维是one-hot编码的长度。而如果是普通的标签数组,则张量的形状是[MAX_LENGTH]。准备好张量后,遍历每一个位置,令one-hot编码的对应位为1,或者填入数字标签。

另外,我们用空格表示单词的结束。要在处理前给单词加一个' ',保证哪怕最长的单词也会至少有一个空格。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def __getitem__(self, index):
"""return the (one-hot) encoding vector of a word"""
word = self.words[index] + ' '
word_length = len(word)
if self.is_onehot:
tensor = torch.zeros(self.max_length, EMBEDDING_LENGTH)
for i in range(self.max_length):
if i < word_length:
tensor[i][LETTER_MAP[word[i]]] = 1
else:
tensor[i][0] = 1
else:
tensor = torch.zeros(self.max_length, dtype=torch.long)
for i in range(word_length):
tensor[i] = LETTER_MAP[word[i]]

return tensor

注意!短单词的填充部分应该全是空字符。千万不要忘记给空字符的one-hot编码赋值。

1
2
3
4
5
for i in range(self.max_length):
if i < word_length:
tensor[i][LETTER_MAP[word[i]]] = 1
else:
tensor[i][0] = 1

有了数据集类,结合之前写好的数据集获取函数,可以搭建一个DataLoader。DataLoader是PyTorch提供的数据读取类,它可以方便地从Dataset的子类里读取一个batch的数据,或者以更高级的方式取数据(比如随机取数据)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def get_dataloader_and_max_length(limit_length=None,
is_onehot=True,
is_vocab=True):
if is_vocab:
words = read_imdb_vocab()
else:
words = read_imdb_words(n_files=200)

max_length = 0
for word in words:
max_length = max(max_length, len(word))

if limit_length is not None and max_length > limit_length:
words = [w for w in words if len(w) <= limit_length]
max_length = limit_length

# for <EOS> (space)
max_length += 1

dataset = WordDataset(words, max_length, is_onehot)
return DataLoader(dataset, batch_size=256), max_length

这个函数会先调用之前编写的数据读取API获取单词数组。之后,函数会计算最长的单词长度。这里,我用limit_length过滤了过长的单词。据实验,这个数据集里最长的单词竟然有60多个字母,把短单词填充至60需要浪费大量的计算资源。因此,我设置了limit_length这个参数,不去读取那些过长的单词。

计算完最大长度后,别忘了+1,保证每个单词后面都有一个表示单词结束的空格。

最后,用DataLoader(dataset, batch_size=256)就可以得到一个DataLoader。batch_size就是指定batch size的参数。我们这个神经网络很小,输入数据也很小,可以选一个很大的batch size加速训练。

模型定义

模型的初始化函数和训练函数定义如下(dldemos/BasicRNN/models.py):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from dldemos.BasicRNN.constant import EMBEDDING_LENGTH, LETTER_LIST, LETTER_MAP

class RNN1(nn.Module):
def __init__(self, hidden_units=32):
super().__init__()
self.hidden_units = hidden_units
self.linear_a = nn.Linear(hidden_units + EMBEDDING_LENGTH,
hidden_units)
self.linear_y = nn.Linear(hidden_units, EMBEDDING_LENGTH)
self.tanh = nn.Tanh()

def forward(self, word: torch.Tensor):
# word shape: [batch, max_word_length, embedding_length]
batch, Tx = word.shape[0:2]

# word shape: [max_word_length, batch, embedding_length]
word = torch.transpose(word, 0, 1)

# output shape: [max_word_length, batch, embedding_length]
output = torch.empty_like(word)

a = torch.zeros(batch, self.hidden_units, device=word.device)
x = torch.zeros(batch, EMBEDDING_LENGTH, device=word.device)
for i in range(Tx):
next_a = self.tanh(self.linear_a(torch.cat((a, x), 1)))
hat_y = self.linear_y(next_a)
output[i] = hat_y
x = word[i]
a = next_a

# output shape: [batch, max_word_length, embedding_length]
return torch.transpose(output, 0, 1)

我们来一点一点地看看这个模型是怎么搭起来的。

回忆一下RNN的公式:

我们可以把第一行公式里的两个$W$合并一下,$x, a$拼接一下。这样,只需要两个线性层就可以描述RNN了。

因此,在初始化函数中,我们定义两个线性层linear_alinear_y。另外,hidden_units表示隐藏层linear_a的神经元数目。tanh就是普通的tanh函数,它用作第一层的激活函数。

linear_a就是公式的第一行,由于我们把输入x和状态a拼接起来了,这一层的输入通道数是hidden_units + EMBEDDING_LENGTH,输出通道数是hidden_units。第二层linear_y表示公式的第二行。我们希望RNN能预测下一个字母的出现概率,因此这一层的输出通道数是EMBEDDING_LENGTH=27,即字符个数。

1
2
3
4
5
6
7
8
def __init__(self, hidden_units=32):
super().__init__()
self.hidden_units = hidden_units
self.linear_a = nn.Linear(hidden_units + EMBEDDING_LENGTH,
hidden_units)
self.linear_y = nn.Linear(hidden_units, EMBEDDING_LENGTH)
self.tanh = nn.Tanh()

在描述模型运行的forward函数中,我们先准备好输出张量,再初始化好隐变量a和第一轮的输入x。根据公式,循环遍历序列的每一个字母,用a, x计算hat_y,并维护每一轮的a, x。最后,所有hat_y拼接成的output就是返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def forward(self, word: torch.Tensor):
# word shape: [batch, max_word_length, embedding_length]
batch, Tx = word.shape[0:2]

# word shape: [max_word_length, batch, embedding_length]
word = torch.transpose(word, 0, 1)

# output shape: [max_word_length, batch, embedding_length]
output = torch.empty_like(word)

a = torch.zeros(batch, self.hidden_units, device=word.device)
x = torch.zeros(batch, EMBEDDING_LENGTH, device=word.device)
for i in range(Tx):
next_a = self.tanh(self.linear_a(torch.cat((a, x), 1)))
hat_y = self.linear_y(next_a)
output[i] = hat_y
x = word[i]
a = next_a

# output shape: [batch, max_word_length, embedding_length]
return torch.transpose(output, 0, 1)

我们来看一看这个函数的细节。一开始,输入张量word的形状是[batch数,最大单词长度,字符数=27]。我们提前获取好形状信息。

1
2
# word shape: [batch, max_word_length, embedding_length]
batch, Tx = word.shape[0:2]

我们循环遍历的其实是单词长度那一维。为了方便理解代码,我们可以把单词长度那一维转置成第一维。根据这个新的形状,我们准备好同形状的输出张量。输出张量output[i][j]表示第j个batch的序列的第i个元素的27个字符预测结果。

1
2
3
4
5
# word shape: [max_word_length, batch,  embedding_length]
word = torch.transpose(word, 0, 1)

# output shape: [max_word_length, batch, embedding_length]
output = torch.empty_like(word)

按照前文知识准备的描述,第一轮的输入是空字符,期待的输出是句子里的第一个字母;第二轮的输入的第一个字母,期待的输出是第二个字母……。因此,我们要把输入x初始化为空。理论上x应该是一个空字符,其one-hot编码是[1, 0, 0, ...],但这里我们拿一个全0的向量表示句首也是可行的。除了初始化x,还要初始化一个全零隐变量a

1
2
a = torch.zeros(batch, self.hidden_units, device=word.device)
x = torch.zeros(batch, EMBEDDING_LENGTH, device=word.device)

之后,按照顺序遍历每一个元素,计算y_hat并维护a, x。最后输出结果前别忘了把转置过的维度复原回去。

1
2
3
4
5
6
7
8
9
for i in range(Tx):
next_a = self.tanh(self.linear_a(torch.cat((a, x), 1)))
hat_y = self.linear_y(next_a)
output[i] = hat_y
x = word[i]
a = next_a

# output shape: [batch, max_word_length, embedding_length]
return torch.transpose(output, 0, 1)

从逻辑上讲,模型应该输出softmax的结果。但是,PyTorch的CrossEntropyLoss已经包含了softmax的计算,我们不用在模型里加softmax。

训练

main函数中完整的训练代码如下(dldemos/BasicRNN/models.py):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def train_rnn1():
device = 'cuda:0'
dataloader, max_length = get_dataloader_and_max_length(19)

model = RNN1().to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
citerion = torch.nn.CrossEntropyLoss()
for epoch in range(5):

loss_sum = 0
dataset_len = len(dataloader.dataset)

for y in dataloader:
y = y.to(device)
hat_y = model(y)
n, Tx, _ = hat_y.shape
hat_y = torch.reshape(hat_y, (n * Tx, -1))
y = torch.reshape(y, (n * Tx, -1))
label_y = torch.argmax(y, 1)
loss = citerion(hat_y, label_y)

optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
optimizer.step()

loss_sum += loss

print(f'Epoch {epoch}. loss: {loss_sum / dataset_len}')

torch.save(model.state_dict(), 'dldemos/BasicRNN/rnn1.pth')
return model

首先,调用之前编写的函数,准备好dataloadermodel。同时,准备好优化器optimizer和损失函数citerion。优化器和损失函数按照常见配置选择即可。

1
2
3
4
5
6
7
device = 'cuda:0'
dataloader, max_length = get_dataloader_and_max_length(19)

model = RNN1().to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
citerion = torch.nn.CrossEntropyLoss()

这个语言模型一下就能训练完,做5个epoch就差不多了。每一代训练中,
先调用模型求出hat_y,再调用损失函数citerion,最后反向传播并优化模型参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
for epoch in range(5):
loss_sum = 0
dataset_len = len(dataloader.dataset)

for y in dataloader:
y = y.to(device)
hat_y = model(y)

n, Tx, _ = hat_y.shape
hat_y = torch.reshape(hat_y, (n * Tx, -1))
y = torch.reshape(y, (n * Tx, -1))
label_y = torch.argmax(y, 1)
loss = citerion(hat_y, label_y)

optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
optimizer.step()

loss_sum += loss

print(f'Epoch {epoch}. loss: {loss_sum / dataset_len}')

算损失函数前需要预处理一下数据,交叉熵损失函数默认hat_y的维度是[batch数,类型数]label_y是一个一维整形标签数组。而模型的输出形状是[batch数,最大单词长度,字符数],我们要把前两个维度融合在一起。另外,我们并没有提前准备好label_y,需要调用argmax把one-hot编码转换回标签。

1
2
3
4
5
6
hat_y = model(y)
n, Tx, _ = hat_y.shape
hat_y = torch.reshape(hat_y, (n * Tx, -1))
y = torch.reshape(y, (n * Tx, -1))
label_y = torch.argmax(y, 1)
loss = citerion(hat_y, label_y)

之后就是调用PyTorch的自动求导功能。注意,为了防止RNN梯度过大,我们可以用clip_grad_norm_截取梯度的最大值。

1
2
3
4
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
optimizer.step()

我还顺带输出了每一代的loss。当然这里我偷了个懒,这个loss并不能表示每一个样本的平均loss。不过,我们能通过这个loss得到模型的训练进度,这就够了。

1
print(f'Epoch {epoch}. loss: {loss_sum / dataset_len}')

测试

我们可以手动为字母级语言模型写几个测试用例,看看每一个单词的概率是否和期望的一样。我的测试单词列表是:

1
2
3
4
5
test_words = [
'apple', 'appll', 'appla', 'apply', 'bear', 'beer', 'berr', 'beee', 'car',
'cae', 'cat', 'cac', 'caq', 'query', 'queee', 'queue', 'queen', 'quest',
'quess', 'quees'
]

我构筑了几组长度一样,但是最后几个字母不太一样的“单词”。通过观察这些词的概率,我们能够验证语言模型的正确性。理论上来说,英文里的正确单词的概率会更高。

我们的模型只能输出每一个单词的softmax前结果。我们还要为模型另写一个求语言模型概率的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@torch.no_grad()
def language_model(self, word: torch.Tensor):
# word shape: [batch, max_word_length, embedding_length]
batch, Tx = word.shape[0:2]

# word shape: [max_word_length, batch, embedding_length]
# word_label shape: [max_word_length, batch]
word = torch.transpose(word, 0, 1)
word_label = torch.argmax(word, 2)

# output shape: [batch]
output = torch.ones(batch, device=word.device)

a = torch.zeros(batch, self.hidden_units, device=word.device)
x = torch.zeros(batch, EMBEDDING_LENGTH, device=word.device)
for i in range(Tx):
next_a = self.tanh(self.linear_a(torch.cat((a, x), 1)))
tmp = self.linear_y(next_a)
hat_y = F.softmax(tmp, 1)
probs = hat_y[torch.arange(batch), word_label[i]]
output *= probs
x = word[i]
a = next_a

return output

这个函数和forward大致相同。只不过,这次我们的输出output要表示每一个单词的概率。因此,它被初始化成一个全1的向量。

1
2
# output shape: [batch]
output = torch.ones(batch, device=word.device)

每轮算完最后一层的输出后,我们手动调用F.softmax得到softmax的概率值。

1
2
tmp = self.linear_y(next_a)
hat_y = F.softmax(tmp, 1)

接下来,我们要根据每一个batch当前位置的单词,去hat_y里取出需要的概率。比如第2个batch当前的字母是b,我们就要取出hat_y[2][2]

i轮所有batch的字母可以用word_label[i]表示。根据这个信息,我们可以用probs = hat_y[torch.arange(batch), word_label[i]]神奇地从hat_y里取出每一个batch里word_label[i]处的概率。把这个概率乘到output上就算完成了一轮计算。

有了语言模型函数,我们可以测试一下开始那些单词的概率。

1
2
3
4
5
6
7
8
9
10
def test_language_model(model, is_onehot=True, device='cuda:0'):
_, max_length = get_dataloader_and_max_length(19)
if is_onehot:
test_word = words_to_onehot(test_words, max_length)
else:
test_word = words_to_label_array(test_words, max_length)
test_word = test_word.to(device)
probs = model.language_model(test_word)
for word, prob in zip(test_words, probs):
print(f'{word}: {prob}')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
apple: 9.39846032110836e-08
appll: 6.516307937687316e-09
appla: 3.6599331565412285e-08
apply: 1.2422759709806996e-07
bear: 1.6009346381906653e-06
beer: 1.6936465954131563e-06
berr: 9.99331746243115e-07
beee: 1.5601625591443735e-07
car: 1.8536804418545216e-05
cae: 1.8946409454656532e-06
cat: 1.875695670605637e-05
cac: 6.04180786467623e-06
caq: 3.6483314147517376e-08
query: 1.6811516161396867e-06
queee: 5.9459132728534314e-08
queue: 9.488831942405795e-09
queen: 5.990783051856852e-07
quest: 2.737341446845676e-06
quess: 4.7091912165342364e-06
quees: 1.3468336419464322e-06

通过观察每一组用例,我们能发现,apple, apply, bear, beer这些正确的单词的概率确实会高一些。这个语言模型训练得不错。有趣的是,caq这种英语里几乎不存在的字母组合的概率也偏低。当然,语言模型对难一点的单词的判断就不太准了。queenqueue的出现概率就比较低。

采样单词

语言模型有一个很好玩的应用:我们可以根据语言模型输出的概率分布,采样出下一个单词;输入这一个单词,再采样下一个单词。这样一直采样,直到采样出空格为止。使用这种采样算法,我们能够让模型自动生成单词,甚至是英文里不存在,却看上去很像那么回事的单词。

我们要为模型编写一个新的方法sample_word,采样出一个最大长度为10的单词。这段代码的运行逻辑和之前的forward也很相似。只不过,这一次我们没有输入张量,每一轮的x要靠采样获得。np.random.choice(LETTER_LIST, p=np_prob)可以根据概率分布np_prob对列表LETTER_LIST进行采样。根据每一轮采样出的单词letter,我们重新生成一个x,给one-hot编码的对应位置赋值1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@torch.no_grad()
def sample_word(self, device='cuda:0'):
batch = 1
output = ''

a = torch.zeros(batch, self.hidden_units, device=device)
x = torch.zeros(batch, EMBEDDING_LENGTH, device=device)
for i in range(10):
next_a = self.tanh(self.linear_a(torch.cat((a, x), 1)))
tmp = self.linear_y(next_a)
hat_y = F.softmax(tmp, 1)

np_prob = hat_y[0].detach().cpu().numpy()
letter = np.random.choice(LETTER_LIST, p=np_prob)
output += letter

if letter == ' ':
break

x = torch.zeros(batch, EMBEDDING_LENGTH, device=device)
x[0][LETTER_MAP[letter]] = 1
a = next_a

return output

使用这个方法,我们可以写一个采样20次的脚本:

1
2
3
4
5
6
def sample(model):
words = []
for _ in range(20):
word = model.sample_word()
words.append(word)
print(*words)

我的一次输出是:

1
movine  oaceniefke xumedfasss tinkly  cerawedaus meblilesen douteni  ttingieftu sinsceered inelid  tniblicl  krouthyych mochonalos memp  dendusmani sttywima  dosmmek  dring  diummitt  pormoxthin

采样出来的单词几乎不会是英文里的正确单词。不过,这些单词的词缀很符合英文的造词规则,非常好玩。如果为采样函数加一些限制,比如只考虑概率前3的字母,那么算法应该能够采样出更正确的单词。

PyTorch里的RNN函数

刚刚我们手动编写了RNN的实现细节。实际上,PyTorch提供了更高级的函数,我们能够更加轻松地实现RNN。其他部分的代码逻辑都不怎么要改,我这里只展示一下要改动的关键部分。

写这份代码时我参考了 https://github.com/floydhub/word-language-model

新的模型的主要函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class RNN2(torch.nn.Module):
def __init__(self, hidden_units=64, embeding_dim=64, dropout_rate=0.2):
super().__init__()
self.drop = nn.Dropout(dropout_rate)
self.encoder = nn.Embedding(EMBEDDING_LENGTH, embeding_dim)
self.rnn = nn.GRU(embeding_dim, hidden_units, 1, batch_first=True)
self.decoder = torch.nn.Linear(hidden_units, EMBEDDING_LENGTH)
self.hidden_units = hidden_units

self.init_weights()

def init_weights(self):
initrange = 0.1
nn.init.uniform_(self.encoder.weight, -initrange, initrange)
nn.init.zeros_(self.decoder.bias)
nn.init.uniform_(self.decoder.weight, -initrange, initrange)

def forward(self, word: torch.Tensor):
# word shape: [batch, max_word_length]
batch, Tx = word.shape[0:2]
first_letter = word.new_zeros(batch, 1)
x = torch.cat((first_letter, word[:, 0:-1]), 1)
hidden = torch.zeros(1, batch, self.hidden_units, device=word.device)
emb = self.drop(self.encoder(x))
output, hidden = self.rnn(emb, hidden)
y = self.decoder(output.reshape(batch * Tx, -1))

return y.reshape(batch, Tx, -1)

初始化时,我们用nn.Embedding表示单词的向量。词嵌入(Embedding)是《深度学习专项-RNN》第二门课的内容,我会在下一篇笔记里介绍。这里我们把nn.Embedding看成一种代替one-hot编码的更高级的向量就行。这些向量和线性层参数W一样,是可以被梯度下降优化的。这样,不仅是RNN可以优化,每一个单词的表示方法也可以被优化。

注意,使用nn.Embedding后,输入的张量不再是one-hot编码,而是数字标签。代码中的其他地方也要跟着修改。

nn.GRU可以创建GRU。其第一个参数是输入的维度,第二个参数是隐变量a的维度,第三个参数是层数,这里我们只构建1层RNN,batch_first表示输入张量的格式是[batch, Tx, embedding_length]还是[Tx, batch, embedding_length]

貌似RNN中常用的正则化是靠dropout实现的。我们要提前准备好dropout层。

1
2
3
4
5
6
7
8
9
def __init__(self, hidden_units=64, embeding_dim=64, dropout_rate=0.2):
super().__init__()
self.drop = nn.Dropout(dropout_rate)
self.encoder = nn.Embedding(EMBEDDING_LENGTH, embeding_dim)
self.rnn = nn.GRU(embeding_dim, hidden_units, 1, batch_first=True)
self.decoder = torch.nn.Linear(hidden_units, EMBEDDING_LENGTH)
self.hidden_units = hidden_units

self.init_weights()

准备好了计算层后,在forward里只要依次调用它们就行了。其底层原理和我们之前手写的是一样的。其中,self.rnn(emb, hidden)这个调用完成了循环遍历的计算。

由于输入格式改了,令第一轮输入为空字符的操作也更繁琐了一点。我们要先定义一个空字符张量,再把它和输入的第一至倒数第二个元素拼接起来,作为网络的真正输入。

1
2
3
4
5
6
7
8
9
10
11
def forward(self, word: torch.Tensor):
# word shape: [batch, max_word_length]
batch, Tx = word.shape[0:2]
first_letter = word.new_zeros(batch, 1)
x = torch.cat((first_letter, word[:, 0:-1]), 1)
hidden = torch.zeros(1, batch, self.hidden_units, device=word.device)
emb = self.drop(self.encoder(x))
output, hidden = self.rnn(emb, hidden)
y = self.decoder(output.reshape(batch * Tx, -1))

return y.reshape(batch, Tx, -1)

PyTorch里的RNN用起来非常灵活。我们不仅能够给它一个序列,一次输出序列的所有结果,还可以只输入一个元素,得到一轮的结果。在采样单词时,我们不得不每次输入一个元素。有关采样的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@torch.no_grad()
def sample_word(self, device='cuda:0'):
batch = 1
output = ''

hidden = torch.zeros(1, batch, self.hidden_units, device=device)
x = torch.zeros(batch, 1, device=device, dtype=torch.long)
for _ in range(10):
emb = self.drop(self.encoder(x))
rnn_output, hidden = self.rnn(emb, hidden)
hat_y = self.decoder(rnn_output)
hat_y = F.softmax(hat_y, 2)

np_prob = hat_y[0, 0].detach().cpu().numpy()
letter = np.random.choice(LETTER_LIST, p=np_prob)
output += letter

if letter == ' ':
break

x = torch.zeros(batch, 1, device=device, dtype=torch.long)
x[0] = LETTER_MAP[letter]

return output

以上就是PyTorch高级RNN组件的使用方法。在使用PyTorch的RNN时,主要的改变就是输入从one-hot向量变成了标签,数据预处理会更加方便一些。另外,PyTorch的RNN会自动完成循环,可以给它输入任意长度的序列。

总结

在这篇文章中,我展示了一个字母级语言模型项目。这个项目涉及到的编程知识有:

  • one-hot编码的处理
  • RNN的底层实现
  • 如何用RNN对语言模型任务建模
  • 如何用RNN求出语言模型的概率
  • 如何对语言模型采样
  • PyTorch的RNN组件

这篇文章只展示了部分关键代码。想阅读整个项目完整的代码,可以访问该项目的GitHub链接

如果大家正在学深度学习,强烈建议大家从头写一遍这个项目。编写代码能够学到很多细节,加深对RNN的理解。

在编写这个项目时,我总结了项目中几个比较有挑战性的部分。大家阅读代码或自己动手时可以格外注意这些部分。第一个比较难的部分是和batch有关的计算。RNN本身必须得顺序处理序列,效率较低,同时处理一个batch的数据是一个很重要的加速手段。我们的代码都得尽量符合向量化编程要求,一次处理一个batch。

另外,相比一般的数据,序列数据多了一个时间维度(或者说序列维度),在向量化计算中考虑这个维度是很耗费脑力的。我们可以在代码中加入对中间变量形状的注释。在使用PyTorch或者其他框架时,要注意是batch维度在前面,还是时间维度在前面。注意初始化RNN的batch_first这个参数。还有,一个张量到底是one-hot编码,还是embedding,还是标签序列,这个也要想清楚来。

PyTorch里的CrossEntropyLoss自带了softmax操作,千万不能和softmax混用!我之前写了这个bug,调了很久才调出来,真是气死人了。

生成对抗网络(GAN)是一类非常有趣的神经网络。借助GAN,计算机能够生成逼真的图片。近年来有许多“AI绘画”的新闻,这些应用大多是通过GAN实现的。实际上,GAN不仅能做图像生成,还能辅助其他输入信息不足的视觉任务。比如SRGAN,就是把GAN应用在超分辨率(SR)任务上的代表之作。

在这篇文章中,我将主要面向深度学习的初学者,介绍SRGAN[1]这篇论文,同时分享以下知识:

  • GAN的原理与训练过程
  • 感知误差(Perceptual Loss)
  • 基于的GAN的SR模型框架

讲完了知识后,我还会解读一下MMEditing的SRGAN的训练代码。看懂这份代码能够加深对SRGAN训练算法的理解。

SRGAN 核心思想

早期超分辨率方法的优化目标都是降低低清图像和高清图像之间的均方误差。降低均方误差,确实让增强图像和原高清图像的相似度更高。但是,图像的相似度指标高并不能代表图像的增强质量就很高。下图显示了插值、优化均方误差、SRGAN、原图这四个图像输出结果(括号里的相似度指标是PSNR和SSIM)。

从图中可以看出,优化均方误差虽然能让相似度指标升高,但图像的细节十分模糊,尤其是纹理比较密集的高频区域。相比之下,SRGAN增强出来的图像虽然相似度不高,但看起来更加清晰。

为什么SRGAN的增强结果那么清楚呢?这是因为SRGAN使用了一套新的优化目标。SRGAN使用的损失函数既包括了GAN误差,也包括了感知误差。这套新的优化目标能够让网络生成看起来更清楚的图片,而不仅仅是和原高清图像相似度更高的图片。

下面,我们来一步一步学习SRGAN的框架。

GAN 的原理

GAN[2]是一套搭建神经网络的框架。给定一个图片数据集$p_g$,GAN的目的是训练出一个生成网络$G$,使得G能够凭空生成出和$p_g$中大多数图片都类似的图片。比如说$p_g$是一个小猫图片数据集,那么$G$就应该能凭空生成出小猫图片。当然,$G$不是真的没有任何输入,真的能够凭空生成一幅图片。为了生成出不一样的图片,$G$要求输入一个随机量,这个随机量叫做噪声$z$。这样,只要输入的噪声$z$变了,$G$的输出$G(z)$就变了,就能画出长相不一样的小猫了。

为了指导图像生成,$G$应该有一个“老师”告诉它该怎么画出更像的图片。这个“老师”叫做判别网络$D$。$D$就是一个二分类网络,它能够严格地判定出一幅图片是否来自数据集$p_g$。如果$p_g$是一个小猫数据集,那么$D$就应该能判定一张图片是不是小猫。这样,如果$G$生成出来的图片$G(z)$已经非常逼真,连$D$都觉得$G(z)$来自数据集$p_g$,那么$G$就是一个很成功的网络了。

如果只是生成小猫,我们直接拿小猫图片和其他图片就能训练出一个$D$了。问题是,大多数情况下我们只有数据集$p_g$,而难以获得一个$p_g$的反例数据集。GAN的想法,则巧妙地解决了这个问题:刚开始,$G$生成出来的图片肯定是很差的,这些图片肯定不像$p_g$。所以,我们以$G(z)$为反例,和$p_g$一起训练出一个$D$来。等$D$的判定能力强了以后,又拿$D$回头训练$G$。这样,$D$的审美水平逐渐提高,$G$的绘画能力也逐渐提高。最终,$D$能成功分辨出一幅图片是否来自$p_g$,而$G$生成出来的图片和$p_g$中的看起来完全相同,连$D$也分辨不出来。就这样,我们得到了一个很棒的生成网络$G$。

规范地来说,给定一个数据集$p_g$,我们希望训练出两个网络$D, G$。$D$能够判断一幅输入图片是否来自$p_g$:

$G$则能够根据来自噪声分布$p_z$的$z$生成一个真假难辨的图片$G(z)$,使得$D(G(z))=1$。

为了达到这个目标,二分类器$D$应该最小化这样一个的交叉熵误差:

其中,$\hat{y}=D(x)$是预测结果为真的概率,$y$是0或1的标签。

对于来自数据集的图片$x \sim p_g$,$D$使用的标签$y$应该是1,误差公式化简为:

对于$G$生成的图片$G(z)$,$D$使用的标签$y$应该是0,误差公式化简为:

我们每步拿一张真图$x$和一张假图$G(z)$训练$D$。这样,每步的误差公式就是上面两个式子加起来:

反过来,$G$应该和$D$对抗,最大化上面那个误差,想办法骗过$D$。这个“对抗”就是GAN的名称“生成对抗网络”的由来。但是,$G$不能改变$D(x)$那一项。因此,$G$使用的误差函数是:

使用上面这两种误差,就可以训练神经网络了。训练GAN时,每轮一般会训练$k(k>=1)$次$D$,再训练1次$G$。这是为了先得到一个好的判别器,再用判别器去指导生成器。

GAN只是一套通用的框架,并没有指定神经网络$D, G$的具体结构。在不同任务中,$D, G$一般有不同的结构。

基于GAN的超分辨率网络

如前文所述,以优化均方误差为目标的超分辨率模型难以复原图像的细节。其实,超分辨率任务和图像生成任务类似,都需要一个“老师”来指导优化目标。SRGAN把GAN框架运用到了超分辨率任务上。原来的生成器$G$随机生成图像,现在用来输出高清图像;原来的判定器$D$用来判定图像是否属于某数据集,现在$D$用来判断一幅图像是否是高清图像。

具体来说,相比基础的GAN,在SRGAN中,$D$的输入是高清图像$I^{HR}$。而$G$的输入从随机噪声$z$变成了高清图像退化后的低清图像$I^{LR}$。这样,$G$就不是在随机生成图像,而是在根据一幅低清图像生成一幅高清图像了。它们的误差函数分别是:

借助GAN的架构,SRGAN能够利用$D$指导高清图像生成。但是,超分辨率任务毕竟和图像生成任务有一些区别,不能只用这种对抗误差来约束网络。因此,除了使用对抗误差外,SRGAN还使用了一种内容误差。这种内容误差用于让低清图片和高清图片的内容对齐,起到了和原均方误差一样的作用。

基于感知的内容误差

在介绍SRGAN的内容误差之前,需要对“内容误差”和“感知误差”这两个名词做一个澄清。在SRGAN的原文章中,作者把内容误差和对抗误差之和叫做感知误差。但是,后续的大部分文献只把这种内容误差叫做感知误差,不会把内容误差和对抗误差放在一起称呼。在后文中,我也会用“感知误差”来指代SRGAN中的“内容误差”。

在深度卷积神经网络(CNN)火起来后,人们开始研究为什么CNN能够和人类一样识别出图像。经实验,人们发现两幅图像经VGG(一个经典的CNN)的某些中间层的输出越相似,两幅图像从观感上也越相似。这种相似度并不是基于某种数学指标,而是和人的感知非常类似。

VGG的这种“感知性”被运用在了风格迁移等任务上。也有人考虑把这种感知上的误差运用到超分辨率任务上,并取得了不错的结果[3]。下图是真值、插值、基于逐像素误差、基于感知误差的四个超分辨率结果。

SRGAN也使用了这种感知误差,以取代之前常常使用的逐像素均方误差。这种感知误差的计算方法如下:VGG有很多中间层,用于计算感知误差的中间层$i$是可调的。假如我们用$\phi_{i}(I)$表示图像$I$经VGG的第$i$层的中间输出结果,$\phi_{i}(I)_{x, y}$表示中间输出结果在坐标$(x, y)$处的值,则感知误差的公式如下:

直观上解释这个公式,就是先把高清图像$I^{HR}$送入VGG,再把高清图像退化出来的低清图像$I^{LR}$送入生成器,并把生成器的输出$G(I^{LR})$也送入VGG。两幅图片经VGG第$i$层生成的中间结果的逐像素均方误差,就是感知误差。

算上之前的对抗误差,一个图像超分辨率网络的总误差如下:

这里的$w$用于调整两个误差的相对权重,原论文使用$w=10^{-3}$。

SRGAN的其他模块

定义好了误差函数,只要在决定好网络结构就可以开始训练网络了。SRGAN使用的生成网络和判别网络的结构如下:

判别网络就是一个平平无奇的二分类网络,架构上没有什么创新。而生成网络则先用几个残差块提取特征,最后用一种超分辨率任务中常用的上采样模块PixelShuffle对原图像的尺寸翻倍两次,最后输出一个边长放大4倍的高清图像。

SRGAN的这种网络结构在当时确实取得了不错的结果。但是,很快就有后续研究提出了更好的网络架构。比如ESRGAN[4]去掉了生成网络的BN层,提出了一种叫做RRDB的高级模块。基于RRDB的生成网络有着更好的生成效果。

不仅是网络架构,SRGAN的其他细节也得到了后续研究的改进。GAN误差的公式、总误差的公式、高清图像退化成低清图像的数据增强算法……这些子模块都被后续研究改进了。但是,SRGAN这种基于GAN的训练架构一直没有发生改变。有了SRGAN的代码,想复现一些更新的超分辨率网络时,往往只需要换一下生成器的结构,或者改一改误差的公式就行了。大部分的训练代码是不用改变的。

总结

SRGAN是把GAN运用在超分辨率任务上的开山之作。如正文所述,SRGAN中的部分设计虽然已经过时,但它的整体训练架构被一直沿用了下来。现在去回顾SRGAN这篇论文时,只需要关注以下几点即可:

  • 如何把GAN套用在超分辨率任务上
  • GAN误差
  • 感知误差

通过阅读这篇论文,我们不仅应该学会GAN是怎样运用在SR上的,也应该能总结出如何把GAN应用在其他任务上。GAN的本质是去学习一个分布,令生成的$G(z)$看上去是来自分布$p_g$,而不是像图像分类等任务去学习一个$x \to y$的映射关系。因此,GAN会记忆一些和数据集相关的信息。在输入信息就已经比较完备的图像分类、目标检测等任务中,GAN可能没有什么用武之地。但是,在输入信息不足的超分辨率、图像补全等任务中,GAN记忆的数据集信息有很有用了。很多时候,GAN会“脑补”出输入图像中不够清楚的部分。

决定了要在某个任务中使用GAN时,我们可以在一个不使用GAN的架构上做以下改动:

  • 定义一个分类网络$D$。
  • 在原loss中加一项由$D$算出来的GAN loss。
  • 在训练流程中,加入训练$D$的逻辑。

看完正文后,如果你对GAN在SR上的训练逻辑还是不太清楚,欢迎阅读附录中有关SRGAN训练代码的解读。

附录:MMEditing 中的 SRGAN

MMEditing中的SRGAN写在mmedit/models/restorers/srgan.py这个文件里。学习训练逻辑时,我们只需要关注SRGAN类的train_step方法即可。

以下是train_step的源代码(我的mmedit版本是v0.15.1)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def train_step(self, data_batch, optimizer):
"""Train step.

Args:
data_batch (dict): A batch of data.
optimizer (obj): Optimizer.

Returns:
dict: Returned output.
"""
# data
lq = data_batch['lq']
gt = data_batch['gt']

# generator
fake_g_output = self.generator(lq)

losses = dict()
log_vars = dict()

# no updates to discriminator parameters.
set_requires_grad(self.discriminator, False)

if (self.step_counter % self.disc_steps == 0
and self.step_counter >= self.disc_init_steps):
if self.pixel_loss:
losses['loss_pix'] = self.pixel_loss(fake_g_output, gt)
if self.perceptual_loss:
loss_percep, loss_style = self.perceptual_loss(
fake_g_output, gt)
if loss_percep is not None:
losses['loss_perceptual'] = loss_percep
if loss_style is not None:
losses['loss_style'] = loss_style
# gan loss for generator
fake_g_pred = self.discriminator(fake_g_output)
losses['loss_gan'] = self.gan_loss(
fake_g_pred, target_is_real=True, is_disc=False)

# parse loss
loss_g, log_vars_g = self.parse_losses(losses)
log_vars.update(log_vars_g)

# optimize
optimizer['generator'].zero_grad()
loss_g.backward()
optimizer['generator'].step()

# discriminator
set_requires_grad(self.discriminator, True)
# real
real_d_pred = self.discriminator(gt)
loss_d_real = self.gan_loss(
real_d_pred, target_is_real=True, is_disc=True)
loss_d, log_vars_d = self.parse_losses(dict(loss_d_real=loss_d_real))
optimizer['discriminator'].zero_grad()
loss_d.backward()
log_vars.update(log_vars_d)
# fake
fake_d_pred = self.discriminator(fake_g_output.detach())
loss_d_fake = self.gan_loss(
fake_d_pred, target_is_real=False, is_disc=True)
loss_d, log_vars_d = self.parse_losses(dict(loss_d_fake=loss_d_fake))
loss_d.backward()
log_vars.update(log_vars_d)

optimizer['discriminator'].step()

self.step_counter += 1

log_vars.pop('loss') # remove the unnecessary 'loss'
outputs = dict(
log_vars=log_vars,
num_samples=len(gt.data),
results=dict(lq=lq.cpu(), gt=gt.cpu(), output=fake_g_output.cpu()))

return outputs

一开始,图像输出都在词典data_batch里。函数先把低清图lq和高清的真值gt从词典里取出。

1
2
3
# data
lq = data_batch['lq']
gt = data_batch['gt']

之后,函数计算了$G(I^{lq})$,为后续loss的计算做准备。

1
2
# generator
fake_g_output = self.generator(lq)

接下来,是优化生成器self.generator的逻辑。这里面有一些函数调用,我们可以不管它们的实现,大概理解整段代码的意思就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
losses = dict()
log_vars = dict()

# no updates to discriminator parameters.
set_requires_grad(self.discriminator, False)

if (self.step_counter % self.disc_steps == 0
and self.step_counter >= self.disc_init_steps):
if self.pixel_loss:
losses['loss_pix'] = self.pixel_loss(fake_g_output, gt)
if self.perceptual_loss:
loss_percep, loss_style = self.perceptual_loss(
fake_g_output, gt)
if loss_percep is not None:
losses['loss_perceptual'] = loss_percep
if loss_style is not None:
losses['loss_style'] = loss_style
# gan loss for generator
fake_g_pred = self.discriminator(fake_g_output)
losses['loss_gan'] = self.gan_loss(
fake_g_pred, target_is_real=True, is_disc=False)

# parse loss
loss_g, log_vars_g = self.parse_losses(losses)
log_vars.update(log_vars_g)

# optimize
optimizer['generator'].zero_grad()
loss_g.backward()
optimizer['generator'].step()

为了只训练生成器,要用下面的代码关闭判别器的训练。

1
2
# no updates to discriminator parameters.
set_requires_grad(self.discriminator, False)

正文说过,训练GAN时一般要先训好判别器,且训练判别器多于训练生成器。因此,下面的if语句可以让判别器训练了self.disc_init_steps步后,每训练self.disc_steps步判别器再训练一步生成器。

1
2
if (self.step_counter % self.disc_steps == 0
and self.step_counter >= self.disc_init_steps):

if语句块里分别计算了逐像素误差(比如均方误差和L1误差)、感知误差、GAN误差。虽然SRGAN完全抛弃了逐像素误差,但实际训练时我们还是可以按一定比例加上这个误差。这些误差最后会用于训练生成器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if self.pixel_loss:
losses['loss_pix'] = self.pixel_loss(fake_g_output, gt)
if self.perceptual_loss:
loss_percep, loss_style = self.perceptual_loss(
fake_g_output, gt)
if loss_percep is not None:
losses['loss_perceptual'] = loss_percep
if loss_style is not None:
losses['loss_style'] = loss_style
# gan loss for generator
fake_g_pred = self.discriminator(fake_g_output)
losses['loss_gan'] = self.gan_loss(
fake_g_pred, target_is_real=True, is_disc=False)

# parse loss
loss_g, log_vars_g = self.parse_losses(losses)
log_vars.update(log_vars_g)

# optimize
optimizer['generator'].zero_grad()
loss_g.backward()
optimizer['generator'].step()

训练完生成器后,要训练判别器。和生成器的误差计算方法类似,判别器的训练代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 # discriminator
set_requires_grad(self.discriminator, True)
# real
real_d_pred = self.discriminator(gt)
loss_d_real = self.gan_loss(
real_d_pred, target_is_real=True, is_disc=True)
loss_d, log_vars_d = self.parse_losses(dict(loss_d_real=loss_d_real))
optimizer['discriminator'].zero_grad()
loss_d.backward()
log_vars.update(log_vars_d)
# fake
fake_d_pred = self.discriminator(fake_g_output.detach())
loss_d_fake = self.gan_loss(
fake_d_pred, target_is_real=False, is_disc=True)
loss_d, log_vars_d = self.parse_losses(dict(loss_d_fake=loss_d_fake))
loss_d.backward()
log_vars.update(log_vars_d)

optimizer['discriminator'].step()

这段代码有两个重点:

  1. 在训练判别器时,要用set_requires_grad(self.discriminator, True)开启判别器的梯度计算。
  2. fake_d_pred = self.discriminator(fake_g_output.detach())这一行的detach()很关键。detach()可以中断某张量的梯度跟踪。fake_g_output是由生成器算出来的,如果不把这个张量的梯度跟踪切断掉,在优化判别器时生成器的参数也会跟着优化。

函数的最后部分是一些和MMEditing其他代码逻辑的交互,和SRGAN本身没什么关联。

1
2
3
4
5
6
7
8
9
self.step_counter += 1

log_vars.pop('loss') # remove the unnecessary 'loss'
outputs = dict(
log_vars=log_vars,
num_samples=len(gt.data),
results=dict(lq=lq.cpu(), gt=gt.cpu(), output=fake_g_output.cpu()))

return outputs

只要理解了本文的误差计算公式,再看懂了这段代码是如何训练判别器和生成器的,就算是完全理解了SRGAN的核心思想了。

参考资料

[1] (SRGAN): Photo-Realistic Single Image Super-Resolution Using a Generative Adversarial Network

[2] (GAN): Generative Adversarial Nets

[3] (Perceptual Loss):Perceptual Losses for Real-Time Style Transfer and Super-Resolution

[4] (ESRGAN): ESRGAN: Enhanced Super-Resolution Generative Adversarial Networks

我之前的一篇文章介绍了如何给PyTorch添加CPU上的简单的加法算子。在这篇文章里,我将继续展示一个更具体的PyTorch自定义算子示例——自己动手复现二维卷积算子。这个示例是基于PyTorch Extension的,在迁移项目时,不需要自己生成动态库,只需要用setup.py重新编译一遍即可。我会同时介绍CPU版和CUDA版的实现。

许多前沿的神经网络都会对卷积进行一些修改。比如大名鼎鼎的可变形卷积(deformable convolution)。相信看完这篇文章后,大家能看懂PyTorch卷积的实现代码,并大概了解如何修改卷积的实现细节,并把新写好的卷积运用到自己的PyTorch项目中。

PyTorch Extension 实现二维卷积

搭建项目

在开始写代码前,要准备一个崭新的目录,在这个文件夹里搭建项目。

在根目录下,先创建一个setup.py,之后要填写这份安装文件。

之后,创建一个文件夹,其名字是项目名。在这个文件夹里合适的地方新建一个子文件夹,专门用来放和算子相关的文件。我的项目名叫做panoflow,算子相关文件放在了panoflow/core/op子文件夹下。

接下来,和算子实现相关的文件都应该放在算子文件夹里。使用和测试算子的文件可以放在项目文件夹的其他地方。

由于在实现中我借用了MMCV的代码,还要提前准备好一些头文件。首先新建一个文件pytorch_cpp_helper.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#ifndef PYTORCH_CPP_HELPER
#define PYTORCH_CPP_HELPER
#include <torch/extension.h>

#include <vector>

using namespace at;

#define CHECK_CUDA(x) \
TORCH_CHECK(x.device().is_cuda(), #x " must be a CUDA tensor")
#define CHECK_CPU(x) \
TORCH_CHECK(!x.device().is_cuda(), #x " must be a CPU tensor")
#define CHECK_CONTIGUOUS(x) \
TORCH_CHECK(x.is_contiguous(), #x " must be contiguous")
#define CHECK_CUDA_INPUT(x) \
CHECK_CUDA(x); \
CHECK_CONTIGUOUS(x)
#define CHECK_CPU_INPUT(x) \
CHECK_CPU(x); \
CHECK_CONTIGUOUS(x)

#endif // PYTORCH_CPP_HELPER

再创建一个文件pytorch_cuda_helper.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#ifndef PYTORCH_CUDA_HELPER
#define PYTORCH_CUDA_HELPER

#include <ATen/ATen.h>
#include <ATen/cuda/CUDAContext.h>
#include <c10/cuda/CUDAGuard.h>

#include <ATen/cuda/CUDAApplyUtils.cuh>
#include <THC/THCAtomics.cuh>

#include "common_cuda_helper.hpp"

using at::Half;
using at::Tensor;
using phalf = at::Half;

#define __PHALF(x) (x)

#endif // PYTORCH_CUDA_HELPER

还有一个common_cuda_helper.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#ifndef COMMON_CUDA_HELPER
#define COMMON_CUDA_HELPER

#include <cuda.h>

#define CUDA_1D_KERNEL_LOOP(i, n) \
for (int i = blockIdx.x * blockDim.x + threadIdx.x; i < (n); \
i += blockDim.x * gridDim.x)

#define CUDA_2D_KERNEL_LOOP(i, n, j, m) \
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; i < (n); \
i += blockDim.x * gridDim.x) \
for (size_t j = blockIdx.y * blockDim.y + threadIdx.y; j < (m); \
j += blockDim.y * gridDim.y)

#define CUDA_2D_KERNEL_BLOCK_LOOP(i, n, j, m) \
for (size_t i = blockIdx.x; i < (n); i += gridDim.x) \
for (size_t j = blockIdx.y; j < (m); j += gridDim.y)

#define THREADS_PER_BLOCK 512

inline int GET_BLOCKS(const int N, const int num_threads = THREADS_PER_BLOCK) {
int optimal_block_num = (N + num_threads - 1) / num_threads;
int max_block_num = 4096;
return min(optimal_block_num, max_block_num);
}

template <typename T>
__device__ T bilinear_interpolate(const T* input, const int height,
const int width, T y, T x,
const int index /* index for debug only*/) {
// deal with cases that inverse elements are out of feature map boundary
if (y < -1.0 || y > height || x < -1.0 || x > width) return 0;

if (y <= 0) y = 0;
if (x <= 0) x = 0;

int y_low = (int)y;
int x_low = (int)x;
int y_high;
int x_high;

if (y_low >= height - 1) {
y_high = y_low = height - 1;
y = (T)y_low;
} else {
y_high = y_low + 1;
}

if (x_low >= width - 1) {
x_high = x_low = width - 1;
x = (T)x_low;
} else {
x_high = x_low + 1;
}

T ly = y - y_low;
T lx = x - x_low;
T hy = 1. - ly, hx = 1. - lx;
// do bilinear interpolation
T v1 = input[y_low * width + x_low];
T v2 = input[y_low * width + x_high];
T v3 = input[y_high * width + x_low];
T v4 = input[y_high * width + x_high];
T w1 = hy * hx, w2 = hy * lx, w3 = ly * hx, w4 = ly * lx;

T val = (w1 * v1 + w2 * v2 + w3 * v3 + w4 * v4);

return val;
}

template <typename T>
__device__ void bilinear_interpolate_gradient(
const int height, const int width, T y, T x, T& w1, T& w2, T& w3, T& w4,
int& x_low, int& x_high, int& y_low, int& y_high,
const int index /* index for debug only*/) {
// deal with cases that inverse elements are out of feature map boundary
if (y < -1.0 || y > height || x < -1.0 || x > width) {
// empty
w1 = w2 = w3 = w4 = 0.;
x_low = x_high = y_low = y_high = -1;
return;
}

if (y <= 0) y = 0;
if (x <= 0) x = 0;

y_low = (int)y;
x_low = (int)x;

if (y_low >= height - 1) {
y_high = y_low = height - 1;
y = (T)y_low;
} else {
y_high = y_low + 1;
}

if (x_low >= width - 1) {
x_high = x_low = width - 1;
x = (T)x_low;
} else {
x_high = x_low + 1;
}

T ly = y - y_low;
T lx = x - x_low;
T hy = 1. - ly, hx = 1. - lx;

// reference in forward
// T v1 = input[y_low * width + x_low];
// T v2 = input[y_low * width + x_high];
// T v3 = input[y_high * width + x_low];
// T v4 = input[y_high * width + x_high];
// T val = (w1 * v1 + w2 * v2 + w3 * v3 + w4 * v4);

w1 = hy * hx, w2 = hy * lx, w3 = ly * hx, w4 = ly * lx;

return;
}
#endif // COMMON_CUDA_HELPER

这些文件添加了CPU和CUDA实现时需要的头文件和定义,后面的C++源码会用到它们。

CPU

C++实现

在用C++实现一个算子时,我们要编写一个形如这样的文件:

1
2
3
4
5
6
7
8
9
10
11
#include <torch/torch.h>

torch::Tensor my_add(torch::Tensor t1, torch::Tensor t2)
{
return t1 + t2;
}

TORCH_LIBRARY(my_ops, m)
{
m.def("my_add", my_add);
}

这个C++文件主要包含两部分内容:算子的实现函数和C++接口绑定。在实现卷积时,也是要实现这两部分内容。

在修改一个现有的算子时,最好的方法不是从头写一个,而是去开源库里找一份实现,并在这个基础上进行修改。

我在MMCV的仓库里找到了可变形卷积的实现,并把它拆解回了普通的卷积。我参考了这篇教程:手把手教你如何高效地在 MMCV 中贡献算子。另外,这份笔记还参考了PyTorch官方Extension教程

找到了卷积的实现后,在算子文件夹下新建一个cpp源文件。比如我的文件路径就是panoflow/core/op/my_conv.cpp。这样一个普通卷积的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
#include "pytorch_cpp_helper.hpp"

void my_conv_im2col_cpu(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col);

void my_conv_im2col_cuda(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col);

void my_conv_shape_check(at::Tensor input,
at::Tensor weight, int kH,
int kW, int dH, int dW, int padH, int padW,
int dilationH, int dilationW, int group)
{
TORCH_CHECK(
weight.ndimension() == 4,
"4D weight tensor (nOutputPlane,nInputPlane,kH,kW) expected, but got: %s",
weight.ndimension());

TORCH_CHECK(weight.is_contiguous(), "weight tensor has to be contiguous");

TORCH_CHECK(kW > 0 && kH > 0,
"kernel size should be greater than zero, but got kH: %d kW: %d",
kH, kW);

TORCH_CHECK((weight.size(2) == kH && weight.size(3) == kW),
"kernel size should be consistent with weight, ",
"but got kH: %d kW: %d weight.size(2): %d, weight.size(3): %d",
kH, kW, weight.size(2), weight.size(3));

TORCH_CHECK(dW > 0 && dH > 0,
"stride should be greater than zero, but got dH: %d dW: %d", dH,
dW);

TORCH_CHECK(
dilationW > 0 && dilationH > 0,
"dilation should be greater than 0, but got dilationH: %d dilationW: %d",
dilationH, dilationW);

int ndim = input.ndimension();
int dimf = 0;
int dimh = 1;
int dimw = 2;

if (ndim == 4)
{
dimf++;
dimh++;
dimw++;
}

TORCH_CHECK(ndim == 3 || ndim == 4,
"3D or 4D input tensor expected but got: %s", ndim);

long nInputPlane = weight.size(1) * group;
long inputHeight = input.size(dimh);
long inputWidth = input.size(dimw);
long nOutputPlane = weight.size(0);
long outputHeight =
(inputHeight + 2 * padH - (dilationH * (kH - 1) + 1)) / dH + 1;
long outputWidth =
(inputWidth + 2 * padW - (dilationW * (kW - 1) + 1)) / dW + 1;

if (outputWidth < 1 || outputHeight < 1)
AT_ERROR(
"Given input size: (%ld x %ld x %ld). "
"Calculated output size: (%ld x %ld x %ld). Output size is too small",
nInputPlane, inputHeight, inputWidth, nOutputPlane, outputHeight,
outputWidth);

TORCH_CHECK(input.size(1) == nInputPlane,
"invalid number of input planes, expected: %d, but got: %d",
nInputPlane, input.size(1));

TORCH_CHECK((inputHeight >= kH && inputWidth >= kW),
"input image is smaller than kernel");
}

void my_conv_forward(Tensor input, Tensor weight, Tensor bias,
Tensor output, Tensor columns, int kW,
int kH, int dW, int dH, int padW, int padH,
int dilationW, int dilationH, int group,
int im2col_step)
{
bool isCuda = false;
if (input.device().is_cuda())
{
CHECK_CUDA_INPUT(input);
CHECK_CUDA_INPUT(weight);
CHECK_CUDA_INPUT(bias);
CHECK_CUDA_INPUT(output);
CHECK_CUDA_INPUT(columns);
isCuda = true;
}
else
{
CHECK_CPU_INPUT(input);
CHECK_CPU_INPUT(weight);
CHECK_CPU_INPUT(bias);
CHECK_CPU_INPUT(output);
CHECK_CPU_INPUT(columns);
}

my_conv_shape_check(input, weight, kH, kW, dH, dW, padH,
padW, dilationH, dilationW, group);
at::DeviceGuard guard(input.device());

int batch = 1;
if (input.ndimension() == 3)
{
// Force batch
batch = 0;
input.unsqueeze_(0);
}

long batchSize = input.size(0);
long nInputPlane = input.size(1);
long inputHeight = input.size(2);
long inputWidth = input.size(3);

long nOutputPlane = weight.size(0);

long outputWidth =
(inputWidth + 2 * padW - (dilationW * (kW - 1) + 1)) / dW + 1;
long outputHeight =
(inputHeight + 2 * padH - (dilationH * (kH - 1) + 1)) / dH + 1;

output = output.view({batchSize / im2col_step, im2col_step, nOutputPlane,
outputHeight, outputWidth});
columns = at::zeros(
{nInputPlane * kW * kH, im2col_step * outputHeight * outputWidth},
input.options());

input = input.view({batchSize / im2col_step, im2col_step, nInputPlane,
inputHeight, inputWidth});

Tensor output_buffer = at::zeros({batchSize / im2col_step, nOutputPlane,
im2col_step * outputHeight, outputWidth},
output.options());

output_buffer = output_buffer.view(
{output_buffer.size(0), group, output_buffer.size(1) / group,
output_buffer.size(2), output_buffer.size(3)});

for (int elt = 0; elt < batchSize / im2col_step; elt++)
{
if (isCuda)
{
my_conv_im2col_cuda(input[elt], nInputPlane, inputHeight,
inputWidth, kH, kW, padH, padW, dH, dW, dilationH,
dilationW, im2col_step, columns);
}
else
{
my_conv_im2col_cpu(input[elt], nInputPlane, inputHeight,
inputWidth, kH, kW, padH, padW, dH, dW, dilationH,
dilationW, im2col_step, columns);
}


columns = columns.view({group, columns.size(0) / group, columns.size(1)});
weight = weight.view({group, weight.size(0) / group, weight.size(1),
weight.size(2), weight.size(3)});

for (int g = 0; g < group; g++)
{
output_buffer[elt][g] = output_buffer[elt][g]
.flatten(1)
.addmm_(weight[g].flatten(1), columns[g])
.view_as(output_buffer[elt][g]);
}
columns =
columns.view({columns.size(0) * columns.size(1), columns.size(2)});
weight = weight.view({weight.size(0) * weight.size(1), weight.size(2),
weight.size(3), weight.size(4)});
}

output_buffer = output_buffer.view(
{output_buffer.size(0), output_buffer.size(1) * output_buffer.size(2),
output_buffer.size(3), output_buffer.size(4)});

output_buffer = output_buffer.view({batchSize / im2col_step, nOutputPlane,
im2col_step, outputHeight, outputWidth});
output_buffer.transpose_(1, 2);

output.copy_(output_buffer);
output = output.view({batchSize, nOutputPlane, outputHeight, outputWidth});

bias = bias.view({1, bias.size(0), 1, 1});
output.add_(bias);

input = input.view({batchSize, nInputPlane, inputHeight, inputWidth});

if (batch == 0)
{
output = output.view({nOutputPlane, outputHeight, outputWidth});
input = input.view({nInputPlane, inputHeight, inputWidth});
}
}

template <typename T>
void my_conv_im2col_cpu_kernel(
const int n, const T *data_im, const int height,
const int width, const int kernel_h, const int kernel_w, const int pad_h,
const int pad_w, const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int batch_size,
const int num_channels, const int height_col,
const int width_col, T *data_col)
{
for (int index = 0; index < n; index++)
{
// index index of output matrix
const int w_col = index % width_col;
const int h_col = (index / width_col) % height_col;
const int b_col = (index / width_col / height_col) % batch_size;
const int c_im = (index / width_col / height_col) / batch_size;
const int c_col = c_im * kernel_h * kernel_w;

const int h_in = h_col * stride_h - pad_h;
const int w_in = w_col * stride_w - pad_w;
T *data_col_ptr =
data_col +
((c_col * batch_size + b_col) * height_col + h_col) * width_col + w_col;
const T *data_im_ptr =
data_im + (b_col * num_channels + c_im) * height * width;

for (int i = 0; i < kernel_h; ++i)
{
for (int j = 0; j < kernel_w; ++j)
{
T val = static_cast<T>(0);
const int h_im = h_in + i * dilation_h;
const int w_im = w_in + j * dilation_w;
if (h_im > -1 && w_im > -1 && h_im < height && w_im < width)
{
val = data_im_ptr[h_im * width + w_im];
}
*data_col_ptr = val;
data_col_ptr += batch_size * height_col * width_col;
}
}
}
}

void my_conv_im2col_cpu(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col)
{
int height_col =
(height + 2 * pad_h - (dilation_h * (ksize_h - 1) + 1)) / stride_h + 1;
int width_col =
(width + 2 * pad_w - (dilation_w * (ksize_w - 1) + 1)) / stride_w + 1;
int num_kernels = channels * height_col * width_col * parallel_imgs;

AT_DISPATCH_FLOATING_TYPES_AND_HALF(
data_im.scalar_type(), "", [&]
{ my_conv_im2col_cpu_kernel<scalar_t>(
num_kernels, data_im.data_ptr<scalar_t>(),
height, width, ksize_h, ksize_w,
pad_h, pad_w, stride_h, stride_w, dilation_h, dilation_w,
parallel_imgs, channels,
height_col, width_col, data_col.data_ptr<scalar_t>()); });
}

void my_conv_im2col_cuda(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col)
{

}

PYBIND11_MODULE(my_ops, m)
{
m.def("my_conv_forward", my_conv_forward, "my_conv_forward",
py::arg("input"), py::arg("weight"), py::arg("bias"),
py::arg("output"), py::arg("columns"), py::arg("kW"),
py::arg("kH"), py::arg("dW"), py::arg("dH"), py::arg("padW"),
py::arg("padH"), py::arg("dilationW"), py::arg("dilationH"),
py::arg("group"), py::arg("im2col_step"));
}

这份实现非常长,我挑一些重点的内容讲解。

从最下面的PYBIND11_MODULE(my_ops, m)看起。这里的my_ops是生成的库名,可以随便取名。待会要import这个库名。代码块里m.def用于定义C++函数的Python接口。"my_conv_forward"是Python调用时的函数名称,my_conv_forward是被Python代码调用的这份代码里的C++函数名称。也就是说,这份卷积实现的入口函数就是my_conv_forward。我们从这个函数看起。

1
2
3
4
5
void my_conv_forward(Tensor input, Tensor weight, Tensor bias,
Tensor output, Tensor columns, int kW,
int kH, int dW, int dH, int padW, int padH,
int dilationW, int dilationH, int group,
int im2col_step)

my_conv_forward就是卷积的主函数。它的参数除了PyTorch的Conv2d传入的参数外,还多了两个参数output, columus。这两个张量是保存中间结果的,在PyTorch侧是看不到的。output用于保存卷积输出,columns用于保存卷积时的列矩阵。底层实现卷积时,会先把图像转换成一个用列表示的矩阵,再把卷积操作当成一个矩阵乘法来完成。其中,第一步操作叫做”im2col”。对此原理不熟的话可以参考这篇文章:https://zhuanlan.zhihu.com/p/63974249。

my_conv_forward函数的大部分内容都是在做类型检查和张量形状转换。在修改卷积实现时,这些东西都可以不用改。整个卷积操作的核心都在这一部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
for (int elt = 0; elt < batchSize / im2col_step; elt++)
{
if (isCuda)
{
my_conv_im2col_cuda(input[elt], nInputPlane, inputHeight,
inputWidth, kH, kW, padH, padW, dH, dW, dilationH,
dilationW, im2col_step, columns);
}
else
{
my_conv_im2col_cpu(input[elt], nInputPlane, inputHeight,
inputWidth, kH, kW, padH, padW, dH, dW, dilationH,
dilationW, im2col_step, columns);
}


columns = columns.view({group, columns.size(0) / group, columns.size(1)});
weight = weight.view({group, weight.size(0) / group, weight.size(1),
weight.size(2), weight.size(3)});

for (int g = 0; g < group; g++)
{
output_buffer[elt][g] = output_buffer[elt][g]
.flatten(1)
.addmm_(weight[g].flatten(1), columns[g])
.view_as(output_buffer[elt][g]);
}
columns =
columns.view({columns.size(0) * columns.size(1), columns.size(2)});
weight = weight.view({weight.size(0) * weight.size(1), weight.size(2),
weight.size(3), weight.size(4)});
}

这段代码先做了im2col操作,再做了矩阵乘法。其实,包括可变形卷积在内,各种稀奇古怪的卷积操作通过靠修改im2col来完成的。CPU和CUDA版卷积的主要区别,也体现在im2col中(后面的矩阵乘法在CPU和CUDA上都能用)。

由于是讲CPU实现,这里的CUDA实现我暂时放了一个空函数。my_conv_im2col_cpu的内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void my_conv_im2col_cpu(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col)
{
int height_col =
(height + 2 * pad_h - (dilation_h * (ksize_h - 1) + 1)) / stride_h + 1;
int width_col =
(width + 2 * pad_w - (dilation_w * (ksize_w - 1) + 1)) / stride_w + 1;
int num_kernels = channels * height_col * width_col * parallel_imgs;

AT_DISPATCH_FLOATING_TYPES_AND_HALF(
data_im.scalar_type(), "", [&]
{ my_conv_im2col_cpu_kernel<scalar_t>(
num_kernels, data_im.data_ptr<scalar_t>(),
height, width, ksize_h, ksize_w,
pad_h, pad_w, stride_h, stride_w, dilation_h, dilation_w,
parallel_imgs, channels,
height_col, width_col, data_col.data_ptr<scalar_t>()); });
}

这个函数其实只是处理了一下输入,真正的实现在my_conv_im2col_cpu_kernel里。AT_DISPATCH_FLOATING_TYPES_AND_HALF可以让实现兼容半精度和普通float,所以实现my_conv_im2col_cpu_kernel得写成一个模板函数。

my_conv_im2col_cpu_kernel的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
template <typename T>
void my_conv_im2col_cpu_kernel(
const int n, const T *data_im, const int height,
const int width, const int kernel_h, const int kernel_w, const int pad_h,
const int pad_w, const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int batch_size,
const int num_channels, const int height_col,
const int width_col, T *data_col)
{
for (int index = 0; index < n; index++)
{
// index index of output matrix
const int w_col = index % width_col;
const int h_col = (index / width_col) % height_col;
const int b_col = (index / width_col / height_col) % batch_size;
const int c_im = (index / width_col / height_col) / batch_size;
const int c_col = c_im * kernel_h * kernel_w;

const int h_in = h_col * stride_h - pad_h;
const int w_in = w_col * stride_w - pad_w;
T *data_col_ptr =
data_col +
((c_col * batch_size + b_col) * height_col + h_col) * width_col + w_col;
const T *data_im_ptr =
data_im + (b_col * num_channels + c_im) * height * width;

for (int i = 0; i < kernel_h; ++i)
{
for (int j = 0; j < kernel_w; ++j)
{
T val = static_cast<T>(0);
const int h_im = h_in + i * dilation_h;
const int w_im = w_in + j * dilation_w;
if (h_im > -1 && w_im > -1 && h_im < height && w_im < width)
{
val = data_im_ptr[h_im * width + w_im];
}
*data_col_ptr = val;
data_col_ptr += batch_size * height_col * width_col;
}
}
}
}

它的作用就是把图像里的数据搬到做卷积运算的column里。循环遍历每一次卷积的每一个位置,把待运算的量填入column。卷积里的所有参数(pad, stride, …)都是在这段函数里生效的。想实现可变形卷积等改进,也要修改这个函数。

Python封装

实现好了后,如果编译完了的话,刚刚的卷积接口可以通过以下方式在Python里调用:

1
2
import my_ops
my_ops.my_conv_forward(...)

这里的my_ops这个名称必须和开始PYBIND11_MODULE(my_ops, m)里面那个库名称对应。

基于这个接口,可以仿照PyTorch中Conv2d的接口,编写一个和Conv2d等价的torch.nn.Module出来。我的这个Python文件的路径是panoflow/core/op/my_conv.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import torch
from torch.autograd import Function
import torch.nn as nn
from torch import Tensor
from torch.nn.modules.utils import _pair
from torch.nn.parameter import Parameter

import my_ops


class MyConvF(Function):

@staticmethod
def forward(ctx,
input: torch.Tensor,
weight,
bias,
stride=1,
padding=0,
dilation=1,
groups=1,
im2col_step=32):
if input is not None and input.dim() != 4:
raise ValueError(
f'Expected 4D tensor as input, got {input.dim()}D tensor \
instead.')
ctx.stride = _pair(stride)
ctx.padding = _pair(padding)
ctx.dilation = _pair(dilation)
ctx.groups = groups
ctx.im2col_step = im2col_step

weight = weight.type_as(input)
ctx.save_for_backward(input, weight)

output = input.new_empty(MyConvF._output_size(ctx, input, weight))

ctx.bufs_ = [input.new_empty(0), input.new_empty(0)] # columns, ones

cur_im2col_step = min(ctx.im2col_step, input.size(0))
assert (input.size(0) % cur_im2col_step
) == 0, 'batch size must be divisible by im2col_step'

my_ops.my_conv_forward(
input,
weight,
bias,
output,
ctx.bufs_[0],
kW=weight.size(3),
kH=weight.size(2),
dW=ctx.stride[1],
dH=ctx.stride[0],
padW=ctx.padding[1],
padH=ctx.padding[0],
dilationW=ctx.dilation[1],
dilationH=ctx.dilation[0],
group=ctx.groups,
im2col_step=cur_im2col_step)
return output

@staticmethod
def _output_size(ctx, input, weight):
channels = weight.size(0)
output_size = (input.size(0), channels)
for d in range(input.dim() - 2):
in_size = input.size(d + 2)
pad = ctx.padding[d]
kernel = ctx.dilation[d] * (weight.size(d + 2) - 1) + 1
stride_ = ctx.stride[d]
output_size += ((in_size + (2 * pad) - kernel) // stride_ + 1, )
if not all(map(lambda s: s > 0, output_size)):
raise ValueError(
'convolution input is too small (output would be ' +
'x'.join(map(str, output_size)) + ')')
return output_size


my_conv = MyConvF.apply


class MyConv2d(nn.Module):

def __init__(self,
in_channels: int,
out_channels: int,
kernel_size,
stride=1,
padding=0,
dilation=1,
groups: int = 1,
bias: bool = True):
super().__init__()
kernel_size_ = _pair(kernel_size)
stride_ = _pair(stride)
padding_ = _pair(padding)
dilation_ = _pair(dilation)
self.in_channels = in_channels
self.out_channels = out_channels
self.kernel_size = kernel_size_
self.stride = stride_
self.padding = padding_
self.dilation = dilation_
self.groups = groups
self.weight = Parameter(
torch.Tensor(out_channels, in_channels // groups, *kernel_size_))
self.bias = Parameter(torch.Tensor(out_channels))

# Useless attributes
self.transposed = None
self.output_padding = None
self.padding_mode = None

def forward(self, input: Tensor) -> Tensor:
return my_conv(input, self.weight, self.bias, self.stride,
self.padding, self.dilation, self.groups)

以后,用自己的卷积MyConv2d就和用普通的Conv2d一样了。

编译

打开外面的setup.py,填写以下内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from setuptools import setup
from torch.utils import cpp_extension
import os

src_root = 'panoflow/core/op'
cpp_src = ['my_conv.cpp']

if __name__ == '__main__':
include_dirs = ['panoflow/core/op']
cpp_path = [os.path.join(src_root, src) for src in cpp_src]

setup(
name='panoflow',
ext_modules=[
cpp_extension.CppExtension(
'my_ops', cpp_path, include_dirs=include_dirs)
],
cmdclass={'build_ext': cpp_extension.BuildExtension})

其中的路径要根据自己的实际情况修改。

和编译相关的内容都写在cpp_extension.CppExtension里。其中,源文件要写在第二个参数里,头文件目录要写在include_dirs。由于我的源文件放在panoflow/core/op里,我写了个源文件名数组cpp_src,在传参前把路径组合了一下。由于include_dirs和源文件在同一个目录下,我也填的是panoflow/core/op

写完了setup.py后,运行python setup.py develop,就能一键编译和安装。如果运行后没有报编译错误,就可以把实现的卷积用起来了。

单元测试

用单元测试可以快速地验证卷积是否实现成功。我写了一个简单的单元测试文件,在任意一个文件夹下创建该文件即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import torch
import torch.nn as nn
from panoflow.core.op.my_conv import MyConv2d

inc = 3
outc = 4
img_shaspe = (50, 50)

# device_name = 'cuda:0'
device_name = 'cpu'
open_bias = True


def test_one():
ts = torch.ones([1, 1, 3, 3]).to(device_name)
layer = nn.Conv2d(1, 1, 3, 1, 1, bias=open_bias).to(device_name)
gt = layer(ts)
my_layer = MyConv2d(1, 1, 3, 1, 1).to(device_name)
my_layer.load_state_dict(layer.state_dict(), strict=False)
res = my_layer(ts)
res = res.to('cpu')
gt = gt.to('cpu')
assert torch.allclose(res, gt, 1e-3, 1e-5)


def test_two():
ts = torch.rand([1, inc, *img_shaspe]).to(device_name)
layer = nn.Conv2d(inc, outc, 3, 1, 1, bias=open_bias).to(device_name)
gt = layer(ts)
my_layer = MyConv2d(inc, outc, 3, 1, 1).to(device_name)
my_layer.load_state_dict(layer.state_dict(), strict=False)
res = my_layer(ts)
res = res.to('cpu')
gt = gt.to('cpu')
assert torch.allclose(res, gt, 1e-3, 1e-5)


if __name__ == '__main__':
test_one()
test_two()

其中,panoflow.core.op.my_conv是我刚刚放MyConv2d的Python模块。

直接运行这个Python文件,如果没有任何输出(报错信息),就说明卷积实现成功了。

CUDA

C++实现

在刚刚的实现中,有一个my_conv_im2col_cuda的实现是空着的。在CUDA版本中,我们要实现这个函数。不过,这个函数要放在一个用nvcc编译的.cu文件里。注意!注意!注意! 因此,my_conv.cpp里那个空的my_conv_im2col_cuda实现应该全部删掉。

新建一个文件my_conv_cuda.cu

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Modify from https://github.com/open-mmlab/mmcv/blob/my_conv/mmcv/ops/csrc/common/cuda/deform_conv_cuda_kernel.cuh
// Copyright (c) OpenMMLab. All rights reserved.
#include <torch/types.h>

#include "pytorch_cuda_helper.hpp"

template <typename T>
__global__ void my_conv_im2col_gpu_kernel(
const int n, const T *data_im, const int height,
const int width, const int kernel_h, const int kernel_w, const int pad_h,
const int pad_w, const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int batch_size,
const int num_channels, const int height_col,
const int width_col, T *data_col)
{
CUDA_1D_KERNEL_LOOP(index, n)
{
// index index of output matrix
const int w_col = index % width_col;
const int h_col = (index / width_col) % height_col;
const int b_col = (index / width_col / height_col) % batch_size;
const int c_im = (index / width_col / height_col) / batch_size;
const int c_col = c_im * kernel_h * kernel_w;

const int h_in = h_col * stride_h - pad_h;
const int w_in = w_col * stride_w - pad_w;
T *data_col_ptr =
data_col +
((c_col * batch_size + b_col) * height_col + h_col) * width_col + w_col;
const T *data_im_ptr =
data_im + (b_col * num_channels + c_im) * height * width;

for (int i = 0; i < kernel_h; ++i)
{
for (int j = 0; j < kernel_w; ++j)
{
T val = static_cast<T>(0);
const int h_im = h_in + i * dilation_h;
const int w_im = w_in + j * dilation_w;
if (h_im > -1 && w_im > -1 && h_im < height && w_im < width)
{
val = data_im_ptr[h_im * width + w_im];
}
*data_col_ptr = val;
data_col_ptr += batch_size * height_col * width_col;
}
}
}
}

void my_conv_im2col_cuda(Tensor data_im,
const int channels, const int height,
const int width, const int ksize_h,
const int ksize_w, const int pad_h, const int pad_w,
const int stride_h, const int stride_w,
const int dilation_h, const int dilation_w,
const int parallel_imgs, Tensor data_col)
{
int height_col =
(height + 2 * pad_h - (dilation_h * (ksize_h - 1) + 1)) / stride_h + 1;
int width_col =
(width + 2 * pad_w - (dilation_w * (ksize_w - 1) + 1)) / stride_w + 1;
int num_kernels = channels * height_col * width_col * parallel_imgs;

AT_DISPATCH_FLOATING_TYPES_AND_HALF(
data_im.scalar_type(), "my_conv_im2col_gpu", [&]
{ my_conv_im2col_gpu_kernel<scalar_t><<<GET_BLOCKS(num_kernels),
THREADS_PER_BLOCK, 0,
at::cuda::getCurrentCUDAStream()>>>(
num_kernels, data_im.data_ptr<scalar_t>(),
height, width, ksize_h, ksize_w,
pad_h, pad_w, stride_h, stride_w, dilation_h, dilation_w,
parallel_imgs, channels,
height_col, width_col, data_col.data_ptr<scalar_t>()); });

AT_CUDA_CHECK(cudaGetLastError());
}

和CPU版的类似,my_conv_im2col_cuda也是预处理了输入,并调用核函数my_conv_im2col_gpu_kernel来实现im2col

CUDA实现和CPU几乎一样,唯一的区别就是for循环变成了CUDA_1D_KERNEL_LOOP(index, n)。这个宏是头文件里帮我们定义的,它简化了CUDA的一维循环。

编译

修改setup.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from setuptools import setup
from torch.utils import cpp_extension
import os

src_root = 'panoflow/core/op'
cpp_src = ['my_conv.cpp', 'my_conv_cuda.cu']

if __name__ == '__main__':
include_dirs = ['panoflow/core/op']
cpp_path = [os.path.join(src_root, src) for src in cpp_src]

setup(
name='panoflow',
ext_modules=[
cpp_extension.CUDAExtension(
'my_ops', cpp_path, include_dirs=include_dirs)
],
cmdclass={'build_ext': cpp_extension.BuildExtension})

首先,要把源文件加入cpp_src里。之后,把CppExtension改成CUDAExtension。这样,就能编译新写的CUDA文件了。

写完了之后,再次python setup.py develop编译即可。

编译小技巧:不拿IDE直接写C++和CUDA源代码是很容易出错误的。但如果你想只用setup.py来验证代码的正确性,可以python setup.py develop > tmp.txt把编译输出重定向到一个文件里来查看。由于编译时的信息过多,在命令行里很难从一堆编译warning里找到最重要的error。

测试

由于Python部分在之前都已经写好了,可以直接用刚刚的单元测试文件测试了。只要把刚刚那份文件的device_name改成cuda:0即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import torch
import torch.nn as nn
from panoflow.core.op.my_conv import MyConv2d

inc = 3
outc = 4
img_shaspe = (50, 50)

device_name = 'cuda:0'
# device_name = 'cpu'
open_bias = True


def test_one():
ts = torch.ones([1, 1, 3, 3]).to(device_name)
layer = nn.Conv2d(1, 1, 3, 1, 1, bias=open_bias).to(device_name)
gt = layer(ts)
my_layer = MyConv2d(1, 1, 3, 1, 1).to(device_name)
my_layer.load_state_dict(layer.state_dict(), strict=False)
res = my_layer(ts)
res = res.to('cpu')
gt = gt.to('cpu')
assert torch.allclose(res, gt, 1e-3, 1e-5)


def test_two():
ts = torch.rand([1, inc, *img_shaspe]).to(device_name)
layer = nn.Conv2d(inc, outc, 3, 1, 1, bias=open_bias).to(device_name)
gt = layer(ts)
my_layer = MyConv2d(inc, outc, 3, 1, 1).to(device_name)
my_layer.load_state_dict(layer.state_dict(), strict=False)
res = my_layer(ts)
res = res.to('cpu')
gt = gt.to('cpu')
assert torch.allclose(res, gt, 1e-3, 1e-5)


if __name__ == '__main__':
test_one()
test_two()

同样,没报错就说明写对了。

ResNet是CV中的经典网络。在这篇文章中,我将按照阅读论文的通用方法由粗至精地解读这篇文章。如果你对ResNet不熟,最好对着原论文阅读本文。如果你已经很熟悉ResNet了,也可以通过这篇文章查缺补漏。

粗读

摘要

摘要的前三句话开门见山地介绍本文要解决的问题以及解决的方法。

  • 问题:较深的神经网络很难训练。
  • 本文的工作:提出了一种能够轻松训练比以往的网络要深得多的残差学习框架。
  • 本文方法的进一步解释:神经网络的层将拟合一个基于输入的残差函数,而不是一个没有参考的函数。

随后,摘要展示了这种方法取得的成就:

  • 经实验而非理论证明,增加深度后,模型的训练效果和测试效果都得到了提升。
  • 精度打败了当时所有的分类模型。
  • 深度高达152,大幅超过了当时较火的19层的VGG,同时并没有增加多少计算量。
  • 精度打败了当时所有的检测、分割等任务的模型。这证明这种方法的泛化性强。

这篇摘要没有罗列贡献,而是在提出问题后轻轻一点,介绍了本文的方法及其作用。随后,所有的篇幅都在秀这种方法的成效。看完这段话,我们可能还不知道“残差”是怎么算的,但我们知道了这种方法很厉害,测试精度、训练难易度、泛化性都十分优秀,成功解决了较深的模型难以训练的问题。

在这轮粗读中,我们可以带着以下问题去概览论文:

  • 文章要解决的具体是一个怎样的问题?为什么较深的神经网络很难训练?
  • 文章提出的“残差”是什么?它为什么能解决深模型难训练的问题?
  • 凭什么说深模型难训练的问题被解决了?实验是怎么做的?
  • 这篇文章提出的模型比其他模型好在哪?

引言

引言用十分清晰的逻辑阐明了本文的核心思想——深度残差学习框架。引言先介绍“训练更深的神经网络”这一问题的由来,再抽丝剥茧地讨论该问题的解决方法,最后自然过渡到了本文的核心思想上。看完引言,我们就应该看懂这篇文章。

深度卷积神经网络的有效性,得益于它的“深度”。那么,是不是层数更深的网络就更好呢?过去的实验显示:不是的。

更深的网络面临的第一个问题是梯度弥散/爆炸,这些问题会令网络难以收敛。但是,通过参数归一初始化和归一化层,这一问题以及得到了有效缓解。证据就是,较深的网络能够收敛。

这时,较深网络又暴露出了第二个问题——退化:增加网络的深度,反而会降低网络的精度。这种退化和过拟合还不同,因为退化不仅导致精度降低,还提升了模型的训练误差。

这种退化,是不是说明较深的网络本质上就比不过较浅的网络呢?其实并不是,退化只是因为较深的网络不是那么容易优化。考虑一个较浅的网络,我们往它的后面增加几层全等映射(y=x)。这个变深的网络与原来的网络完全等价。从这个角度看,更深的网络最少不会比更浅的网络更差才对。因此,我们要想办法让学习算法能够更好地优化更深的网络,最起码优化出一个不比较浅网络更差的网络。也就是说,我们要想办法让学习算法能够轻松学会恒等映射。

文章提出了一种深度残差学习框架。假设一层网络表示的映射是$H(x)$,则该层的非线性层要拟合另一个表示残差的映射$F(x) := H(x) - x$。换个角度看,就是一层网络的输出由原来的$F(x)$变成了$F(x)+x$,多加上了一个$x$。这样,只要令$F(x)=0$,网络就能轻松描述恒等映射,较深的网络最起码不比较浅的网络更差了。

多个数据集上的实验表明,这种方法不仅容易训练,还能得到更高的精度。在多项任务的CV竞赛中,这种方法都独占鳌头。

看完了引言,我们基本能知道文章在解决怎样的问题,也大致明白了残差学习的原理。接下来,我们来过一过这篇文章的实验,看看这种方法的效果究竟如何。

实验

我们主要看几组ImageNet上的实验。为了方便称呼,作者把不使用残差连接的普通网络叫做“平坦(plain)”网络。第一组实验对比了不同深度的平坦网络的训练误差(细线)和验证误差(粗线)。

可以看出,更深的网络有更高的训练误差和验证误差。作者排除了梯度问题的影响。这些网络在训练时使用了Batch Normalization (BN),经调试,它们的梯度值也一切正常。因此,这种问题不是梯度消失导致的。引言中对于退化问题的描述得到了验证。

第二组实验是对残差网络ResNet的实验。我们可以把它的结果和平坦网络的放在一起对比。

从图中可以看出,使用残差学习后,更深的网络果然有了更低的训练误差和验证误差。同时,从表中可以看出,残差网络的测试误差也降低了一大截。这说明残差学习很好地解决了前文提到的退化问题,且这种有效性在测试数据上依然能保持。

由于这轮粗读我们没读方法部分,和方法有关的实验结果得跳过。可以直接翻页到和其他模型的对比表格处:

ResNet 在各个评价指标上打败了当时的其他网络,确实很厉害。

后面的表格显示,不仅是图像分类,ResNet在检测和分割等任务中也取得了第一名。

一般看到这里,一轮粗读就差不多完成了。从这轮粗读中,我们看懂了残差学习的核心思想,基本上理解了本文的核心创新点。当然,粗读深度学习相关的论文时,还可以扫一眼网络的核心模块和模型结构。精读的时候,我们再仔细理解文章的方法。

精读

残差公式

设多个层构成的模块在拟合一个映射$H(x)$,其中第一层的输入是$x$,则我们希望网络的非线性层去拟合另一个残差函数$F(x) := H(x) - x$,或者说整个模块在拟合$F(x) + x$。由于神经网络的多个非线性层能拟合任意复杂的映射,拟合一个残差函数$H(x)-x$也是可以实现的。

残差块

有了整体的思路,我们来具体看看每一个带残差的模块是怎么搭建的。具体而言,一个模块可以由输入$x$和参数集合${W_i}$计算得到(为了简化表达,bias被省略了):

我们主要关心$F(x, {W_i})$是怎么搭建的。文章中给出了一种简单的双层残差块示例,其中,$x$是以短路的形式连接到输出上的:

注意这里的$y = W_2\sigma(W_1x)$是一个未经激活的结果。整个模块送入下一层的输出是$\sigma(y)$,还要加上激活函数。

残差学习是一个框架,每个残差块可以有更多层。比如本文在实验部分还测试了另一种三层残差块(下图是一个示例,实际上通道数可以任意改变)。

多层的残差块都是可行的。但是,单层的残差块$y = W_1x + x$和线性层几乎一样,不能提升网络的有效性。

上述的输入$x$是直接加到激活前的输出上的。这个加法操作有一个前提:模块的输入输出通道数是一样的。在输入输出通道数不同时,可以在短路连接上加一个变换维度的线性运算$W_sx$。原来直接做加法的操作叫做全等映射,这里加上一个线性运算再做加法的操作叫做投影映射。

这里的符号标记都是基于全连接层的。这些计算对卷积层来说也一样,把矩阵乘法替换成卷积即可。

网络架构

本文先借鉴VGG的思路,先搭建了一个图像大小逐渐减半,深度逐渐翻倍的平坦网络。之后,在平坦网络连续的3x3卷积层上添加残差连接。下图中的实线代表普通的短路连接,虚线表示需要变换张量形状的短路连接。

在虚线表示的残差连接中,图像的大小和深度都发生了改变。对于深度的增加,即可以使用上一节提到的投影映射,也可以直接往多出来的通道数里填0(全等映射)。对于大小的减半,无论是用怎样的深度变化方法,都可以使用步幅为2的操作来实现大小减半。

在投影时,要进行卷积操作,卷积的步幅为2很好理解。但是,文章没有详细介绍步幅为2的全等映射是怎么做的。直观上理解,就是以步幅2跳着去输入张量里取值。

文章还提出了层数不同的ResNet架构。对于较深的网络,文章使用了3层残差块。

训练细节

训练的大部分配置都借鉴自AlexNet。如果你是刚入门图像识别且以后要从事这方面的研究,可以多关注这些细节。

数据处理

  • 随机缩放图像短边至 [256, 480] 中的均匀随机值
  • 随机翻转
  • 裁剪成 224x224
  • 像素归一化(同AlexNet)
  • 标准颜色增强(同AlexNet)

归一化

  • 激活前使用BN
  • 参数初始化使用 He Initialization

优化配置

  • batch size 256 的 mini-batch 梯度下降
  • 学习率初始化为0.1,发现误差不动了就除以10
  • 迭代$60 \times 10^4$轮
  • weight decay=0.0001, momentum=0.9
  • 没有dropout

此外,为了提高比赛中的精度,在测试时使用了10-crop(对图像裁剪10次,把输入分别输入网络,取结果的平均值)。同时,图像以不同的尺寸输入进了网络,结果取所有运算的平均值。

实验

读懂了方法,我们再来详细读一遍实验部分。

再看一次平坦网络和残差网络的对比。

在这份对比实验中,残差网络相对平坦网络没有添加任何参数。当通道数改变时,残差网络用的是0填充的全等映射。这样,平坦网络和残差网络的唯一区别就是是否有短路连接,整个对比实验非常公平。实验的结果证明了残差连接的有效性。

这篇工作还做了另一个比较平坦网络的实验:在CIFAR-10数据集上统计了平坦网络和残差网络的3x3卷积输出的标准差。标准差大,能说明输出的数值较大。

从这份对比结果中,我们能看出残差网络的输出标准差小于平坦网络。这符合残差学习的设计初衷:残差块至少是一个不会使性能变差的全等映射,其效果能够在全等映射的基础上优化。也因此,残差网络的输出大小会比平坦网络更靠近0。

看完了和平坦网络的对比,再看一看不同配置的ResNet直接的对比。文章先比较了全等映射和投影映射的短路连接。本文探讨了短路连接的3种配置:A) 全部使用全等连接 B) 有通道数变动的地方才使用投影连接 C) 全部使用投影连接。它们的表现如下:

可以看出,投影用得越多,效果越好。作者认为,B相对A更好,是因为通道数变化时0填充的部分没有残差学习(也就是没有做$+x$的操作);C相对B更好,是因为参数更多了。这些实验结果说明,投影连接并不能解决退化问题,出于性能的考虑,本文没有在其他实验中使用C。

后来,B是公认的ResNet标配。

文章还讨论了构筑更深网络的bottleneck结构。如前文所述,50层以上的ResNet在每个残差块里使用了3个1x1, 3x3, 1x3的卷积。这种设计主要是为了缩短训练时间。ResNet-50和ResNet-34有着同样的时间复杂度,却因为深度更大,精度更高。

这篇论文的主要实验就是这些。论文后面还展示了一个比较有趣的实验:在CIFAR-10数据集上训练超过1000层的ResNet。实验结果显示,1000多层的ResNet仍能成功被优化,训练误差也降到了很低,但是测试误差没有110层的网络好。作者认为,这是因为训练数据太少,网络过拟合了。

总结

ResNet是基于深度学习的计算机视觉中里程碑式的工作。通过阅读这篇论文,我们发现,ResNet使用的残差结构本身并不十分复杂。这篇工作真正出彩之处,是发现了深度神经网络的一个普遍存在的问题,并用精彩的推理设计出了一套能够解决此问题的方案。这种方案确实很有效,基于残差学习的ResNet击败了同时代的所有网络。残差连接被用在了后续几乎所有网络架构中,使用ResNet为backbone也成了其他CV任务较为常用的初始配置。

在这堂课里,我们要学习两个具体的应用:人脸识别、风格迁移。

相信大家已经在很多地方见识过人脸识别应用了:在火车站,要通过身份证和人脸核实身份;办理业务时,可以用手机完成人脸识别以核实身份;进办公楼时,员工只要刷脸通过就可以打开门禁。通过这节课的学习,我们能够学会如何用CNN完成人脸识别。

神经网络风格迁移是一项有趣的应用。它利用了CNN捕捉图像抽象信息的特点,能够把一幅图像的风格转移到另一幅图像上,从而生成一幅新的图像。这项技术不需要从头训练网络,学完这门课后,我们能快速地用代码实现神经网络风格迁移。

课堂笔记

人脸识别

准确来说,在人脸识别(Face Recognition)任务中,会给定一个有$K$个人的数据库。之后,每一次识别都会输入一张图片,输出这张图片是$K$个人中的哪一个,或者没有检测到相关人士。

有一个与这个相关的任务叫做人脸验证(Face Verification)。这个任务稍微简单一些,输入是一张图片和一个标记身份的数据(比如身份证号),要求输出图片中的人是否符合该身份。

单样本学习

按我们之前学的方法,假如我们在$K$个人的数据库上做识别(分类)任务,应该套用一个CNN模型,并在模型最后接一个$K+1$类的softmax,表示输入图片是K个人中的哪一个,或者都不是。

但是,这样的架构不适合人脸识别任务。以公司的门禁识别为例,这种方法有如下的缺点:

  1. 每来一个新同事,模型就要重新训练一次。
  2. 每个人都得上传大量的个人照片供网络训练。

理想情况下,我们希望模型能从一张人脸照片中学会分辨这张人脸,这样每个新同事只需要上传一张照片即可。这叫做单样本学习(One-shot Learning)。

为了完成单样本学习,我们可以从另一个角度来建模人脸识别问题:如果输入的人脸和数据库里某张人脸极为相似,我们就说识别出了这张人脸;否则,就说没有识别到有效的人脸。

这样,人脸识别问题就被转换为了一个求两张图片相似度的问题。我们可以让网络学习一个输入是两张图片,输出是二者相似度的一个映射。

孪生网络

在完成和相似度有关的问题时,一种常见的做法是使用孪生网络(Siamese Network)。

假设网络的倒数第二层有128个神经元。在普通分类网络中,这128个神经元输出的长度为128的向量会被输入进最后的softmax层。而在孪生网络中,我们要去掉softmax层,并用这个没有sofrmax的网络$f$分别输出两张图片$x^{(1)}, x^{(2)}$对应的128维向量$f(x^{(1)}), f(x^{(2)})$。这样,每张图片有了唯一对应的一个128维向量,这个向量可以看成是该图片的编码(encoding)。而我们又知道,对向量求相似度是很方便的。我们可以利用两张图片的编码求出相似度。

说起向量的相似度,最容易想到是向量间的距离$||v - u||^2$。因此,我们可以i设法让网络学会这样一种关系:

  • 若$x^{(i)}, x^{(j)}$是同一人,则$||f(x^{(i)}) - f(x^{(j)})||^2$很小。
  • 若$x^{(i)}, x^{(j)}$不是同一人,则$||f(x^{(i)}) - f(x^{(j)})||^2$很大。、

为了达成这个目标,我们来看看应该用怎样的误差来指导网络的优化方向。

三元组误差

在让网络学习基于距离的相似度时,一种常用的误差是三元组误差(Triplet Loss)。

在一轮训练中,我们要用到3张图片:一张基准图(anchor)$A$和与其相对的正负样本$P, N$各一张。设$d(A, B)$为图片$A, B$的编码的距离,则我们希望$d(A, P) \leq d(A, N)$。

左边的人是吴恩达的妻子,这几张图片已经出现了好几次。

移个项,即我们希望:

但这个条件太容易满足了,令$f(x)=0$恒成立的话无论怎样的输入都可以使上式左侧为0了。因此,我们要加一点小小的约束:

这个$\alpha$是一个较小的数,比如可以令$\alpha=0.2$。为了达成这个目标,我们可以设置以下的误差函数

这里取max的直观解释是:只要满足开始那个不等式,让正样本和负样本能够分清楚就行了。至于两类样本要不要分辨得那么分明,我们并不关心。

为了利用这个误差训练网络,我们要在训练集里为同一个人准备多张照片。比如1000个人,每人100张照片,共100000张照片构成训练集。

在提出此设计的FaceNet中,还有一些小细节:为了加大训练难度,让模型效果更好,每次训练时应该选择较难的三元组$A, P, N$。详情请阅读原论文。

人脸验证与二分类问题

其实,判断两张图片的编码是否相似的问题可以简单地转化成一个二分类问题:把两张图片的编码“拼起来”,输入进一个逻辑回归的单元,让网络输入两张图片是否相似。

这里把两张图片“拼起来”有很多的实现方式。一种简单的方式是求两个编码的绝对值(L1距离)。

另外,在部署时,由于比较的图像的编码是可以预处理的,只需要用神经网络跑一遍输入图片即可。

神经网络风格迁移

我之前写了一篇详细介绍神经网络风格迁移的文章,我认为那篇文章比这堂课更好理解,非常推荐大家阅读。因此,这部分笔记我会写得潦草一些。

什么是神经网络风格迁移?

如上图所示,在神经网络风格迁移中,输入一张表示内容的图(C)和一张表示画家风格的图(S),我们可以借助神经网络生成一幅融合内容与风格的图片(G)。

接下来实现神经网络风格迁移时,我们会关注CNN浅层和深层提取出来的特征。因此,在具体介绍风格迁移的实现之前,我们先来看看CNN各层网络究竟学到了什么。

深度卷积网络学到了什么?

为了设法可视化神经网络每一层的输出,我们可以考虑把整个训练集喂入网络,找出使某一层某一神经元激活输出最大的几个像素块。

以AlexNet为例,令第一层某几个神经元激活输出最大的像素块是:

可以看出,浅层的神经网络更关注不同方向轮廓信息。

用同样的方式可视化更深的层,可以看出网络关注的信息越来越抽象。

损失函数

和优化一个网络的参数不同,在神经网络风格迁移中,我们要把一幅图像当成可以优化的参数,通过修改图像的像素值来最小化一个损失函数。让我们看看这个损失函数是怎么定义的。

损失函数由两部分构成:生成图像(G)和内容图像(C)的内容损失与和风格图像(S)的风格损失。二者之间的比例靠比例系数$\alpha, \beta$控制。

我们来看一个优化这个损失函数的步骤示例:

  1. 生成一个随机图像$G: 100 \times 100 \times 3$
  2. 使用梯度下降更新$G$,最小化$J(G)$

内容损失

内容损失的计算方式如下:

  1. 使用一个预训练的网络(最常用的是VGG)。
  2. 选择神经网络中的某一层$l$,这一层相关的数据会用来计算内容损失。$l$越浅,表示越具体的图像;$l$越深,表示越抽象的图像。一般选取适中的$l$。
  3. 令$a^{l}, a^{[l][G]}$为图像$C, G$网络第$l$层的激活输出。我们认为,如果这两个值很相似,则两幅图像的内容就很相似。
  4. 令$J_{content}(C, G) = ||a^{l} - a^{[l][G]}||^2$

风格损失

看完了内容损失,现在来看风格损失的计算方式。我们刚刚一直在用名词“风格”。这个词放在美术里,可以指画家的绘画风格。而对于神经网络来说,“风格”又是什么意思呢?

和刚刚的内容损失类似,计算风格损失时,我们也要选择CNN的某层$l$,计算这一层卷积激活输出的通道相关量

这里“通道相关量”反映的是图像各个通道间两两的相关关系。这句话可能有点拗口,让我们看一个详细的通道相关量示例。

假设一个激活输出有5个通道,我们用颜色“红-黄-紫-绿-蓝”称呼它们。每个通道表示的是一类特征,比如红色的通道表示图像有没有竖着的条纹,黄色通道表示图像有没有橙色的色块。我们想计算红黄通道之间的相关量,其实就是计算图像每个像素处红色通道的值和绿色通道的值之间的相关关系,看看它们是会同时出现,还是一个出现了另一个就不出现。(两个数值的相关量就是它们的乘积,具体的数学表示会在后文中给出)

为什么这样的相关关系能够捕捉到风格信息呢?红色通道和黄色通道的相关关系,就是是不是有竖条纹的地方就有橙色色块。这样一种线条和颜色的关联,就可以代表图片的风格。(下图中红色的框和黄色的框分别圈出了两种特征)

从直觉上认识了风格,现在我们来看一看风格的具体计算方法。在计算两个向量的相关量时,可以直接求两个向量的积。因此,我们要用到一种叫做”风格矩阵“的中间量,它计算了所有通道向量的乘积:

其中,$a^{[l]}_{i, j, k}$是第$i$行$j$列第$k$个通道的激活输出,风格矩阵$G$的形状是$n_c^{[l]} \times n_c^{[l]}$,$G^{[l]}_{kk’}$描述的是第$l$层激活输出中,第$k, k’$个通道间的相关量。

在数学中这个矩阵叫做gram矩阵。

有了风格矩阵,就可以算风格损失了。为了简化表示,我们用$G^{[l]}(S), G^{[l]}(G)$分别表示风格图像S和生成图像G的风格矩阵。和刚刚一样,我们选择一层$l$,并计算风格误差:

这是一层上的风格误差。其实,我们还可以指定多个层上的风格误差,以使生成图像能够拟合风格图像在多个卷积层上(抽象程度不同)的风格。多个层的风格误差就是各层风格误差之和。

1D和3D上的卷积

在结束这门课之前,我们来学习最后一项内容——1D和3D数据上的卷积。之前,我们只关注2D的图像数据,当维度不是2时,卷积有怎样的变化呢?

1D数据可以是心电图。和可以用2D卷积捕捉2D图像的边缘一样,我们可以用下图中那个尖尖的1D卷积来捕获高频的数据。1D卷积前后的形状变化规律和2D一样,比如用16个长度为5的卷积卷一个$14 \times 1$的1D图像,可以得到$10 \times 16$的1D图像(第一维是图像长度,第二维是通道数)。

3D数据也是类似的道理。用16个$5 \times 5 \times 5$的卷积核卷一个单通道$14 \times 14 \times 14$的单通道图像,可以得到一个$5 \times 5 \times 5 \times 16$的图像。

总结

这节课主要介绍了人脸识别和神经网络风格迁移两项任务,最后顺便介绍了1D和3D上的卷积。

  • 人脸识别
    • 人脸验证与人脸识别任务的定义
    • 如何对单样本学习建模
    • 孪生网络
    • 基于三元组误差和二分类误差的人脸识别网络
  • 神经网络风格迁移
    • 神经网络风格迁移的输入输出
    • 利用小技巧可视化 CNN 学到的图像信息
    • 生成风格迁移图像时的内容误差与风格误差

人脸识别任务依赖于数据集,项目搭建起来比较麻烦。对于这一周的内容,我就不展示其他的代码实战笔记了,欢迎阅读我之前写的神经网络风格迁移的实现

在这篇文章中,我将给出一份带运行示例的NMS Python脚本,并对算法和代码进行详细解说。相信大家看完这篇文章后,能够轻松地掌握NMS的底层原理。

如果你对目标检测的基础知识不太熟,欢迎先阅读我的上篇文章:目标检测简介

示例脚本(包括可视化的代码)链接:https://github.com/SingleZombie/DL-Demos/tree/master/dldemos/nms

算法介绍

在目标检测算法中,为了尽量不漏掉物体,会输出大量的检测结果(每一条结果由检测概率与检测框组成)。这些检测结果很可能有重复的,即会有多个框标出了同一个物体。下图就是一个例子,算法正确识别出了两辆车,但却有多个检测框标出了同一辆车。

我们需要一个算法来过滤多余的检测框。最常用的算法就是NMS(Non-Maximum Suppresion, 非极大值抑制)。该算法的思路很简单:只保留局部概率最大的检测框,与其重合的其他检测框都会被舍去。

算法的伪代码如下:

text
1
2
3
4
5
6
7
输入所有检测概率、检测框
当还有检测框没有处理:
从剩下的框里挑出检测概率最大的框 bbox_a
遍历所有没有处理的框bbox_i:
如果 bbox_i != bbox_a 且 bbox_i 与 bbox_a 重合:
舍去 bbox_i
把 bbox_a 输出成一个检测结果

当然,这个算法的描述还不够准确:究竟该怎么定义两个检测框是“重合”呢?如果两个检测框有交集就说它们重合是行不通的,因为图片中很可能会有挨得很近的物体,它们的检测框就是相交的。因此,我们一般用IoU(交并比)来描述两个框的重合程度,如果IoU超出某个阈值,就说这两个框是“重合”的。

IoU的计算很简单,算出两个矩形的「交面积」,再用两个矩形面积之和减去「交面积」就可以得到「并面积」,「交面积」比「并面积」就是IoU。

在NMS之前,一般还会先做一步预处理:对于预测概率小于某个阈值的检测框,我们不信任它们,会在进行NMS之前就把它们舍去。在代码实现时这部分逻辑常常会放到NMS的函数里。这样,整个NMS的流程是这样的:

上述的NMS针对是识别一种物体的任务,在推广到多分类NMS时,只要把输入的检测概率换成有物体的概率乘上概率最大的类别的概率即可。

代码实现

理论上这段代码是兼容任何格式的张量的。经测试,NumPy, PyTorch 的张量都可以正确运行。

先看一下IoU的实现代码:

1
2
3
4
5
def iou(b1: Tuple[int, int, int, int], b2: Tuple[int, int, int, int]) -> float:
intersection = box_intersection(b1, b2)
inter_area = area(intersection)
union_area = area(b1) + area(b2) - inter_area
return inter_area / union_area

所有的检测框用一个长度为4的元组表示。box_intersection用于计算两个框的相交框,area用于计算框的面积。这段代码和之前描述得一样,先获取相交区域,计算交面积,再计算并面积,最后算除法。

box_intersection的实现如下:

1
2
3
4
5
6
7
8
9
10
11
def box_intersection(
b1: Tuple[int, int, int, int],
b2: Tuple[int, int, int, int]) -> Tuple[int, int, int, int]:
x11, y11, x12, y12 = b1
x21, y21, x22, y22 = b2

xl = max(x11, x21)
xr = min(x12, x22)
yt = max(y11, y21)
yb = min(y12, y22)
return (xl, yt, xr, yb)

算相交区域,可以理解成分别求出x, y方向上的相交部分,再把两部分合成一个框。而求直线的相交,就是取最大的左端点和最小的右端点。

area的实现如下:

1
2
3
4
5
def area(box: Tuple[int, int, int, int]) -> float:
x1, y1, x2, y2 = box
width = max(x2 - x1, 0)
height = max(y2 - y1, 0)
return width * height

如果两个框不相交,x2 - x1y2 - y1会出现小于0的情况。因此,要保证它们最小为0,再算面积就不会有错了。

有了iou,就可以实现了NMS了。我的NMS函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def nms(predicts: np.ndarray,
score_thresh: float = 0.6,
iou_thresh: float = 0.3):
"""Non-Maximum Suppression

Args:
predicts (np.ndarray): Tensor of shape [n, 5]. The second demesion
includes 1 probability and 4 numbers x, y, w, h denoting a bounding
box.
score_thresh (float): The boxes with probability lower than
score_threash will be discarded.
iou_thresh (float): The threshold determining whether two boxes are
"overlapped".

Returns:
(np.ndarray, List[int]): The filtered predictions and the indices of
remaining boxes.
"""

NMS算法需要输入检测结果predicts,输出过滤后的检测结果filtered_predicts。此外,NMS算法有两个输入属性score_threshiou_thresh,分别表示被选定的检测框最小需要的概率、判断两个框是否重合的IoU阈值。为了方便其他的计算(比如多分类NMS),我还输出了一个索引数组indices,表示被选中的框的索引。这方面的逻辑可以根据自己的项目要求进行优化,没有统一的规定。

NMS的实现和开始的伪代码几乎一模一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def nms(predicts: np.ndarray,
score_thresh: float = 0.6,
iou_thresh: float = 0.3):
n_remainder = len(predicts)
vis = [False] * n_remainder

# Filter predicts with low probability
for i, predict in enumerate(predicts):
if predict[0] < score_thresh:
vis[i] = True
n_remainder -= 1

# NMS
output_predicts = []
output_indices = []
while n_remainder > 0:
max_pro = -1
max_index = 0
# Find argmax
for i, p in enumerate(predicts):
if not vis[i]:
if max_pro < p[0]:
max_index = i
max_pro = p[0]

# Append output
max_p = predicts[max_index]
output_predicts.append(max_p)
output_indices.append(max_index)

# Suppress
for i, p in enumerate(predicts):
if not vis[i] and i != max_index:
if iou(p[1:5], max_p[1:5]) > iou_thresh:
vis[i] = True
n_remainder -= 1
vis[max_index] = True
n_remainder -= 1

return output_predicts, output_indices

一开始,先声明一些辅助的变量。n_remainder表示还有多少个框没被访问,vis[i]表示第i个框是否被访问。

1
2
n_remainder = len(predicts)
vis = [False] * n_remainder

一开始,先过滤那些概率过小的框:

1
2
3
4
for i, predict in enumerate(predicts):
if predict[0] < score_thresh:
vis[i] = True
n_remainder -= 1

之后,正式进入NMS,先准备好输出的列表:

1
2
3
4
# NMS
output_predicts = []
output_indices = []
while n_remainder > 0:

在还有没访问的框时,先找出剩下的框中概率最大的那个框:

1
2
3
4
5
6
7
8
9
while n_remainder > 0:
max_pro = -1
max_index = 0
# Find argmax
for i, p in enumerate(predicts):
if not vis[i]:
if max_pro < p[0]:
max_index = i
max_pro = p[0]

之后,抑制掉和概率最大框“重合”的框。

1
2
3
4
5
6
7
8
9
10
11
while n_remainder > 0:
# Find argmax
...

max_p = predicts[max_index]
# Suppress
for i, p in enumerate(predicts):
if not vis[i] and i != max_index:
if iou(p[1:5], max_p[1:5]) > iou_thresh:
vis[i] = True
n_remainder -= 1

最后,把这个结果添加进输出,并维护好visn_remainder

1
2
3
4
5
6
7
8
9
10
11
12
while n_remainder > 0:
# Find argmax
...

# Suppress
...

# Append output
output_predicts.append(max_p)
output_indices.append(max_index)
vis[max_index] = True
n_remainder -= 1

这里可以把vis[max_index] = True那两行移到抑制操作的前面,这样判断里就不用加i != max_index了。我这样写是为了强调一下,判断重合的时候不需要判断自己。

实现完了,返回结果。

1
return output_predicts, output_indices

单元测试

做单元测试的时候,最好是找一份现有的实现用做对照。为了让整个测试过程更贴合实际一些,我用MMDetection的YOLOv3跑了一个NMS前和NMS后的检测框结果,并把NMS前的检测框输入进了自己实现的NMS,比较了两份NMS的输出。以下是具体的测试过程。

照着MMDetection的官方文档,下载好YOLOv3模型,用下面的代码即可对MMDetection的demo图片进行推理并保存结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from mmdet.apis import inference_detector, init_detector, show_result_pyplot
from mmdet.models.detectors import BaseDetector

# Choose to use a config and initialize the detector
config = 'configs/yolo/yolov3_mobilenetv2_320_300e_coco.py'
# Setup a checkpoint file to load
checkpoint = 'checkpoints/yolov3_mobilenetv2_320_300e_coco_20210719_215349-d18dff72.pth'
# initialize the detector
model: BaseDetector = init_detector(config, checkpoint, device='cuda:0')

img = 'demo/demo.jpg'
result = inference_detector(model, img)

model.show_result(img, result, score_thr=0.3, out_file='work_dirs/1.jpg')

为了得到NMS前的输入,我在mmdet/models/dense_head/YOLOV3Head.get_bboxes里面插入了一段输出结果的代码。

1
2
3
4
5
6
7
8
9
10
11
...

save_dict = {
'bboxes': bboxes,
'cls_scores': scores,
'prob': objectness
}
torch.save(save_dict, 'work_dirs/bboxes.pt')

det_bboxes, det_labels = multiclass_nms(
...

得到NMS前的输入是

把这份数据输入进我们的NMS中,得到的可视化结果如下:

这跟开始那份输出一模一样。看来我们实现的这份NMS完全正确。

如果你对测试过程、可视化、多分类NMS感兴趣,欢迎直接阅读我的项目源码。

总结

在这篇文章中,我给出了一份NMS的简洁而靠近底层的Python实现,并对NMS算法进行了介绍。通过阅读这篇文章,相信大家已经完全理解了NMS的原理,并且能够用任何一种语言实现NMS。一般的NMS开源实现支持的参数更多,代码会更复杂一些,但它们的核心和我的这份实现是一样的。

这份NMS的实现还有很大的改进空间。比如每轮求概率最大的框时,可以先排好序,或者用优先队列,这样均摊下来每次获取概率最大的框的复杂度是O(logn)。但是后面判断重复的框一定有一个O(n)的计算,这部分的优化并不显著。大家有余力可以参考成熟的CV项目里NMS是怎么高效用C++实现的。

这节课中,我们要学习计算机视觉中最重要的任务之一——目标检测任务。我们会先认识目标定位和关键点检测这两个比较简单的任务,慢慢过度到目标检测任务。之后,我们会详细学习目标检测的经典算法YOLO。最后,我们会稍微认识一下语义分割任务及适用于此问题的U-Net架构。

课堂笔记

目标定位

图像分类问题中,给定一幅图片,我们只要说出图片里的物体是什么就行了。在这堂课要讨论的任务中,我们还要多做一件事——定位。我们要先用边框圈出图中的物体,再说出框里的物体是什么。这叫做带定位(localization)的分类问题。更进一步,如果我们不再是只讨论一个物体,而是要把图片中所有物体都框出来,并标出每一个物体的类别,这就是目标检测问题,

我们对分类任务的神经网络结构已经很熟悉了。那么,带定位的分类该使用怎样的网络呢?实际上,一个边框可以用边框中心和边框宽高这四个量表示。除了softmax出来的分类结果外,我们只要让网络再多输出四个数就行了。如下图所示:

这里,要统一一下对于边框的定义。我们用$b_x, b_y$表示边框的中心坐标,$b_h, b_w$表示边框的高、宽。

来看一下标签$y$的具体写法。假设一共有四类物体:行人、汽车、摩托车、背景(没有物体)。那么,标签$y$应该用$y=[p_c, b_x, b_y, b_h, b_w, c_1, c_2, c_3]^T$表示。其中,$p_c$表示图中有没有物体。若$p_c=1$,则$c_1, c_2, c_3$分别表示物体属于除背景外的哪一类;若$p_c=0$,则其他值无意义。

这样,在算误差时,也需要分类讨论。若$p_c=1$,则算估计值与标签8个分量两两之间的均方误差;若$p_c=0$,只算$p_c$的均方误差,不用管其他量。

只要更换一下神经网络的输出格式,我们就能得到一个完成目标定位问题的网络。

关键点检测

我们刚刚学了用2个点表示一个边框。其实,拓展一下边框检测,就是一个关键点(英文有时叫做”landmark”,是“地标”的意思)检测问题。

比如,在人脸关键点检测中,我们可以定义一堆关键点,分别表示眼睛、鼻子、嘴巴……的位置。我们还是让网络先输出一个数,表示图中有没有人脸;再输出2n个数,表示n个人脸关键点。这样,网络就能学习怎么标出人脸关键点了。

很多应用都基于人脸关键点检测技术。比如我们检测到了眼睛周围的关键点后,就可以给人“戴上”墨镜。

总之,通过这一节的学习,我们要知道,目标定位中输出2个坐标只是关键点检测的一个特例。只要训练数据按照某种规律标出了关键点,不管这些关键点是表示一个框,还是人脸上各器官的位置,网络都能学习这种规律。

目标检测

有了之前的知识储备,现在我们来正式学习目标检测。目标检测可以用一种叫做“滑动窗口”的技术实现。

假设我们要构建一个汽车的目标检测系统。我们可以先构造一个汽车分类数据集——数据集的x是一些等大的图片,y表示图片里是不是有汽车。如果图片里有汽车,汽车应该占据图片的大部分位置。

通过学习,网络就能够判断一个框里的物体是不是汽车了。这样,我们可以用一个边框框出图片的一部分,裁剪下来,让网络看看图片的这一部分是不是汽车。只要我们尝试的次数够多,总能找出图中的汽车。

在遍历边框时,我们是通过“滑动”的方法:遍历边框的大小,选择好大小后把框放到左上角,先往右移,再往下移。所以这种方法叫做“滑动窗口”。

滑动窗口算法有一个缺点:如果我们移动窗口的步伐过小,则运行分类器的次数会很多;如果移动窗口的步伐过大,则算法的精度会受到影响。在深度学习时代之前,分类器都是一些简单的线性函数,能够快速算完,多遍历一些滑动窗口没有问题。而使用了深度CNN后,遍历滑动窗口的代价就很大了。

幸好,滑动窗口也可以通过卷积来生成,而不一定要遍历出来。让我们看下去。

基于卷积的滑动窗口

滑动窗口其实可以通过执行巧妙的卷积来生成。在那之前,我们先学一门前置技能:怎么把全连接层变成卷积层。

前两周学习CNN时,我们学过,卷积结束后,卷积的输出会被喂入全连接层中。实际上,我们可以用卷积来等价实现全连接层。比如下图中,一个$5 \times 5 \times 16$的体积块想变成一个长度为400的向量,可以通过执行400个$5 \times 5$的卷积来实现。

知道了这一点后,我们就可以利用卷积来快速实现滑动窗口了。

假设我们按照上一节的算法,先实现了对$14 \times 14$的小图片进行分类的分类器。之后,我们输入了一张$16 \times 16$的大图片。我们遍历滑动窗口,令步幅为2。这样,理论上,有4个合法的滑动窗口,应该执行4次分类器的运算,如下图所示:

可是,仔细一想,在执行4次分类器的过程中,有很多重复的运算。比如,对于4个滑动窗口中间那共有的$12 \times 12$个像素,它们的卷积结果被算了4次。理想情况下,只需要对它们做一次卷积就行了。这该怎么优化呢?

其实,很简单,我们可以利用卷积本身的特性来优化。卷积层只定义了卷积核,而没有规定输入图像的大小。我们可以拿出之前在$14 \times 14$的图像上训练好的卷积层,把它们用在$16 \times 16$的图片的卷积上。经过相同的网络,$16 \times 16$的图片会生成一个$2 \times 2$大小的分类结果,而不是$1 \times 1$的。这$2 \times 2$个分类结果,恰好就是那4个滑动窗口的分类结果。通过这样巧妙地利用卷积操作,我们规避了遍历滑动窗口带来的重复计算。

不过,这个方法还是有一些缺陷的。在刚才那个例子中,$16 \times 16$的图片其实可以放下9个$14 \times 14$大小的边框。但是,由于分类网络中max pool的存在,我们只能生成4个分类结果,也就是步幅为2的滑动窗口的分类结果。同时,最准确的检测框也不一定是正方形的,而可能是长方形的。为了让生成的滑动窗口更准确一些,我们要用到其他方法。

预测边框

在这一节,我们要使用YOLO(You Only Look Once)算法解决上一节中碰到的问题。还记得这周课开头学的目标定位问题吗?我们可以把滑动窗口和目标定位结合一下。

给定一幅图像,我们可以把图像分成$3 \times 3$个格子。训练模型前,我们要对训练数据做预处理。根据每个训练样本中物体的中心点所在的格子,我们把物体分配到每一个格子中。也就是说,不管一个物体的边框跨越了几个格子,它的中心点在哪,它就属于哪个格子。比如对于下图的训练样本,右边那辆车就属于橙色的格子。之后,我们给每个格子标上标签$y$。这个标签$y$就是目标定位中那个表示图片中是否有物体、物体的边框、物体的类别的标签向量。对于这个$3 \times 3$的格子,有9个标签向量,整个标签张量的形状是$3 \times 3 \times 8$。

这样,每一幅图像的输出和标签一样,也是一个$3 \times 3 \times 8$的张量了。输入一幅图片后,我们利用上一节学的卷积滑动窗口,同时预测出每个格子里的物体边框。

另外,这里要详细讨论一下$b_x, b_y, b_h, b_w$的表示方法。由于我们只关心框相对于格子的位置,因此我们可以把规定一个格子的边长为1。这样,就满足$0 \leq b_x, b_y \leq 1$了。不过,由于物体的边框可以超出小框,$b_h, b_w > 1$是很有可能的。

看到这,大家可能会有一些疑问:如果一个格子里有多个物体呢?的确,这个算法无法输出一个格子里的多个物体。一种解决方法是,我们可以把格子分得更细一点,比如$19 \times 19$个格子。这样,可以被检测到物体会多一些。但是,增加格子数又会引入一个新的问题——多个格子检测到了同一个物体。下面的两节里我们会尝试解决这个新的问题。

吴恩达老师说,YOLO这篇论文很难读懂,他和其他几个资深研究者都花了很大的功夫才读懂这篇论文。

IoU(交并比)

在目标检测中,有一个微妙的问题:框出一个物体的边框有无数个,想精确框出标签的边框是不可能的。怎么判定一个输出结果和标签里的边框“差不多”呢?这就要用到IoU(Intersection over Union,交并比) 这个概念。

IoU,顾名思义,二者的交集比上二者的并集,很好理解。比如下图中,网络的输出是紫框,真值是红框。二者的并集是绿色区域,交集是橙色区域。则IoU就是橙色比绿色。

依照惯例,如果IoU$\geq 0.5$,我们就认为网络的输出是正确的。当然,想更严格一点,0.6,0.7也是可以的。

IOU 也是 “I owe you(我欠了你的钱)”的缩写,哈哈哈。

NMS(非极大值抑制)

假设在YOLO中,我们用$19 \times 19$个小格来检测物体。可是,由于小格子太多了,算法得到了多个重复的检测框(以及每个框中有物体的概率)。这该怎么办呢?

NMS(Non-Maximum Suppresion,非极大值抑制)就是解决这个问题的算法。这个算法的名字听起来很奇怪,但大家理解了这个算法的实现后,就知道这个“抑制”是什么意思了。

讲起算法我就不困了。我会抛弃视频中的讲解思路,用我自己的逻辑讲一遍。讲算法,千万不能一上来就讲算法的步骤,一定要先讲清楚算法的思路。

在学NMS之前,我们先动动脑,看看在去掉重复的框时,我们期望得到怎样的去重输出结果。

首先,既然是去重,那么就不能出现两个框过度重合的情况。其次,我们希望留下来的框的预测概率尽可能大。

在这两个要求下,我们来看看上面那幅图的输出应该是怎样的。
我们一眼就能看出,对于左边那辆车,我们应该保留0.8的框;对于右边那辆车,我们应该保留0.9的框。

为什么我们能“一眼看出”呢?这是因为左边两个框、右边三个框恰好都分别表示了一辆车。我们能够快速地把这些框分成两类。但是,在情况比较复杂时,我们就难以快速找出最好的框了。比如下面这种情况中,两辆车很近,有些框甚至同时标出了两辆车:

为了处理这种复杂的情况,我们必须想出一种万全的算法,以筛选出那些概率比较大的框。

稍微思考一下,其实这样的算法非常简单:找出最大的框,去掉所有和它过度重合的框;在剩下的框中,找出最大的框,去掉所有和它过度重合的框;……。一直重复直到没有未处理的框为止。这就是NMS算法。

还是让我们来看看刚刚那个例子。使用NMS时,我们会先找到0.9这个框,“抑制”掉右边0.6和0.7的框。在剩下的框中,最大的是0.8这个框,它会“抑制”掉左边那个0.7的框。

接下来,让我们来严格描述一下这个算法。假设我们有$19 \times 19=361$个输出结果,每个输出结果是一个长度为5的向量$[p_c, b_x, b_y, b_h, b_w]$,分别表示有物体的概率、边框的中心和高宽(我们先不管检测多个物体的情况。事实上,当推广到多个物体时,只要往这个输出结果里多加几个概率就行了)。我们要用NMS输出应该保留的检测结果。“过度重合”,就是两个框的IoU大于0.5。

首先,先做一步初筛,扔掉概率$p_c$小于0.6的结果。

之后,对于没有遍历的框,重复执行:找出概率最大的框,把它加入输出结果;去掉所有和它IoU大于0.5的框。

这个过程用伪代码表示如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# Input and preprocessing
input predicts of size [19, 19, 5]
resize predicts to [361, 5]

# Filter predicts with low probability
filtered_predicts = []
for predict in predicts:
# drop p_c < 0.6
if predict[0] >= 0.6:
filtered_predicts.append(predict)

# NMS
n_remainder = len(filtered_predicts)
vis = [False] * n_remainder # False for unvisited item
output_predicts = []
while n_remainder > 0:
max_pro = -1
max_index = 0
# Find argmax
for i, p in enumerate(filtered_predicts):
if not vis[i]:
if max_pro < p[0]:
max_index = i
max_pro = p[0]

# Append output
max_p = filtered_predicts[max_index]
output_predicts.append(max_p)

# Suppress
for i, p in enumerate(filtered_predicts):
if not vis[i] and i != max_index:
if get_IoU(p[1:5], max_p[1:5]) > 0.5:
vis[i] = True
n_remainder -= 1
vis[max_index] = True
n_remainder -= 1

假设进NMS的框有$N$个。算法里求当前最大框那一步可以用优先队列来优化,这一步复杂度是$O(logN)$。但是“抑制”那一步必须要遍历一遍剩下的框,还是有一个$O(N)$复杂度(我暂时想不出朴素的低复杂度的算法)。因此,不用优先队列优化也差不多。算上外层的循环,整个算法的复杂度是$O(N^2)$。在实际的应用中,送入NMS的结果没那么多,不会超过10000个。而且,随着框被不断过滤,外层循环的次数会减少不少。这个算法的性能瓶颈不在输入数$N$上,而在于求IoU实现的细节上。

锚框(Anchor Boxes)

为了让一个格子能够检测到多个物体,YOLO论文还提出了一种叫做锚框(Anchor Boxes)的技术。

假设一个格子里同时包含了两个物体:一个“竖着”的人和一个“横着”的车。那么,我们可以以这个格子的中心点为“锚”,画一个竖的框和横的框,让每个格子可以检测到两个物体。这样,人和车都能被检测了。

严谨地描述,锚框技术是这样做改进的:

  • 之前,每一个格子只能包含一个样本。训练数据中每一个标签框会被分配到它中点所在格子
  • 现在,每一个格子能包含多个样本。每个格子都会预定义几个不同形状的锚框,有几个锚框,就最多能检测到几个物体。训练数据的每一个标签框会被分配到和它交并比最大的锚框

注意,之前的最小单元是格子,现在是锚框,所以说现在每个样本被分配到锚框上而不是格子上。可以看下面这两个样本的例子,第一个例子是两个物体都检测到了,第二个是只有锚框2里有物体。和之前一样,如果有某个锚框里没有物体,则除了$p_c$外全填问号即可。

锚框技术实际上只是对训练数据做了一个约束,改变了训练数据的格式。检测算法本身没有什么改变。

YOLO 算法总结

让我们把前几节的内容总结一下,看一下YOLO算法的全貌。

在训练前,我们要对数据做预处理。首先,我们要指定以下超参数:图片切分成多大的格子、每个格子里有多少个锚框。之后,根据这些信息,我们可以得到每一个训练标签张量的形状。比如$3 \times 3 \times 2 \times 8$的一个训练标签,表示图片被切成了$3 \times 3$的格子,每个格子有两个锚框。这是一个三分类问题,对于每一个检测出来的物体,都可以用一个长度为$8$的向量表示。其中,$p_c$表示这个锚框里有没有物体, $(b_x, b_y), (b_h, b_w)$分别表示中心点坐标、框的高宽,$c_1, c_2, c_3$分别表示是否为该类物体。

有了预处理好的训练数据,就可以训练一个CNN了。

在网络给出了输出后,由于输出的框往往多于标签中的框,还要对输出结果进行筛选。筛选的过程如前文所述,先去掉概率过小的框,再分别对每一类物体的框做NMS。

课堂上没有介绍loss。loss的组成比较复杂,建议阅读原论文。

区域提案

YOLO算法是在一堆固定的框里找物体。实际上,我们还可以用神经网络来找出候选框,再在这些框里详细检测。这种技术就叫做区域提案(region proposal),相关的网络叫做R-CNN(Region with CNN)。

R-CNN 系列网络有多个改进版本:

  • R-CNN: 使用区域提案,但是每次只对一个区域里的物体做分类。
  • Fast R-CNN: 使用区域提案,并使用基于卷积的滑动窗口加速各区域里物体的分类。
  • Faster R-CNN: 前两个算法都是用传统方法提案区域,Faster R-CNN用CNN来提案区域,进一步令算法加速。

吴恩达老师认为,虽然区域提案的方法很酷,但把目标检测分两步来完成还是太麻烦了,一步到位的YOLO系列算法已经挺方便了。

基于U-Net的语义分割

最早这门课是没有这一节的,估计U-Net的架构太常用了,吴恩达老师把基于U-Net的语义分割加入了这周的课中。

语义分割也是应用非常广泛的一项CV任务。相较于只把物体框出来的目标检测,语义分割会把每一类物体的每个像素都精确地标出来。如下图的示例所示,输入一张图片,语义分割会把每一类物体准确地用同一种颜色表示。

具体来说,语义分割的输出是一个单通道图片。图片的数字表示此处像素的类别。

在分类模型中,图像会越卷越小,最后压平放进全连接层并输出多个类别的分类概率。而在语义分割模型中,由于模型的输出也是一幅图像,在输入图像被卷小了以后,应该还有一个放大的过程。

目前,我们还没有学过带学习参数的可以放大图像分辨率的结构。下一节介绍的反卷积能够完成这件事。

反卷积

反卷积和卷积的输入输出大小彻底相反。让我们看看反卷积的形状是怎么计算的。

如上图所示,反卷积也有卷积核大小、步幅、填充这些参数。不过这些参数都是在输出图像上做的。也就是说,我们会在输出图像上做填充,并且每次在输出图像上一步一步移动。我们把正卷积的输出大小计算公式套到反卷积上的输出上,就能算出反卷积的输入的大小。

在卷积时,我们是把卷积核与图像对应位置的数字乘起来,再求和,算出一个输出值;反卷积则是反了过来,把一个输入值乘到卷积核的每个位置上,再把乘法结果放再输出的对应位置上。一趟反卷积计算如下图所示:

这里我们只需要知道反卷积可以做上采样就行了,不需要纠结底层实现细节。

本课对反卷积的介绍甚少。实际上,反卷积可以通过正卷积来实现。我扫了一圈没看到讲解得比较好的相关文章,有兴趣的可以自行查找资料。

U-Net 架构

学完了反卷积,可以来看U-Net的结构了。

U-Net除了对图像使用了先缩小再放大的卷积外,还使用了一种跳连(不是ResNet中残差连接的跳连,而是把两份输入拼接在了一起)。这样,在反卷积层中,不仅有来自上一层的输入,还有来自前面相同大小的正卷积的结果。这样做的好处是,后半部分的网络既能获得前一个卷积的抽象、高级(比如类别)的输入,又能获得前半部分网络中具体,低级的特征(比如形状)。这样,后面的层能够更好地生成输出。

U-Net具体的结构如下:

这幅图中,做运算的图像张量被表示成了一个二维矩形,矩形的高度是图像的宽高,矩形的宽度是通道数。U-Net的前半部分和常见的CNN一样,缩小图像大小,增大图像通道数。而在后半部分中,每次上采样时,一半的通道来自上一层的输出,另一半的通道来自于网络前半部分。

从图中能看出,U-Net的结构图是一个“U”型,因此它才被叫做U-Net。

总结

在这篇文章中,我们主要学习了以下内容:

  • 任务定义与输出格式
    • 目标定位
    • 关键点检测
    • 目标检测
    • 语义分割
  • YOLO
    • 用卷积实现全连接
    • 用卷积实现滑动窗口
    • 锚框
    • IoU
    • NMS
    • YOLO算法
  • U-Net
    • 反卷积
    • U-Net架构

这周的代码实战中,我会详细讲解NMS的实现。时间允许的话,我还会展示一下如何在COCO上训练YOLO。