accelerator = Accelerator(
    gradient_accumulation_steps=args.gradient_accumulation_steps,
    mixed_precision=args.mixed_precision,
    log_with=args.report_to,
    project_config=accelerator_project_config,
)
if args.report_to == "wandb":
    if not is_wandb_available():
        raise ImportError("Make sure to install wandb if you want to use it for logging during training.")
    import wandb
# Make one log on every process with the configuration for debugging.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.INFO,
)
logger.info(accelerator.state, main_process_only=False)
if accelerator.is_local_main_process:
    datasets.utils.logging.set_verbosity_warning()
    transformers.utils.logging.set_verbosity_warning()
    diffusers.utils.logging.set_verbosity_info()
else:
    datasets.utils.logging.set_verbosity_error()
    transformers.utils.logging.set_verbosity_error()
    diffusers.utils.logging.set_verbosity_error()
The code that follows decides whether to set the random seed manually. Keeping the default behavior is fine.
# If passed along, set the training seed now.
if args.seed is not None:
    set_seed(args.seed)
# freeze parameters of models to save more memory
unet.requires_grad_(False)
vae.requires_grad_(False)
text_encoder.requires_grad_(False)
# Freeze the unet parameters before adding adapters
for param in unet.parameters():
    param.requires_grad_(False)
# For mixed precision training we cast all non-trainable weights (vae, non-lora text_encoder and non-lora unet) to half-precision
# as these weights are only used for inference, keeping weights in full precision is not required.
weight_dtype = torch.float32
if accelerator.mixed_precision == "fp16":
    weight_dtype = torch.float16
elif accelerator.mixed_precision == "bf16":
    weight_dtype = torch.bfloat16
# Move unet, vae and text_encoder to device and cast to weight_dtype
unet.to(accelerator.device, dtype=weight_dtype)
vae.to(accelerator.device, dtype=weight_dtype)
text_encoder.to(accelerator.device, dtype=weight_dtype)
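The unet_lora_config used below is a peft LoraConfig targeting the attention projection layers. A minimal sketch along the lines of the upstream script (args.rank is that script's argument name, so treat the exact values as assumptions):

from peft import LoraConfig

# Sketch of the LoRA adapter configuration used by unet.add_adapter below.
unet_lora_config = LoraConfig(
    r=args.rank,
    lora_alpha=args.rank,
    init_lora_weights="gaussian",
    target_modules=["to_k", "to_q", "to_v", "to_out.0"],
)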
unet.add_adapter(unet_lora_config)
if args.mixed_precision == "fp16":
    for param in unet.parameters():
        # only upcast trainable parameters (LoRA) into fp32
        if param.requires_grad:
            param.data = param.to(torch.float32)
if args.enable_xformers_memory_efficient_attention:
    if is_xformers_available():
        import xformers

        xformers_version = version.parse(xformers.__version__)
        if xformers_version == version.parse("0.0.16"):
            logger.warn(
                ...
            )
        unet.enable_xformers_memory_efficient_attention()
    else:
        raise ValueError("xformers is not available. Make sure it is installed correctly")
if args.gradient_checkpointing:
    unet.enable_gradient_checkpointing()
# Enable TF32 for faster training on Ampere GPUs,
# cf https://pytorch.org/docs/stable/notes/cuda.html#tensorfloat-32-tf32-on-ampere-devices
if args.allow_tf32:
    torch.backends.cuda.matmul.allow_tf32 = True
Next comes the choice of optimizer. We can ignore the other branches and simply use AdamW.
# Initialize the optimizer
if args.use_8bit_adam:
    try:
        import bitsandbytes as bnb
    except ImportError:
        raise ImportError(
            "..."
        )
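The rest of the optimizer setup is omitted here. A minimal sketch that just uses AdamW over the trainable LoRA parameters, assuming the upstream script's argument names (learning_rate, adam_beta1, adam_beta2, adam_weight_decay, adam_epsilon):

# Sketch only: collect the trainable (LoRA) parameters and build AdamW over them.
lora_layers = filter(lambda p: p.requires_grad, unet.parameters())
optimizer = torch.optim.AdamW(
    lora_layers,
    lr=args.learning_rate,
    betas=(args.adam_beta1, args.adam_beta2),
    weight_decay=args.adam_weight_decay,
    eps=args.adam_epsilon,
)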
if args.dataset_name is not None:
    # Downloading and loading a dataset from the hub.
    dataset = load_dataset(
        args.dataset_name,
        args.dataset_config_name,
        cache_dir=args.cache_dir,
        data_dir=args.train_data_dir,
    )
else:
    data_files = {}
    if args.train_data_dir is not None:
        data_files["train"] = os.path.join(args.train_data_dir, "**")
    dataset = load_dataset(
        "imagefolder",
        data_files=data_files,
        cache_dir=args.cache_dir,
    )
    # See more about loading custom images at
    # https://huggingface.co/docs/datasets/v2.4.0/en/image_load#imagefolder
# Preprocessing the datasets.
# We need to tokenize inputs and targets.
column_names = dataset["train"].column_names
# 6. Get the column names for input/target.
dataset_columns = DATASET_NAME_MAPPING.get(args.dataset_name, None)
if args.image_column is None:
    image_column = dataset_columns[0] if dataset_columns is not None else column_names[0]
else:
    image_column = args.image_column
    if image_column not in column_names:
        raise ValueError(
            f"--image_column' value '{args.image_column}' needs to be one of: {', '.join(column_names)}"
        )
if args.caption_column is None:
    caption_column = dataset_columns[1] if dataset_columns is not None else column_names[1]
else:
    caption_column = args.caption_column
    if caption_column not in column_names:
        raise ValueError(
            f"--caption_column' value '{args.caption_column}' needs to be one of: {', '.join(column_names)}"
        )
With the dataset ready, the next step is to define the preprocessing pipeline used to build the DataLoader. The function first defines a tokenization helper that turns the text captions into token IDs. We do not need to modify it.
def tokenize_captions(examples, is_train=True):
    captions = []
    for caption in examples[caption_column]:
        if isinstance(caption, str):
            captions.append(caption)
        elif isinstance(caption, (list, np.ndarray)):
            # take a random caption if there are multiple
            captions.append(random.choice(caption) if is_train else caption[0])
        else:
            raise ValueError(
                f"Caption column `{caption_column}` should contain either strings or lists of strings."
            )
    inputs = tokenizer(
        captions, max_length=tokenizer.model_max_length, padding="max_length", truncation=True, return_tensors="pt"
    )
    return inputs.input_ids
# Preprocessing the datasets.
train_transforms = transforms.Compose(
    [
        transforms.Resize(args.resolution, interpolation=transforms.InterpolationMode.BILINEAR),
        transforms.CenterCrop(args.resolution) if args.center_crop else transforms.RandomCrop(args.resolution),
        transforms.RandomHorizontalFlip() if args.random_flip else transforms.Lambda(lambda x: x),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5]),
    ]
)
With the preprocessing pipeline defined, the function applies it to the whole dataset.
def preprocess_train(examples):
    images = [image.convert("RGB") for image in examples[image_column]]
    examples["pixel_values"] = [train_transforms(image) for image in images]
    examples["input_ids"] = tokenize_captions(examples)
    return examples
with accelerator.main_process_first():
    if args.max_train_samples is not None:
        dataset["train"] = dataset["train"].shuffle(seed=args.seed).select(range(args.max_train_samples))
    # Set the training transforms
    train_dataset = dataset["train"].with_transform(preprocess_train)
def collate_fn(examples):
    pixel_values = torch.stack([example["pixel_values"] for example in examples])
    pixel_values = pixel_values.to(memory_format=torch.contiguous_format).float()
    input_ids = torch.stack([example["input_ids"] for example in examples])
    return {"pixel_values": pixel_values, "input_ids": input_ids}
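The construction of the DataLoader itself is omitted above. A minimal sketch following the upstream script (args.train_batch_size and args.dataloader_num_workers are that script's argument names):

# Sketch of the DataLoader setup that produces `train_dataloader` used below.
train_dataloader = torch.utils.data.DataLoader(
    train_dataset,
    shuffle=True,
    collate_fn=collate_fn,
    batch_size=args.train_batch_size,
    num_workers=args.dataloader_num_workers,
)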
# Scheduler and math around the number of training steps.
overrode_max_train_steps = False
num_update_steps_per_epoch = math.ceil(len(train_dataloader) / args.gradient_accumulation_steps)
if args.max_train_steps is None:
    args.max_train_steps = args.num_train_epochs * num_update_steps_per_epoch
    overrode_max_train_steps = True
# We need to recalculate our total training steps as the size of the training dataloader may have changed.
num_update_steps_per_epoch = math.ceil(len(train_dataloader) / args.gradient_accumulation_steps)
if overrode_max_train_steps:
    args.max_train_steps = args.num_train_epochs * num_update_steps_per_epoch
# Afterwards we recalculate our number of training epochs
args.num_train_epochs = math.ceil(args.max_train_steps / num_update_steps_per_epoch)
At the end of the preparation, the function uses the accelerate library to log the run configuration.
if accelerator.is_main_process:
    accelerator.init_trackers("text2image-fine-tune", config=vars(args))
# Potentially load in the weights and states from a previous save
if args.resume_from_checkpoint:
    if args.resume_from_checkpoint != "latest":
        path = ...
    else:
        # Get the most recent checkpoint
        path = ...
progress_bar = tqdm(
    range(0, args.max_train_steps),
    initial=initial_global_step,
    desc="Steps",
    # Only show the progress bar once on each machine.
    disable=not accelerator.is_local_main_process,
)
for epoch in range(first_epoch, args.num_train_epochs):
    unet.train()
    train_loss = 0.0
    for step, batch in enumerate(train_dataloader):
        with accelerator.accumulate(unet):
bsz = latents.shape[0]
# Sample a random timestep for each image
timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (bsz,), device=latents.device)
timesteps = timesteps.long()
The timesteps, together with the noise sampled earlier, go through the DDPM forward process to produce the noisy latents noisy_latents.
# Add noise to the latents according to the noise magnitude at each timestep
# (this is the forward diffusion process)
noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)
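Conceptually, add_noise computes the closed-form DDPM forward step x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * eps. A rough sketch of the equivalent computation (the real scheduler also takes care of dtype, device and broadcasting details):

# Roughly equivalent computation (sketch only); noisy_latents above is the scheduler's result.
alphas_cumprod = noise_scheduler.alphas_cumprod.to(latents.device)
sqrt_alpha_bar = alphas_cumprod[timesteps] ** 0.5
sqrt_one_minus_alpha_bar = (1 - alphas_cumprod[timesteps]) ** 0.5
noisy_latents_manual = (
    sqrt_alpha_bar.view(-1, 1, 1, 1) * latents
    + sqrt_one_minus_alpha_bar.view(-1, 1, 1, 1) * noise
)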
Next, the text tokens batch["input_ids"] are encoded, in preparation for the U-Net forward pass that follows.
# Get the text embedding for conditioning
encoder_hidden_states = text_encoder(batch["input_ids"])[0]
# Get the target for loss depending on the prediction type
if args.prediction_type is not None:
    # set prediction_type of scheduler if defined
    noise_scheduler.register_to_config(prediction_type=args.prediction_type)
if args.snr_gamma is None:
    loss = F.mse_loss(model_pred.float(), target.float(), reduction="mean")
else:
    # Compute loss-weights as per Section 3.4 of https://arxiv.org/abs/2303.09556.
    ...
if global_step % args.checkpointing_steps == 0:
    if accelerator.is_main_process:
        # _before_ saving state, check if this save would set us over the `checkpoints_total_limit`
        if args.checkpoints_total_limit is not None:
            checkpoints = ...
import os
import torch
import numpy as np
from omegaconf import OmegaConf
from PIL import Image
from tqdm import tqdm, trange
from einops import rearrange
from pytorch_lightning import seed_everything
from torch import autocast
from torchvision.utils import make_grid
from ldm.util import instantiate_from_config
from ldm.models.diffusion.ddim import DDIMSampler
def load_model_from_config(config, ckpt, verbose=False):
    print(f"Loading model from {ckpt}")
    pl_sd = torch.load(ckpt, map_location="cpu")
    if "global_step" in pl_sd:
        print(f"Global Step: {pl_sd['global_step']}")
    sd = pl_sd["state_dict"]
    model = instantiate_from_config(config.model)
    m, u = model.load_state_dict(sd, strict=False)
    if len(m) > 0 and verbose:
        print("missing keys:")
        print(m)
    if len(u) > 0 and verbose:
        print("unexpected keys:")
        print(u)
    model.cuda()
    model.eval()
    return model
def main():
    seed = 42
    config = 'configs/stable-diffusion/v1-inference.yaml'
    ckpt = 'ckpt/v1-5-pruned.ckpt'
    outdir = 'tmp'
    n_samples = batch_size = 3
    n_rows = batch_size
    n_iter = 2
    prompt = 'a photograph of an astronaut riding a horse'
    data = [batch_size * [prompt]]
    scale = 7.5
    C = 4
    f = 8
    H = W = 512
    ddim_steps = 50
    ddim_eta = 0.0

    seed_everything(seed)

    config = OmegaConf.load(config)
    model = load_model_from_config(config, ckpt)

    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    model = model.to(device)
    sampler = DDIMSampler(model)
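The sampling loop itself is omitted from the excerpt above. Below is a minimal sketch of what it roughly looks like, loosely following the official txt2img.py script (the EMA scope and precision options are left out, and the output-path bookkeeping used below is reduced to the bare minimum):

    # Sketch of the omitted sampling loop (simplified from the official txt2img script).
    os.makedirs(outdir, exist_ok=True)
    outpath = outdir
    grid_count = len(os.listdir(outpath))

    all_samples = []
    with torch.no_grad(), autocast("cuda"):
        for _ in trange(n_iter, desc="Sampling"):
            for prompts in data:
                # Unconditional and conditional text embeddings for classifier-free guidance
                uc = model.get_learned_conditioning(batch_size * [""])
                c = model.get_learned_conditioning(prompts)
                shape = [C, H // f, W // f]
                samples_ddim, _ = sampler.sample(S=ddim_steps,
                                                 conditioning=c,
                                                 batch_size=batch_size,
                                                 shape=shape,
                                                 verbose=False,
                                                 unconditional_guidance_scale=scale,
                                                 unconditional_conditioning=uc,
                                                 eta=ddim_eta)
                # Decode latents back to image space and map from [-1, 1] to [0, 1]
                x_samples_ddim = model.decode_first_stage(samples_ddim)
                x_samples_ddim = torch.clamp((x_samples_ddim + 1.0) / 2.0, min=0.0, max=1.0).cpu()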
                all_samples.append(x_samples_ddim)

    grid = torch.stack(all_samples, 0)
    grid = rearrange(grid, 'n b c h w -> (n b) c h w')
    grid = make_grid(grid, nrow=n_rows)
    # to image
    grid = 255. * rearrange(grid, 'c h w -> h w c').cpu().numpy()
    img = Image.fromarray(grid.astype(np.uint8))
    img.save(os.path.join(outpath, f'grid-{grid_count:04}.png'))
    grid_count += 1
print(f"Your samples are ready and waiting for you here: \n{outpath} \n" f" \nEnjoy.")
self.make_schedule(ddim_num_steps=S, ddim_eta=eta, verbose=verbose)
# sampling
C, H, W = shape
size = (batch_size, C, H, W)
print(f'Data shape for DDIM sampling is {size}, eta {eta}')
h = x.type(self.dtype)
for module in self.input_blocks:
    h = module(h, emb, context)
    hs.append(h)
h = self.middle_block(h, emb, context)
for module in self.output_blocks:
    h = th.cat([h, hs.pop()], dim=1)
    h = module(h, emb, context)
h = h.type(x.dtype)
return self.out(h)
def forward(self, x, emb, context=None):
    for layer in self:
        if isinstance(layer, TimestepBlock):
            x = layer(x, emb)
        elif isinstance(layer, SpatialTransformer):
            x = layer(x, context)
        else:
            x = layer(x)
    return x
for level, mult in enumerate(channel_mult):
    for _ in range(num_res_blocks):
        layers = [ResBlock(...)]
        ch = mult * model_channels
        if ds in attention_resolutions:
            layers.append(
                AttentionBlock(...) if not use_spatial_transformer else SpatialTransformer(...)
            )
        self.input_blocks.append(TimestepEmbedSequential(*layers))
    if level != len(channel_mult) - 1:
        out_ch = ch
        self.input_blocks.append(
            TimestepEmbedSequential(
                ResBlock(...) if resblock_updown else Downsample(...)
            )
        )
Of course, a standard Transformer operates on one-dimensional sequences. To apply a Transformer to images, the height and width dimensions have to be flattened into a single sequence dimension, i.e. the tensor is rearranged so that h and w are merged (in the code below, `b c h w -> b (h w) c`). After this reshaping, the data can be fed directly into the Transformer blocks.

All of this adaptation between image data and sequence data happens in the SpatialTransformer class. SpatialTransformer does not itself implement the details of the Transformer block; it is merely a bridge between the U-Net and the Transformer blocks. The actual Transformer block is implemented in one of its submodules. Let's look at its implementation.
def forward(self, x, context=None):
    b, c, h, w = x.shape
    x_in = x
    x = self.norm(x)
    x = self.proj_in(x)
    x = rearrange(x, 'b c h w -> b (h w) c')
    for block in self.transformer_blocks:
        x = block(x, context=context)
    x = rearrange(x, 'b (h w) c -> b c h w', h=h, w=w)
    x = self.proj_out(x)
    return x + x_in
def forward(self, x, context=None):
    x = self.attn1(self.norm1(x)) + x
    x = self.attn2(self.norm2(x), context=context) + x
    x = self.ff(self.norm3(x)) + x
    return x
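In this block, attn1 is self-attention over the image tokens, while attn2 is cross-attention whose keys and values come from the text context. A simplified, single-head sketch of the idea behind the cross-attention layer (the real module projects to multiple heads and handles masks, so treat this only as an illustration):

import torch

def cross_attention(x, context, to_q, to_k, to_v):
    # x: (b, hw, c) image tokens; context: (b, seq_len, c_ctx) text embeddings.
    # When context is None, keys and values come from x itself, i.e. plain self-attention.
    context = x if context is None else context
    q, k, v = to_q(x), to_k(context), to_v(context)
    attn = torch.softmax(q @ k.transpose(-2, -1) / q.shape[-1] ** 0.5, dim=-1)
    return attn @ v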
from diffusers import DiffusionPipeline
import torch
pipeline = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16) pipeline.to("cuda") pipeline("An image of a squirrel in Picasso style").images[0].save('output.jpg')
from diffusers import DiffusionPipeline
import torch
pipeline = DiffusionPipeline.from_pretrained("ckpt/sd15", torch_dtype=torch.float16) pipeline.to("cuda") pipeline("An image of a squirrel in Picasso style").images[0].save('output.jpg')
# 0. Default height and width to unet
height = height or self.unet.config.sample_size * self.vae_scale_factor
width = width or self.unet.config.sample_size * self.vae_scale_factor
# to deal with lora scaling and other possible forward hooks
# 1. Check inputs. Raise error if not correct
self.check_inputs(...)
# For classifier free guidance, we need to do two forward passes.
# Here we concatenate the unconditional and text embeddings into a single batch
# to avoid doing two forward passes
if self.do_classifier_free_guidance:
    prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])
# 6. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipeline ...
# 7. Denoising loop
num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
self._num_timesteps = len(timesteps)
with self.progress_bar(total=num_inference_steps) as progress_bar:
    for i, t in enumerate(timesteps):
        # expand the latents if we are doing classifier free guidance
        latent_model_input = torch.cat([latents] * 2) if self.do_classifier_free_guidance else latents
        latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)
if self.do_classifier_free_guidance and self.guidance_rescale > 0.0:
    # Based on 3.4. in https://arxiv.org/pdf/2305.08891.pdf
    noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=self.guidance_rescale)
# call the callback, if provided
if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
    progress_bar.update()
with self.progress_bar(total=num_inference_steps) as progress_bar:
    for i, t in enumerate(timesteps):
        # eps = unet(zt, t, c)
        # expand the latents if we are doing classifier free guidance
        latent_model_input = torch.cat([latents] * 2) if self.do_classifier_free_guidance else latents
        latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)
        if self.do_classifier_free_guidance and self.guidance_rescale > 0.0:
            # Based on 3.4. in https://arxiv.org/pdf/2305.08891.pdf
            noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=self.guidance_rescale)
class CrossAttnDownBlock2D(nn.Module):
    def __init__(...):
        for i in range(num_layers):
            resnets.append(ResnetBlock2D(...))
            if not dual_cross_attention:
                attentions.append(Transformer2DModel(...))
attn_processor_dict = {}
for k in unet.attn_processors.keys():
    if we_want_to_modify(k):
        attn_processor_dict[k] = MyAttnProcessor()
    else:
        attn_processor_dict[k] = AttnProcessor()
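The dictionary is then registered on the U-Net. Assuming the standard diffusers API, this is a single call:

# Replace all attention processors at once; the dict keys must cover every entry in unet.attn_processors.
unet.set_attn_processor(attn_processor_dict)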
Before presenting the concrete results, let us explain the somewhat unusual precision and recall metrics used here. Precision and recall are normally used for tasks with definite answers, such as classification: precision is the fraction of samples classified as positive that are actually positive, and recall is the fraction of truly positive samples that are successfully classified as positive. Their interpretation for unconditional image generation is given in the paper Improved Precision and Recall Metric for Assessing Generative Models. Intuitively, if the real distribution is drawn in blue and the generated distribution in red, then precision is the fraction of red (generated) samples that fall inside the blue (real) distribution, and recall is the fraction of blue (real) samples that fall inside the red (generated) distribution. In short, precision describes sample quality, while recall describes how well the generated distribution covers the real one.
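To make this concrete, here is a minimal sketch of how such a manifold-based precision/recall could be estimated with k-nearest-neighbour radii over feature embeddings. It only illustrates the idea from the paper; real_feats and fake_feats are assumed to be feature vectors (e.g. Inception features) of real and generated images, and the official implementation differs in details such as feature extraction and batching.

import torch

def knn_radii(feats, k=3):
    # radius of each point = distance to its k-th nearest neighbour
    d = torch.cdist(feats, feats)
    d.fill_diagonal_(float('inf'))
    return d.topk(k, largest=False).values[:, -1]

def manifold_metric(ref_feats, query_feats, k=3):
    # fraction of query points falling inside the k-NN balls of the reference set
    radii = knn_radii(ref_feats, k)
    d = torch.cdist(query_feats, ref_feats)
    return (d <= radii.unsqueeze(0)).any(dim=1).float().mean().item()

# precision: generated samples covered by the real manifold
precision = manifold_metric(real_feats, fake_feats)
# recall: real samples covered by the generated manifold
recall = manifold_metric(fake_feats, real_feats)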
Deep Unsupervised Learning using Nonequilibrium Thermodynamics: https://arxiv.org/abs/1503.03585 — the predecessor of DDPM and the first paper to propose the idea of diffusion models. Its core principle is almost identical to DDPM's, but its model architecture and training objective are less refined, so its generation quality falls short of the improved DDPM. It is heavy on mathematics and does not need to be read in detail; it can be read alongside DDPM for comparison.
def sample_backward(self, img_or_shape, net, device, simple_var=True):
    if isinstance(img_or_shape, torch.Tensor):
        x = img_or_shape
    else:
        x = torch.randn(img_or_shape).to(device)
    net = net.to(device)
    for t in tqdm(range(self.n_steps - 1, -1, -1), "DDPM sampling"):
        x = self.sample_backward_step(x, t, net, simple_var)
img_list = einops.rearrange(imgs, 'n c h w -> n h w c').numpy()
output_dir = os.path.splitext(output_path)[0]
os.makedirs(output_dir, exist_ok=True)
for i, img in enumerate(img_list):
    if to_bgr:
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    cv2.imwrite(f'{output_dir}/{i+index}.jpg', img)
# First iteration
if index == 0:
    imgs = einops.rearrange(imgs, '(b1 b2) c h w -> (b1 h) (b2 w) c', b1=int(batch_size**0.5))
    imgs = imgs.numpy()
    if to_bgr:
        imgs = cv2.cvtColor(imgs, cv2.COLOR_RGB2BGR)
    cv2.imwrite(output_path, imgs)
def sample_backward(self, img_shape, net, device, simple_var=True):
    x = torch.randn(img_shape).to(device)
    net = net.to(device)
    for t in range(self.n_steps - 1, -1, -1):
        x = self.sample_backward_step(x, t, net, simple_var)
    return x
def sample_backward_step(self, x_t, t, net, simple_var=True):
    n = x_t.shape[0]
    t_tensor = torch.tensor([t] * n, dtype=torch.long).to(x_t.device).unsqueeze(1)
    eps = net(x_t, t_tensor)
    if t == 0:
        noise = 0
    else:
        if simple_var:
            var = self.betas[t]
        else:
            var = (1 - self.alpha_bars[t - 1]) / (1 - self.alpha_bars[t]) * self.betas[t]
        noise = torch.randn_like(x_t)
        noise *= torch.sqrt(var)
    mean = (x_t - (1 - self.alphas[t]) / torch.sqrt(1 - self.alpha_bars[t]) * eps) / torch.sqrt(self.alphas[t])
    x_t = mean + noise
    return x_t
import torch
import torch.nn as nn
from dldemos.ddpm.dataset import get_dataloader, get_img_shape
from dldemos.ddpm.ddpm import DDPM
import cv2
import numpy as np
import einops
batch_size = 512
n_epochs = 100
def train(ddpm: DDPM, net, device, ckpt_path):
    # n_steps is the T in the formulas
    # net is a neural network that inherits from torch.nn.Module
    n_steps = ddpm.n_steps
    dataloader = get_dataloader(batch_size)
    net = net.to(device)
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.Adam(net.parameters(), 1e-3)
    for e in range(n_epochs):
        for x, _ in dataloader:
            current_batch_size = x.shape[0]
            x = x.to(device)
            t = torch.randint(0, n_steps, (current_batch_size, )).to(device)
            eps = torch.randn_like(x).to(device)
            x_t = ddpm.sample_forward(x, t, eps)
            eps_theta = net(x_t, t.reshape(current_batch_size, 1))
            loss = loss_fn(eps_theta, eps)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        torch.save(net.state_dict(), ckpt_path)
for x, _ in dataloader:
    current_batch_size = x.shape[0]
    x = x.to(device)
    t = torch.randint(0, n_steps, (current_batch_size, )).to(device)
    eps = torch.randn_like(x).to(device)
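The ddpm.sample_forward call in the training loop implements the closed-form forward process x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * eps. A minimal sketch consistent with how it is used here (the actual implementation in the repository may differ slightly in how it handles broadcasting):

def sample_forward(self, x, t, eps=None):
    # self.alpha_bars: cumulative products of (1 - beta_t), shape (n_steps,)
    alpha_bar = self.alpha_bars[t].reshape(-1, 1, 1, 1)
    if eps is None:
        eps = torch.randn_like(x)
    # x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * eps
    return torch.sqrt(alpha_bar) * x + torch.sqrt(1 - alpha_bar) * eps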
def forward(self, input):
    x = self.conv1(input)
    x = self.bn1(x)
    x = self.actvation1(x)
    x = self.conv2(x)
    x = self.bn2(x)
    x += self.shortcut(input)
    x = self.actvation2(x)
    return x
self.residual_blocks = nn.ModuleList()
prev_channel = C
for channel in intermediate_channels:
    self.residual_blocks.append(ResidualBlock(prev_channel, channel))
    if insert_t_to_all_layers:
        self.pe_linears.append(nn.Linear(pe_dim, prev_channel))
    else:
        self.pe_linears.append(None)
    prev_channel = channel
self.output_layer = nn.Conv2d(prev_channel, C, 3, 1, 1)
def forward(self, x, t):
    n = t.shape[0]
    t = self.pe(t)
    for m_x, m_t in zip(self.residual_blocks, self.pe_linears):
        if m_t is not None:
            pe = m_t(t).reshape(n, -1, 1, 1)
            x = x + pe
        x = m_x(x)
    x = self.output_layer(x)
    return x
def forward(self, x):
    out = self.ln(x)
    out = self.conv1(out)
    out = self.activation(out)
    out = self.conv2(out)
    if self.residual:
        out += self.residual_conv(x)
    out = self.activation(out)
    return out
from taming.modules.diffusionmodules.model import Encoder, Decoder
from taming.modules.vqvae.quantize import VectorQuantizer2 as VectorQuantizer
from taming.modules.vqvae.quantize import GumbelQuantize
from taming.modules.vqvae.quantize import EMAVectorQuantizer
def forward(self, z):
    z = z.permute(0, 2, 3, 1).contiguous()
    z_flattened = z.view(-1, self.e_dim)
    # distances from z to embeddings e_j (z - e)^2 = z^2 + e^2 - 2 e * z
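    # A sketch of how the elided distance computation and nearest-neighbour lookup
    # typically continue (simplified; the real VectorQuantizer also computes the
    # commitment losses and reshapes the result back to (b, c, h, w)):
    d = (z_flattened.pow(2).sum(dim=1, keepdim=True)
         + self.embedding.weight.pow(2).sum(dim=1)
         - 2 * z_flattened @ self.embedding.weight.t())
    min_encoding_indices = torch.argmin(d, dim=1)
    z_q = self.embedding(min_encoding_indices).view(z.shape)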
res = [spatial_average(lins[kk].model(diffs[kk]), keepdim=True) for kk in range(len(self.chns))]
val = res[0]
for l in range(1, len(self.chns)):
    val += res[l]
return val
class NLayerDiscriminator(nn.Module):
    """Defines a PatchGAN discriminator as in Pix2Pix
        --> see https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix/blob/master/models/networks.py
    """
    def __init__(self, input_nc=3, ndf=64, n_layers=3, use_actnorm=False):
        """Construct a PatchGAN discriminator
        Parameters:
            input_nc (int)  -- the number of channels in input images
            ndf (int)       -- the number of filters in the last conv layer
            n_layers (int)  -- the number of conv layers in the discriminator
            norm_layer      -- normalization layer
        """
        super(NLayerDiscriminator, self).__init__()
        if not use_actnorm:
            norm_layer = nn.BatchNorm2d
        else:
            norm_layer = ActNorm
        if type(norm_layer) == functools.partial:
            # no need to use bias as BatchNorm2d has affine parameters
            use_bias = norm_layer.func != nn.BatchNorm2d
        else:
            use_bias = norm_layer != nn.BatchNorm2d
        kw = 4
        padw = 1
        sequence = [nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw), nn.LeakyReLU(0.2, True)]
        nf_mult = 1
        nf_mult_prev = 1
        for n in range(1, n_layers):
            # gradually increase the number of filters
            nf_mult_prev = nf_mult
            nf_mult = min(2 ** n, 8)
            sequence += [
                nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult, kernel_size=kw, stride=2, padding=padw, bias=use_bias),
                norm_layer(ndf * nf_mult),
                nn.LeakyReLU(0.2, True)
            ]
def init_first_stage_from_ckpt(self, config):
    model = instantiate_from_config(config)
    model = model.eval()
    model.train = disabled_train
    self.first_stage_model = model
# target includes all sequence elements (no need to handle first one
# differently because we are conditioning)
target = z_indices
# make the prediction
logits, _ = self.transformer(cz_indices[:, :-1])
# cut off conditioning outputs - output i corresponds to p(z_i | z_{<i}, c)
logits = logits[:, c_indices.shape[1]-1:]
def forward(self, idx, embeddings=None, targets=None):
    # forward the GPT model
    token_embeddings = self.tok_emb(idx)  # each index maps to a (learnable) vector
    t = token_embeddings.shape[1]
    assert t <= self.block_size, "Cannot forward, model block size is exhausted."
    position_embeddings = self.pos_emb[:, :t, :]  # each position maps to a (learnable) vector
    x = self.drop(token_embeddings + position_embeddings)
    x = self.blocks(x)
    x = self.ln_f(x)
    logits = self.head(x)
    # if we are given some desired targets also calculate the loss
    loss = None
    if targets is not None:
        loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1))
def forward(self, x, layer_past=None, return_present=False):
    # TODO: check that training still works
    if return_present:
        assert not self.training
    # layer past: tuple of length two with B, nh, T, hs
    attn, present = self.attn(self.ln1(x), layer_past=layer_past)
    x = x + attn
    x = x + self.mlp(self.ln2(x))
    if layer_past is not None or return_present:
        return x, present
    return x
目前 粮食 出现 阶段性 过剩 , 恰好 可以 以 粮食 换 森林 、 换 草地 , 再造 西部 秀美 山川 。
the present food surplus can specifically serve the purpose of helping western china restore its woodlands , grasslands , and the beauty of its landscapes .
def forward(self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor, mask: Optional[torch.Tensor] = None):
    # batch should be same
    assert q.shape[0] == k.shape[0]
    assert q.shape[0] == v.shape[0]
    # the sequence length of k and v should be aligned
    assert k.shape[1] == v.shape[1]
def forward(self, x, src_mask: Optional[torch.Tensor] = None):
    x = self.embedding(x)
    x = self.pe(x)
    x = self.dropout(x)
    for layer in self.layers:
        x = layer(x, src_mask)
    return x
def forward(self, x, encoder_kv, dst_mask: Optional[torch.Tensor] = None, src_dst_mask: Optional[torch.Tensor] = None):
    x = self.embedding(x)
    x = self.pe(x)
    x = self.dropout(x)
    for layer in self.layers:
        x = layer(x, encoder_kv, dst_mask, src_dst_mask)
    return x
# y_input = y_batch
with torch.no_grad():
    for i in range(1, y_input.shape[1]):
        y_hat = model(x_batch, y_input)
        for j in range(batch_size):
            y_input[j, i] = torch.argmax(y_hat[j, i - 1])
tokenizer = get_tokenizer('basic_english')
english = tokenizer(english)
For Chinese word segmentation, I used the jieba library, which can be installed directly with pip.
pip install jieba
The segmentation API is jieba.cut. Since the segmentation output contains whitespace tokens between adjacent words, I simply filter out all whitespace.
import jieba
chinese = list(jieba.cut(chinese))
chinese = [x for x in chinese if x not in {' ', '\t'}]
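For example (the exact segmentation depends on jieba's dictionary, so the output shown in the comment is only illustrative):

import jieba

words = list(jieba.cut('今天天气真好'))
# illustrative output: ['今天', '天气', '真', '好']
print(words)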
After this processing, each sentence has been converted into a list of Chinese words or English tokens. The full processing code is as follows:
def read_file(json_path):
    english_sentences = []
    chinese_sentences = []
    tokenizer = get_tokenizer('basic_english')
    with open(json_path, 'r') as fp:
        for line in fp:
            line = json.loads(line)
            english, chinese = line['english'], line['chinese']
            # Correct mislabeled data
            if not english.isascii():
                english, chinese = chinese, english
            # Tokenize
            english = tokenizer(english)
            chinese = list(jieba.cut(chinese))
            chinese = [x for x in chinese if x not in {' ', '\t'}]
            english_sentences.append(english)
            chinese_sentences.append(chinese)
    return english_sentences, chinese_sentences
def __getitem__(self, index):
    x = np.concatenate(([SOS_ID], self.en_tensor[index], [EOS_ID]))
    x = torch.from_numpy(x)
    y = np.concatenate(([SOS_ID], self.zh_tensor[index], [EOS_ID]))
    y = torch.from_numpy(y)
    return x, y
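Because sentences have different lengths, batching these (x, y) pairs requires padding. A minimal sketch of a collate function using pad_sequence; PAD_ID is assumed to be the padding token index defined elsewhere in the project:

from torch.nn.utils.rnn import pad_sequence

def collate_batch(batch):
    x_batch = [x for x, _ in batch]
    y_batch = [y for _, y in batch]
    # pad every sequence in the batch to the length of the longest one
    x_batch = pad_sequence(x_batch, batch_first=True, padding_value=PAD_ID)
    y_batch = pad_sequence(y_batch, batch_first=True, padding_value=PAD_ID)
    return x_batch, y_batch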