from taming.modules.diffusionmodules.model import Encoder, Decoder
from taming.modules.vqvae.quantize import VectorQuantizer2 as VectorQuantizer
from taming.modules.vqvae.quantize import GumbelQuantize
from taming.modules.vqvae.quantize import EMAVectorQuantizer
def forward(self, z):
    z = z.permute(0, 2, 3, 1).contiguous()
    z_flattened = z.view(-1, self.e_dim)
    # distances from z to embeddings e_j: (z - e)^2 = z^2 + e^2 - 2 e * z
res = [spatial_average(lins[kk].model(diffs[kk]), keepdim=True) for kk in range(len(self.chns))]
val = res[0]
for l in range(1, len(self.chns)):
    val += res[l]
return val
class NLayerDiscriminator(nn.Module):
    """Defines a PatchGAN discriminator as in Pix2Pix
        --> see https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix/blob/master/models/networks.py
    """
    def __init__(self, input_nc=3, ndf=64, n_layers=3, use_actnorm=False):
        """Construct a PatchGAN discriminator
        Parameters:
            input_nc (int)  -- the number of channels in input images
            ndf (int)       -- the number of filters in the last conv layer
            n_layers (int)  -- the number of conv layers in the discriminator
            norm_layer      -- normalization layer
        """
        super(NLayerDiscriminator, self).__init__()
        if not use_actnorm:
            norm_layer = nn.BatchNorm2d
        else:
            norm_layer = ActNorm
        if type(norm_layer) == functools.partial:  # no need to use bias as BatchNorm2d has affine parameters
            use_bias = norm_layer.func != nn.BatchNorm2d
        else:
            use_bias = norm_layer != nn.BatchNorm2d
        kw = 4
        padw = 1
        sequence = [nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw), nn.LeakyReLU(0.2, True)]
        nf_mult = 1
        nf_mult_prev = 1
        for n in range(1, n_layers):  # gradually increase the number of filters
            nf_mult_prev = nf_mult
            nf_mult = min(2 ** n, 8)
            sequence += [
                nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=2, padding=padw, bias=use_bias),
                norm_layer(ndf * nf_mult),
                nn.LeakyReLU(0.2, True)
            ]
def init_first_stage_from_ckpt(self, config):
    model = instantiate_from_config(config)
    model = model.eval()
    model.train = disabled_train
    self.first_stage_model = model
# target includes all sequence elements (no need to handle first one
# differently because we are conditioning)
target = z_indices
# make the prediction
logits, _ = self.transformer(cz_indices[:, :-1])
# cut off conditioning outputs - output i corresponds to p(z_i | z_{<i}, c)
logits = logits[:, c_indices.shape[1]-1:]
def forward(self, idx, embeddings=None, targets=None):
    # forward the GPT model
    token_embeddings = self.tok_emb(idx)  # each index maps to a (learnable) vector
    t = token_embeddings.shape[1]
    assert t <= self.block_size, "Cannot forward, model block size is exhausted."
    position_embeddings = self.pos_emb[:, :t, :]  # each position maps to a (learnable) vector
    x = self.drop(token_embeddings + position_embeddings)
    x = self.blocks(x)
    x = self.ln_f(x)
    logits = self.head(x)
    # if we are given some desired targets also calculate the loss
    loss = None
    if targets is not None:
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
def forward(self, x, layer_past=None, return_present=False):
    # TODO: check that training still works
    if return_present:
        assert not self.training
    # layer past: tuple of length two with B, nh, T, hs
    attn, present = self.attn(self.ln1(x), layer_past=layer_past)
    x = x + attn
    x = x + self.mlp(self.ln2(x))
    if layer_past is not None or return_present:
        return x, present
    return x
目前 粮食 出现 阶段性 过剩 , 恰好 可以 以 粮食 换 森林 、 换 草地 , 再造 西部 秀美 山川 。
the present food surplus can specifically serve the purpose of helping western china restore its woodlands , grasslands , and the beauty of its landscapes .
def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor,
            mask: Optional[torch.Tensor] = None):
    # batch should be same
    assert q.shape[0] == k.shape[0]
    assert q.shape[0] == v.shape[0]
    # the sequence length of k and v should be aligned
    assert k.shape[1] == v.shape[1]
def forward(self, x, src_mask: Optional[torch.Tensor] = None):
    x = self.embedding(x)
    x = self.pe(x)
    x = self.dropout(x)
    for layer in self.layers:
        x = layer(x, src_mask)
    return x
def forward(self, x, encoder_kv, dst_mask: Optional[torch.Tensor] = None,
            src_dst_mask: Optional[torch.Tensor] = None):
    x = self.embedding(x)
    x = self.pe(x)
    x = self.dropout(x)
    for layer in self.layers:
        x = layer(x, encoder_kv, dst_mask, src_dst_mask)
    return x
# y_input = y_batch
with torch.no_grad():
    for i in range(1, y_input.shape[1]):
        y_hat = model(x_batch, y_input)
        for j in range(batch_size):
            y_input[j, i] = torch.argmax(y_hat[j, i - 1])
tokenizer = get_tokenizer('basic_english')
english = tokenizer(english)
For Chinese word segmentation, I used the jieba library, which can be installed directly with pip:
pip install jieba
The segmentation API is jieba.cut. Since the segmentation output still contains whitespace between adjacent words, I simply filtered out all whitespace characters.
import jieba
chinese = list(jieba.cut(chinese))
chinese = [x for x in chinese if x not in {' ', '\t'}]
After this processing, each sentence is converted into a list of Chinese words or English words. The complete processing code is as follows:
def read_file(json_path):
    english_sentences = []
    chinese_sentences = []
    tokenizer = get_tokenizer('basic_english')
    with open(json_path, 'r') as fp:
        for line in fp:
            line = json.loads(line)
            english, chinese = line['english'], line['chinese']
            # Correct mislabeled data
            if not english.isascii():
                english, chinese = chinese, english
            # Tokenize
            english = tokenizer(english)
            chinese = list(jieba.cut(chinese))
            chinese = [x for x in chinese if x not in {' ', '\t'}]
            english_sentences.append(english)
            chinese_sentences.append(chinese)
    return english_sentences, chinese_sentences
def __getitem__(self, index):
    x = np.concatenate(([SOS_ID], self.en_tensor[index], [EOS_ID]))
    x = torch.from_numpy(x)
    y = np.concatenate(([SOS_ID], self.zh_tensor[index], [EOS_ID]))
    y = torch.from_numpy(y)
    return x, y
Conditional Image Generation with PixelCNN Decoders is the paper that proposed Gated PixelCNN. The sections worth focusing on are the ones on removing the blind spot and on the gated activation function.
PixelCNN++: Improving the PixelCNN with Discretized Logistic Mixture Likelihood and Other Modifications is the paper that proposed PixelCNN++. The whole paper is very concise and can be read in one pass, paying special attention to the section on the discretized logistic mixture likelihood. That said, many parts are written too tersely; even the symbols in the formulas are not properly defined, and I only understood what the authors meant after reading the code. I recommend reading it alongside the explanations in this post.
def forward(self, x):
    y = self.relu(x)
    y = self.conv1(y)
    y = self.bn1(y)
    y = self.relu(y)
    y = self.conv2(y)
    y = self.bn2(y)
    y = self.relu(y)
    y = self.conv3(y)
    y = self.bn3(y)
    y = y + x
    return y
def forward(self, x):
    x = self.conv1(x)
    x = self.bn1(x)
    for block in self.residual_blocks:
        x = block(x)
    x = self.relu(x)
    x = self.linear1(x)
    x = self.relu(x)
    x = self.linear2(x)
    x = self.out(x)
    return x
h = self.h_conv(h_input)
h = self.bn3(h)
h = h + v_to_h
h1, h2 = h[:, :self.p], h[:, self.p:]
h1 = torch.tanh(h1)
h2 = torch.sigmoid(h2)
h = h1 * h2
h = self.h_output_conv(h)
h = self.bn4(h)
if self.conv_type == 'B':
    h = h + h_input
return v, h
def forward(self, x):
    v, h = self.block1(x, x)
    for block in self.blocks:
        v, h = block(v, h)
    x = self.relu(h)
    x = self.linear1(x)
    x = self.relu(x)
    x = self.linear2(x)
    x = self.out(x)
    return x
u_list = [nn.down_shift(
    nn.down_shifted_conv2d(x_pad, num_filters=nr_filters, filter_size=[2, 3])
)]  # stream for pixels above
ul_list = [nn.down_shift(
    nn.down_shifted_conv2d(x_pad, num_filters=nr_filters, filter_size=[1, 3])
) + nn.right_shift(
    nn.down_right_shifted_conv2d(x_pad, num_filters=nr_filters, filter_size=[2, 1])
)]  # stream for up and to the left
u = u_list.pop()
ul = ul_list.pop()
for rep in range(nr_resnet):
    u = nn.gated_resnet(u, u_list.pop(), conv=nn.down_shifted_conv2d)
    ul = nn.gated_resnet(ul, tf.concat([u, ul_list.pop()], 3), conv=nn.down_right_shifted_conv2d)
    tf.add_to_collection('checkpoints', u)
    tf.add_to_collection('checkpoints', ul)
u = nn.down_shifted_deconv2d(u, num_filters=nr_filters, stride=[2, 2])
ul = nn.down_right_shifted_deconv2d(ul, num_filters=nr_filters, stride=[2, 2])
for rep in range(nr_resnet + 1):
    u = nn.gated_resnet(u, u_list.pop(), conv=nn.down_shifted_conv2d)
    ul = nn.gated_resnet(ul, tf.concat([u, ul_list.pop()], 3), conv=nn.down_right_shifted_conv2d)
    tf.add_to_collection('checkpoints', u)
    tf.add_to_collection('checkpoints', ul)
u = nn.down_shifted_deconv2d(u, num_filters=nr_filters, stride=[2, 2])
ul = nn.down_right_shifted_deconv2d(ul, num_filters=nr_filters, stride=[2, 2])
for rep in range(nr_resnet + 1):
    u = nn.gated_resnet(u, u_list.pop(), conv=nn.down_shifted_conv2d)
    ul = nn.gated_resnet(ul, tf.concat([u, ul_list.pop()], 3), conv=nn.down_right_shifted_conv2d)
    tf.add_to_collection('checkpoints', u)
    tf.add_to_collection('checkpoints', ul)
def discretized_mix_logistic_loss(x, l, sum_all=True):
    """ log-likelihood for mixture of discretized logistics, assumes the data has been rescaled to [-1,1] interval """
    xs = int_shape(x)  # true image (i.e. labels) to regress to, e.g. (B,32,32,3)
    ls = int_shape(l)  # predicted distribution, e.g. (B,32,32,100)
    nr_mix = int(ls[-1] / 10)
    # here and below: unpacking the params of the mixture of logistics
    logit_probs = l[:, :, :, :nr_mix]
    l = tf.reshape(l[:, :, :, nr_mix:], xs + [nr_mix * 3])
    means = l[:, :, :, :, :nr_mix]
    log_scales = tf.maximum(l[:, :, :, :, nr_mix:2 * nr_mix], -7.)
    coeffs = tf.nn.tanh(l[:, :, :, :, 2 * nr_mix:3 * nr_mix])
    x = tf.reshape(x, xs + [1]) + tf.zeros(xs + [nr_mix])
    # here and below: getting the means and adjusting them based on preceding sub-pixels
    m2 = tf.reshape(means[:, :, :, 1, :] + coeffs[:, :, :, 0, :] * x[:, :, :, 0, :],
                    [xs[0], xs[1], xs[2], 1, nr_mix])
    m3 = tf.reshape(means[:, :, :, 2, :] + coeffs[:, :, :, 1, :] * x[:, :, :, 0, :]
                    + coeffs[:, :, :, 2, :] * x[:, :, :, 1, :],
                    [xs[0], xs[1], xs[2], 1, nr_mix])
    means = tf.concat([tf.reshape(means[:, :, :, 0, :], [xs[0], xs[1], xs[2], 1, nr_mix]), m2, m3], 3)
    centered_x = x - means
    inv_stdv = tf.exp(-log_scales)
    plus_in = inv_stdv * (centered_x + 1. / 255.)
    cdf_plus = tf.nn.sigmoid(plus_in)
    min_in = inv_stdv * (centered_x - 1. / 255.)
    cdf_min = tf.nn.sigmoid(min_in)
    log_cdf_plus = plus_in - tf.nn.softplus(plus_in)  # log probability for edge case of 0 (before scaling)
    log_one_minus_cdf_min = -tf.nn.softplus(min_in)  # log probability for edge case of 255 (before scaling)
    cdf_delta = cdf_plus - cdf_min  # probability for all other cases
    mid_in = inv_stdv * centered_x
    log_pdf_mid = mid_in - log_scales - 2. * tf.nn.softplus(mid_in)  # log probability in the center of the bin, to be used in extreme cases (not actually used in our code)
xs = int_shape(x)  # true image (i.e. labels) to regress to, e.g. (B,32,32,3)
ls = int_shape(l)  # predicted distribution, e.g. (B,32,32,100)
nr_mix = int(ls[-1] / 10)
# here and below: unpacking the params of the mixture of logistics
logit_probs = l[:, :, :, :nr_mix]
l = tf.reshape(l[:, :, :, nr_mix:], xs + [nr_mix * 3])
means = l[:, :, :, :, :nr_mix]
log_scales = tf.maximum(l[:, :, :, :, nr_mix:2 * nr_mix], -7.)
coeffs = tf.nn.tanh(l[:, :, :, :, 2 * nr_mix:3 * nr_mix])
centered_x = x - means
inv_stdv = tf.exp(-log_scales)
plus_in = inv_stdv * (centered_x + 1. / 255.)
cdf_plus = tf.nn.sigmoid(plus_in)
min_in = inv_stdv * (centered_x - 1. / 255.)
cdf_min = tf.nn.sigmoid(min_in)
log_cdf_plus = plus_in - tf.nn.softplus(plus_in)  # log probability for edge case of 0 (before scaling)
log_one_minus_cdf_min = -tf.nn.softplus(min_in)  # log probability for edge case of 255 (before scaling)
cdf_delta = cdf_plus - cdf_min  # probability for all other cases
The authors also compute the log probability density at the center of the bin, intended for certain extreme edge cases. In practice, this value is never used in the code.
mid_in = inv_stdv * centered_x
log_pdf_mid = mid_in - log_scales - 2. * tf.nn.softplus(mid_in)  # log probability in the center of the bin, to be used in extreme cases (not actually used in our code)
# Numba version of the loop-based distance computation
@njit
def cal_distance(x, y, x2, y2, A_padding, B, p):
    sum = 0
    for i in range(p + p + 1):
        for j in range(p + p + 1):
            for k in range(3):
                a = float(A_padding[x + i, y + j, k])
                bb = B[x2 - p + i, y2 - p + j, k]
                sum += (a - bb) ** 2
    return sum
To achieve this, researchers have proposed different ways of learning such features. The most straightforward approach is to learn feature extraction through a concrete task, just as word embeddings are learned. Conveniently, face recognition can naturally be framed as a multi-class classification task: for a training set containing N identities, face recognition is simply an N-way classification problem. Adding a linear layer and a softmax on top of the feature extractor is enough to perform the classification. Once the classifier is trained, we discard the linear layer and the softmax, and what remains is a feature extractor.
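To make this concrete, here is a minimal sketch of the idea; the backbone, feature dimension, and variable names are hypothetical placeholders rather than code from any particular paper:

import torch
import torch.nn as nn

class FaceClassifier(nn.Module):
    # Hypothetical model: a feature-extraction backbone followed by an
    # N-way linear classifier (the softmax is folded into the loss below).
    def __init__(self, backbone: nn.Module, feat_dim: int, num_ids: int):
        super().__init__()
        self.backbone = backbone              # maps images to feat_dim-dim vectors
        self.classifier = nn.Linear(feat_dim, num_ids)

    def forward(self, x):
        feat = self.backbone(x)
        return self.classifier(feat)

# Training: plain N-way cross-entropy over identity labels, e.g.
#   model = FaceClassifier(my_backbone, feat_dim=512, num_ids=N)
#   loss = nn.CrossEntropyLoss()(model(images), identity_labels)
# After training, throw away model.classifier and keep model.backbone
# as the feature extractor:
#   features = model.backbone(new_face_images)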
import os
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel
for epoch in range(2):
    sampler.set_epoch(epoch)
    for x in dataloader:
        print(f'epoch {epoch}, rank {rank} data: {x}')
        x = x.to(device_id)
        y = ddp_model(x)
        optimizer.zero_grad()
        loss = loss_fn(x, y)
        loss.backward()
        optimizer.step()
class VAE(nn.Module):
    '''VAE for 64x64 face generation. The hidden dimensions can be tuned.'''

    def __init__(self, hiddens=[16, 32, 64, 128, 256], latent_dim=128) -> None:
        super().__init__()