如何做免费的网站推广,中国建筑协会证书查询,米各庄网站建设,深圳市建网站前文 并行原理简介和 DDP 并行实践 和 使用 torchrun 进行容错处理 在简单的随机数据上演示了使用 DDP 并行加速训练的方法#xff0c;本文考虑一个更加复杂的 GPT 类模型#xff0c;说明如何进行 DDP 并行实战MinGPT 是 GPT 模型的一个流行的开源 PyTorch 复现项目#xff…前文 并行原理简介和 DDP 并行实践 和 使用 torchrun 进行容错处理 在简单的随机数据上演示了使用 DDP 并行加速训练的方法本文考虑一个更加复杂的 GPT 类模型说明如何进行 DDP 并行实战MinGPT 是 GPT 模型的一个流行的开源 PyTorch 复现项目其实现简洁干净可解释因而颇具教育意义。关于 MinGPT 的详细介绍可以参考 minGPT 代码详解训练 GPT 模型执行两位数加法本文参考自Pytorch 官方教程完整代码下载wxc971231/ddp-tutorial-series 文章目录 0. 项目组织1. 参数准备2. 数据准备3. 程序入口4. 定义模型5. 定义 Trainer 0. 项目组织
本文改写 MinGPT 库中的 chargpt 例程这是一个 character-level 语言模型项目组织如下 说明一下主要文件内容 data/input.txt 是训练用的数据集char_dataset.py 定义了一个 char-level 的 torch.utils.data.Datasetgpt_snapshot.pt 是程序运行过程中保存的快照使用 torchrun 时可以从此重启所有进程的训练gpt2_train_cfg.yaml 是 yaml 配置文件记录了训练超参数main.log 是 hydra 生成的 logging 文件main.py 是程序入口符合前文 使用 torchrun 进行容错处理 第1节给出的标准形式model.py 定义了 GPT 模型结构和 optimizer 的构造方法trainer.py 定义了训练过程包括快照保存和加载等操作
1. 参数准备
本项目使用 YAML文件存储超参数设置。YAML 是一种轻量级的数据序列化格式。相较于JSON等其他格式YAML 更加易读易写也更加适合用于配置文件等场景。YAML的语法结构主要包含键值对、列表、注释等几种元素data_config:path: ./data/input.txtblock_size: 128 # 输入序列长度train_split: 0.9 # 训练集和测试集划分truncate: 0.02 # 只用5%的数据进行训练
gpt_config:n_layer: 8n_head: 8n_embd: 512
trainer_config:max_epochs: 10batch_size: 216data_loader_workers: 4grad_norm_clip: 1.0snapshot_path: gpt_snapshot.ptsave_every: 3use_amp: True
optimizer_config:weight_decay: 0.1learning_rate: 0.0003hydra:run:dir: ./使用yaml文件时可以使用 ${node.key} 的方式引用yaml中的其他变量如果超参数的值缺失可以使用 ??? 输入缺失值或使用 null 输入空值。 使用 Hydra 来管理超参数它可以以装饰器的形式方便地加载不同路径下的 yaml 配置文件最小用例如下import hydra
from omegaconf import DictConfighydra.main(version_baseNone, config_pathconfigs, config_nameconfig)
def main(cfg: DictConfig) - None:cfg[key] # 获得对应的参数值if __name__ __main__:main()这样就把 ./configs/config.yaml 文件的参数加载到 main 函数中了使用 cfg[key] 这样的形式获取参数值使用 Hydra 还有一个好处是它对 logging 标准库进行了包装。在 hydra.main 装饰器中对 log 输出格式规范为 [%(asctime)s][%(name)s][%(levelname)s] - %(message)s并设置 level 为 info运行程序就会自动生成 main.log 日志文件。可以通过命令行的hydra.verbose 参数修改 log 的显示 level
2. 数据准备
使用的数据是 tiny-shakespear 数据集它是一个记录了一些英文对话的文本文档截取如下First Citizen:
Before we proceed any further, hear me speak.All:
Speak, speak.First Citizen:
You are all resolved rather to die than to famish?All:
Resolved. resolved.First Citizen:
First, you know Caius Marcius is chief enemy to the people.All:
We knowt, we knowt.First Citizen:
Let us kill him, and well have corn at our own price.
Ist a verdict?All:
No more talking ont; let it be done: away, away!下面来构造数据集思路是把 txt 文件中所有字符去重排序生成 vocab table样本生成时先把 txt 内容全部读取进来然后构造 n-gram 样本。如下import torch
from torch.utils.data import Dataset
import fsspec
from dataclasses import dataclass
Adapted from https://github.com/karpathy/minGPT/blob/master/projects/chargpt/chargpt.py
dataclass
class DataConfig:path: str Noneblock_size: int None # 输入序列长度 train_split: float None # 训练集和测试集划分truncate: float 1.0 # 用于训练的数据占全体数据的比例class CharDataset(Dataset):def __init__(self, data_cfg: DataConfig): #data_path: str, block_size):# 加载所需比例的数据data fsspec.open(data_cfg.path).open().read().decode(utf-8)data data[ : int(len(data) * data_cfg.truncate)]# Set 去重转 list 后排序得到数据集中的唯一字符列表作为词表chars sorted(list(set(data))) data_size, vocab_size len(data), len(chars)print(Data has %d characters, %d unique. % (data_size, vocab_size))# 得到字符和词表索引之间的双射self.stoi {ch: i for i, ch in enumerate(chars)} # 字符 - 词表索引self.itos {i: ch for i, ch in enumerate(chars)} # 词表索引 - 字符self.block_size data_cfg.block_size # 模型输入序列长度self.vocab_size vocab_size # 词表尺寸self.data datadef __len__(self):return len(self.data) - self.block_sizedef __getitem__(self, idx):# grab a chunk of (block_size 1) characters from the datachunk self.data[idx:idx self.block_size 1]# encode every character to an integerdix [self.stoi[s] for s in chunk]x torch.tensor(dix[:-1], dtypetorch.long)y torch.tensor(dix[1:], dtypetorch.long)return x, y3. 程序入口
使用 torchrun 命令进行容错按前文 使用 torchrun 进行容错处理 给出的标准形式来编写程序入口mian.py如下import os
import torch
from torch.utils.data import random_split
from torch.distributed import init_process_group, destroy_process_group
from model import GPT, GPTConfig, OptimizerConfig, create_optimizer
from trainer import Trainer, TrainerConfig
from char_dataset import CharDataset, DataConfig
from omegaconf import DictConfig
import hydradef ddp_setup():os.environ[MASTER_ADDR] localhost # 由于这里是单机实验所以直接写 localhostos.environ[MASTER_PORT] 12355 # 任意空闲端口init_process_group(backendnccl)torch.cuda.set_device(int(os.environ[LOCAL_RANK]))def get_train_objs(gpt_cfg: GPTConfig, opt_cfg: OptimizerConfig, data_cfg: DataConfig):dataset CharDataset(data_cfg)train_len int(len(dataset) * data_cfg.train_split)train_set, test_set random_split(dataset, [train_len, len(dataset) - train_len])gpt_cfg.vocab_size dataset.vocab_sizegpt_cfg.block_size dataset.block_sizemodel GPT(gpt_cfg)optimizer create_optimizer(model, opt_cfg)return model, optimizer, train_set, test_sethydra.main(version_baseNone, config_path., config_namegpt2_train_cfg)
def main(cfg: DictConfig):# 初始化进程池ddp_setup()# 从 yaml 文件读取超参数gpt_cfg GPTConfig(**cfg[gpt_config])opt_cfg OptimizerConfig(**cfg[optimizer_config])data_cfg DataConfig(**cfg[data_config])trainer_cfg TrainerConfig(**cfg[trainer_config])# 创建训练对象model, optimizer, train_data, test_data get_train_objs(gpt_cfg, opt_cfg, data_cfg)trainer Trainer(trainer_cfg, model, optimizer, train_data, test_data)# 开始训练trainer.train()# 训练完成后销毁进程池destroy_process_group()if __name__ __main__:main()注意其中使用 hydra.main 装饰器来加载参数运行时使用以下命令指定 GPU 运行CUDA_VISIBLE_DEVICES1,2 torchrun --standalone --nproc_per_nodegpu main.py4. 定义模型
整个模型定义部分相比 MinGPT 原始代码逻辑上没有区别只是换了一下写法看起来更清晰一点。首先定义两个 dataclass 保存模型和优化器参数from dataclasses import dataclass
import math
import torch
import torch.nn as nn
from torch.nn import functional as Fdataclass
class GPTConfig:model_type: str gpt2# model configurationsn_layer: int Nonen_head: int Nonen_embd: int None# openais values for gpt2vocab_size: int 50257 block_size: int 1024# dropout hyperparametersembd_pdrop: float 0.1resid_pdrop: float 0.1attn_pdrop: float 0.1dataclass
class OptimizerConfig:learning_rate: float 3e-4weight_decay: float 0.1定义多头 masked self-attention 模块原本 MinGPT 库是全部手写的这里则用了 pytorch 自己的多头注意力模块。具体做法是使用 torch.nn.MultiheadAttention 定义普通多头注意力层在 forward 方法中用同一个序列输入构造 qkv 实现 self-attention再用过对注意力输出设置遮盖实现 maskclass MultiheadAttentionLayer(nn.Module):A multi-head masked self-attention layer with a projection at the end.def __init__(self, config, devicecpu, dtypetorch.float32):super().__init__()assert config.n_embd % config.n_head 0self.resid_drop nn.Dropout(config.resid_pdrop)# output projectionself.c_proj nn.Linear(config.n_embd, config.n_embd, devicedevice, dtypedtype)# Causal mask。注意这个mask是通过 self.register_buffer 方法登记的# 这样登记过的张量可以求梯度也可以随模型在 CPU/GPU 之间移动但是不进行参数优化self.register_buffer(mask, torch.tril(torch.ones(config.block_size, config.block_size)).view(1, 1, config.block_size, config.block_size))self.attn torch.nn.MultiheadAttention(embed_dimconfig.n_embd,num_headsconfig.n_head,dropoutconfig.attn_pdrop,batch_firstTrue,devicedevice,dtypedtype)def forward(self, x):_, seq_size, _ x.size() # batch size, sequence length, embedding dimensionality (n_embd)y self.attn(x, x, x, attn_maskself.mask[0, 0, :seq_size, :seq_size])[0]y self.resid_drop(self.c_proj(y))return y我感觉这里 self.attn(x, x, x, attn_maskself.mask[0, 0, :seq_size, :seq_size])[0] 的调用有问题因为 torch.nn.MultiheadAttention 的前向过程需要输入 querykey 和 value 三个 tensor这里应该把 x 用三个线性层变换后再作为输入。如果读者有其他想法可以和我讨论。考虑到本文主要说明 DDP 并行暂不关注此问题 定义 Transformer blockclass Block(nn.Module): an unassuming Transformer block def __init__(self, config: GPTConfig):super().__init__()self.ln1 nn.LayerNorm(config.n_embd)self.ln2 nn.LayerNorm(config.n_embd)self.attn MultiheadAttentionLayer(config)self.mlp nn.Sequential(nn.Linear(config.n_embd, 4 * config.n_embd),nn.GELU(),nn.Linear(4 * config.n_embd, config.n_embd),nn.Dropout(config.resid_pdrop),)def forward(self, x):x x self.attn(self.ln1(x))x x self.mlp(self.ln2(x))return x定义字符嵌入层用 nn.Embedding 嵌入 token再设置一个 nn.Parameter 作为可学习的位置编码class EmbeddingStem(nn.Module):def __init__(self, config: GPTConfig, devicecpu, dtypetorch.float32):super().__init__()self.tok_emb nn.Embedding(config.vocab_size, config.n_embd, devicedevice, dtypedtype)self.pos_emb nn.Parameter(torch.zeros(1, config.block_size, config.n_embd, devicedevice, dtypedtype))self.drop nn.Dropout(config.embd_pdrop)self.block_size config.block_sizedef reset_parameters(self): self.tok_emb.reset_parameters() # 将 nn.Embedding 层参数初始化为正态分布采样def forward(self, idx):b, t idx.size()assert t self.block_size, fCannot forward sequence of length {t}, block size is only {self.block_size}token_embeddings self.tok_emb(idx) # each index maps to a (learnable) embedding vectorposition_embeddings self.pos_emb[:, :t, :] # each position maps to a (learnable) position vectorreturn self.drop(token_embeddings position_embeddings)把以上组件合在一起定义 GPT 模型class GPT(nn.Module): GPT Language Model def __init__(self, config: GPTConfig):super().__init__()self.block_size config.block_sizeconfig self._set_model_config(config)# input embedding stemself.emb_stem EmbeddingStem(config)# transformerself.blocks nn.Sequential(*[Block(config) for _ in range(config.n_layer)])# decoder headself.ln_f nn.LayerNorm(config.n_embd)self.head nn.Linear(config.n_embd, config.vocab_size, biasFalse)# init all weights, and apply a special scaled init to the residual projections, per GPT-2 paperself.apply(self._init_weights)for pn, p in self.named_parameters():if pn.endswith(c_proj.weight):p.data.normal_(mean0.0, std0.02/math.sqrt(2 * config.n_layer))# report number of parameters (note we dont count the decoder parameters in lm_head)n_params sum(p.numel() for p in self.blocks.parameters())print(number of parameters: %.2fM % (n_params/1e6,))def _set_model_config(self, config):type_given config.model_type is not Noneparams_given all([config.n_layer is not None, config.n_head is not None, config.n_embd is not None])# assert type_given ^ params_given # exactly one of these (XOR)if type_given and not params_given:# translate from model_type to detailed configurationconfig.__dict__.update({# names follow the huggingface naming conventions# GPT-1openai-gpt: dict(n_layer12, n_head12, n_embd768), # 117M params# GPT-2 configsgpt2: dict(n_layer12, n_head12, n_embd768), # 124M paramsgpt2-medium: dict(n_layer24, n_head16, n_embd1024), # 350M paramsgpt2-large: dict(n_layer36, n_head20, n_embd1280), # 774M paramsgpt2-xl: dict(n_layer48, n_head25, n_embd1600), # 1558M params# Gophersgopher-44m: dict(n_layer8, n_head16, n_embd512),# (there are a number more...)# I made these tiny models upgpt-mini: dict(n_layer6, n_head6, n_embd192),gpt-micro: dict(n_layer4, n_head4, n_embd128),gpt-nano: dict(n_layer3, n_head3, n_embd48),}[config.model_type])return configdef _init_weights(self, module):if isinstance(module, (nn.Linear, nn.Embedding)):module.weight.data.normal_(mean0.0, std0.02)if isinstance(module, nn.Linear) and module.bias is not None:module.bias.data.zero_()elif isinstance(module, nn.LayerNorm):module.bias.data.zero_()module.weight.data.fill_(1.0)def forward(self, idx, targetsNone):x self.emb_stem(idx)x self.blocks(x)x self.ln_f(x)logits self.head(x)# if we are given some desired targets also calculate the lossloss Noneif targets is not None:loss F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index-1)return logits, losstorch.no_grad()def generate(self, idx, max_new_tokens, temperature1.0, do_sampleFalse, top_kNone):Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and completethe sequence max_new_tokens times, feeding the predictions back into the model each time.Most likely youll want to make sure to be in model.eval() mode of operation for this.for _ in range(max_new_tokens):# if the sequence context is growing too long we must crop it at block_sizeidx_cond idx if idx.size(1) self.block_size else idx[:, -self.block_size:]# forward the model to get the logits for the index in the sequencelogits, _ self(idx_cond)# pluck the logits at the final step and scale by desired temperaturelogits logits[:, -1, :] / temperature# optionally crop the logits to only the top k optionsif top_k is not None:v, _ torch.topk(logits, top_k)logits[logits v[:, [-1]]] -float(Inf)# apply softmax to convert logits to (normalized) probabilitiesprobs F.softmax(logits, dim-1)# either sample from the distribution or take the most likely elementif do_sample:idx_next torch.multinomial(probs, num_samples1)else:_, idx_next torch.topk(probs, k1, dim-1)# append sampled index to the running sequence and continueidx torch.cat((idx, idx_next), dim1)return idx最后我们来定义优化器def create_optimizer(model: torch.nn.Module, opt_config: OptimizerConfig):This long function is unfortunately doing something very simple and is being very defensive:We are separating out all parameters of the model into two buckets: those that will experienceweight decay for regularization and those that wont (biases, and layernorm/embedding weights).We are then returning the PyTorch optimizer object.# separate out all parameters to those that will and wont experience regularizing weight decaydecay set()no_decay set()whitelist_weight_modules (torch.nn.Linear, )blacklist_weight_modules (torch.nn.LayerNorm, torch.nn.Embedding)for mn, m in model.named_modules():for pn, p in m.named_parameters():fpn %s.%s % (mn, pn) if mn else pn # full param name# random note: because named_modules and named_parameters are recursive# we will see the same tensors p many many times. but doing it this way# allows us to know which parent module any tensor p belongs to...if pn.endswith(bias):# all biases will not be decayedno_decay.add(fpn)elif pn.endswith(weight) and isinstance(m, whitelist_weight_modules):# weights of whitelist modules will be weight decayeddecay.add(fpn)elif pn.endswith(in_proj_weight):# MHA projection layerdecay.add(fpn)elif pn.endswith(weight) and isinstance(m, blacklist_weight_modules):# weights of blacklist modules will NOT be weight decayedno_decay.add(fpn)elif pn.endswith(pos_emb):# positional embedding shouldnt be decayedno_decay.add(fpn)# validate that we considered every parameterparam_dict {pn: p for pn, p in model.named_parameters()}inter_params decay no_decayunion_params decay | no_decayassert len(inter_params) 0, parameters %s made it into both decay/no_decay sets! % (str(inter_params), )assert len(param_dict.keys() - union_params) 0, parameters %s were not separated into either decay/no_decay set! \% (str(param_dict.keys() - union_params), )# create the pytorch optimizer objectoptim_groups [{params: [param_dict[pn] for pn in sorted(list(decay))], weight_decay: opt_config.weight_decay},{params: [param_dict[pn] for pn in sorted(list(no_decay))], weight_decay: 0.0},]optimizer torch.optim.AdamW(optim_groups, lropt_config.learning_rate, betas(0.9, 0.95))return optimizer这里主要是通过权重衰减方法来进行正则化避免过拟合。注意到作者通过一个二重遍历考察 GPT 模型所有 sub module 的所有 parameters仅对所有 torch.nn.Linear 层的 weight 参数进行衰减bias 参数及所有 torch.nn.LayerNorm、torch.nn.Embedding 模块的参数都不做处理。由于模块是递归组织的这个二重遍历会重复访问很多参数所以通过 set 自动去重最后根据处理结果定义 torch.optim.AdamW 优化器返回 关于权重衰减的理论说明参考机器学习基础6—— 使用权重衰减和丢弃法缓解过拟合问题
5. 定义 Trainer Trainer 定义和原始 MinGPT 库主要有两个区别 按指定周期要求 rank0 进程保存 snapshot本项目中应包含 epoch、模型参数和优化器参数三部分内容初始化 Trainer 时应当加载可能存在的 snapshot 文件这样在 torchrun 自动重启进程时可以从最近的 snapshot 恢复训练 可以使用 torch.cuda.amp.GradScaler 进行混合精度训练 混合精度训练Mixed Precision Training是一种训练深度学习模型的技术旨在提高模型的训练速度和效率。它利用了现代GPU可以混合计算精度的硬件特性使用FP16数据类型对模型中的某些操作进行加速。具体而言模型的参数通常使用FP32数据类型而输入数据和梯度则使用FP16数据类型从而减少内存开销加速计算速度提高模型的训练效率。此外混合精度训练还可以通过减少浮点运算和内存访问降低能源消混合精度训练的主要困难在于 fp16 的表示范围有限在训练中常出现溢出问题尤其是下溢出因为在网络训练的后期模型的梯度往往很小另外还有舍入误差问题这是指当梯度过小小于当前区间内的最小间隔时该次梯度更新可能会失效解决以上问题的方法包括损失缩放和FP32权重备份等前者对计算出的 loss 值进行缩放(scale)这样梯度也会被缩放进而平移到 FP16 的有效范围内存储在进行梯度更新之前先将缩放后的梯度转化为 FP32 再unscale回去后者将模型权重、激活值、梯度等数据用 FP16 来存储同时维护一份 FP32 的模型权重副本用于更新。在反向传播得到 FP16 的梯度以后将其转化成 FP32 并 unscale最后更新 FP32 的模型权重。因为整个更新过程是在 FP32 的环境中进行的所以不会出现舍入误差有一些代码库可以帮助我们快速实现混合精度训练而无需大幅修改代码包括 nvidia 的 apex 库和 pytorch 1.6 后引入的 amp 库等 本项目使用 pytorch 的 amp 库进行混合精度训练主要用到 GradScaler 和 autocast 两个组件。其中 Gradscalar 对会检查梯度是否发现溢出并对优化器进行控制 (将丢弃的batches转换为 no-op)autocast 是一个上下文管理器当进入 autocast 上下文后tensor 的数据类型会自动转换为半精度浮点型从而在不损失训练精度的情况下加快运算而不需要手动调用 .half()。 一个最小实践示例为 from torch.cuda.amp import autocast as autocast, GradScalerother code
# 在训练最开始之前实例化一个GradScaler对象
scaler GradScaler()other code
# 前向过程(model loss)开启 autocastwith autocast():output model(input)loss loss_fn(output, target)# Scales loss这是因为半精度的数值范围有限因此需要用它放大scaler.scale(loss).backward()# scaler.step() unscale之前放大后的梯度但是scale太多可能出现inf或NaN# 故其会判断是否出现了inf/NaN# 如果梯度的值不是 infs 或者 NaNs, 那么调用optimizer.step()来更新权重,# 如果检测到出现了inf或者NaN就跳过这次梯度更新同时动态调整scaler的大小scaler.step(optimizer)# 查看是否要更新scaler,这个要注意不能丢scaler.update()
other code下面开始分析 trainer 代码首先定义两个 dataclass 存储 Trainer 参数和 snapshot 参数 dataclass
class TrainerConfig:max_epochs: int Nonebatch_size: int Nonedata_loader_workers: int Nonegrad_norm_clip: float Nonesnapshot_path: Optional[str] Nonesave_every: int Noneuse_amp: bool Nonedataclass
class Snapshot:model_state: OrderedDict[str, torch.Tensor]optimizer_state: Dict[str, Any]finished_epoch: int定义 Trianer 的初始化方法 class Trainer:def __init__(self, trainer_config: TrainerConfig, model, optimizer, train_dataset, test_datasetNone):self.config trainer_config# set torchrun variablesself.local_rank int(os.environ[LOCAL_RANK]) # 在所有node的所有进程中当前GPU进程的rankself.global_rank int(os.environ[RANK]) # 在当前node中当前GPU进程的rank# data stuffself.train_dataset train_datasetself.train_loader self._prepare_dataloader(train_dataset)self.test_loader self._prepare_dataloader(test_dataset) if test_dataset else None# initialize train statesself.epochs_run 0self.model model.to(self.local_rank)self.optimizer optimizer self.save_every self.config.save_every# load snapshot if available. only necessary on the first node.if self.config.snapshot_path is None:self.config.snapshot_path snapshot.ptself._load_snapshot()# wrap with DDP. this step will synch model across all the processes.self.model DDP(self.model, device_ids[self.local_rank])# torch.cuda.amp.GradScaler 是一个用于自动混合精度训练的 PyTorch 工具它可以帮助加速模型训练并减少显存使用量# 具体来说GradScaler 可以将梯度缩放到较小的范围以避免数值下溢或溢出的问题同时保持足够的精度以避免模型的性能下降if self.config.use_amp: self.scaler torch.cuda.amp.GradScaler()注意几点 torchrun 帮助我们自动分发进程通过环境变量获取当前运行代码的 GPU rank 信息初始化 Trainer 时加载可能存在的 snapshot实现断点续训模型使用 DDP 进行包装定义混合精度训练所需的 torch.cuda.amp.GradScaler() 定义 DataLoder注意使用 DistributedSampler 来分发训练数据 def _prepare_dataloader(self, dataset: Dataset):return DataLoader(dataset,batch_sizeself.config.batch_size,pin_memoryTrue,shuffleFalse,num_workersself.config.data_loader_workers,samplerDistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU它能避免数据重叠)定义 snapshot 的加载和保存方法 def _save_snapshot(self, epoch):# capture snapshotmodel self.modelraw_model model.module if hasattr(model, module) else modelsnapshot Snapshot(model_stateraw_model.state_dict(),optimizer_stateself.optimizer.state_dict(),finished_epochepoch)# save snapshotsnapshot asdict(snapshot)torch.save(snapshot, self.config.snapshot_path)print(fSnapshot saved at epoch {epoch})def _load_snapshot(self):try:snapshot fsspec.open(self.config.snapshot_path) # fsspec 为各种后端存储系统提供统一的 Python 接口可以用相同的语法打开本地、AWS S3 和 GCS 等各种云存储平台的文件with snapshot as f:snapshot_data torch.load(f, map_locationcpu)except FileNotFoundError:print(Snapshot not found. Training model from scratch)return snapshot Snapshot(**snapshot_data)self.model.load_state_dict(snapshot.model_state)self.optimizer.load_state_dict(snapshot.optimizer_state)self.epochs_run snapshot.finished_epochprint(fResuming training from snapshot at Epoch {self.epochs_run})定义训练流程 def _run_batch(self, source, targets, train: bool True) - float:with torch.set_grad_enabled(train), torch.cuda.amp.autocast(dtypetorch.float16, enabled(self.config.use_amp)):_, loss self.model(source, targets)if train:self.optimizer.zero_grad(set_to_noneTrue)if self.config.use_amp: self.scaler.scale(loss).backward()torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_norm_clip)self.scaler.step(self.optimizer)self.scaler.update()else:loss.backward()torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.config.grad_norm_clip)self.optimizer.step()#return loss.item()return lossdef _run_epoch(self, epoch: int, dataloader: DataLoader, train: bool True):dataloader.sampler.set_epoch(epoch)for iter, (source, targets) in enumerate(dataloader):step_type Train if train else Evalsource source.to(self.local_rank)targets targets.to(self.local_rank)batch_loss self._run_batch(source, targets, train)if iter % 100 0:#print(f[GPU{self.global_rank}] Epoch {epoch} | Iter {iter} | {step_type} Loss {batch_loss.item():.5f})if train:print(f[GPU{self.global_rank}] Epoch {epoch} | Iter {iter} | {step_type} Loss {batch_loss.item():.5f})else:eval_loss_list [torch.zeros_like(batch_loss) for _ in range(int(os.environ[WORLD_SIZE]))]dist.gather(batch_loss,eval_loss_list if self.local_rank 0 else None, dst0)if self.local_rank 0:for i, loss in enumerate(eval_loss_list):print(f[GPU{i}] Epoch {epoch} | Iter {iter} | {step_type} Loss {loss.item():.5f})def train(self):for epoch in range(self.epochs_run, self.config.max_epochs):epoch 1# train for one epochself._run_epoch(epoch, self.train_loader, trainTrue)# 各个 GPU 上都在跑一样的训练进程这里指定 rank0 进程保存 snapshot 以免重复保存if self.local_rank 0 and epoch % self.save_every 0:self._save_snapshot(epoch)# eval runif self.test_loader:self._run_epoch(epoch, self.test_loader, trainFalse)这里需要注意几点 指定 rank0 进程保存 snapshot 以免重复保存_run_batch 方法中计算 loss 的部分设置在 torch.amp.autocast 上下文中启动混合精度训练_run_epoch 方法中使用 torch.distributed.gather 原语汇聚各个 GPU 的验证损失信息到 rank0 上常用这种操作进行 log 训练信息。除此以外 Pytorch 一共提供了六个进程通信原语如下import torch.distributed as distdist.broadcast(tensor, src, group) # 将 tensor 从 src 复制到所有其他进程。
dist.reduce(tensor, dst, op, group) # 将 op 应用于每个 tensor 并将结果存储在 dst 中。
dist.all_reduce(tensor, op, group) # 与 reduce 相同但结果存储在所有进程中。
dist.scatter(tensor, scatter_list, src, group) # 复制 tensor scatter_lost[i] 到 进程
dist.gather(tensor,gather_list, dst, group) # 从 dst 中的所有进程复制 tensor。
dist.all_gather(tensor_list, tensor, group) # 将所有进程的 tensor 复制到所有进程上的 tensor_list。
dist.barrier(group) # 阻塞组中的所有进程直到每个进程都进入该函数。其中 op 操作有四种dist.ReduceOp.SUM,
dist.ReduceOp.PRODUCT,
dist.ReduceOp.MAX,
dist.ReduceOp.MIN.这些方法在需要手动汇聚或分发信息时特别有用具体用法可以参考 pytorch 官方文档