文章目录

  • 总览
    • 1. 必知概念
  • 代码示例
    • 1. DP(torch.nn.DataParallel)
    • 2. DDP(torch.nn.parallel.DistributedDataParallel)
      • 示例1
      • 示例2
        • 2.1 环境配置
        • 2.2 数据集 与 加载器构造
        • 2.3 模型
        • 2.4 训练

总览

  • pytorch提供了两种方式实现并行训练:

单机多GPU可以使用 torch.nn.DataParallel接口(DP,旧的) 或者 torch.nn.parallel.DistributedDataParallel接口(DDP,新的),官方推荐使用第二个,多机多卡的情况下只能使用DDP。

DistributedDataParallel 和 DataParallel 之间的区别是:

  • DistributedDataParallel使用多进程multiprocessing,即为每个GPU创建一个进程,而DataParallel使用的是多线程。DDP通过使用multiprocessing,每个GPU都有专门的进程,这就避免了python解释器的GIL导致的性能开销。如果使用DDP,可以使用torch.distributed.launch来启动程序。
  • DDP的上层调用是通过dispatch.py实现的,即dispatch.py是DDP的python入口,它实现了调用C++库forward的nn.parallel.DistributedDataParallel模块的初始化和功能。
  • Pytorch 的分布式训练主要是使用torch.distributed来实现的,它主要由三个组件构成:

    1. Distributed Data-Parallel Training(DDP):它是一个single-program和multi-process。使用DDP组件的时候,模型被复制到每一个进程也就是GPU里面,每个model都会被送入同样大小batch_size的不同样本进行训练,每个model都会计算出一个grad,然后每个model计算好的grad和其他的GPU进行通信然后进行同步更新model参数,以此来加快模型训练速度。
    2. RPC-Based Distributed Training(RPC):支持一些无法并行化训练数据的范式,例如分布式管道范式、参数服务器范式(参数和训练器不在同一个服务器上)、结合DDP的其他训练范式。
    3. Collection Communication(c10d):支持在一个组里面跨进程的传送张量,它提供了collective通信APIs和P2P通信APIs,DDP模式和RPC模式就是建立在c10d的基础上,DDP采用的是collective communication,RPC采用P2P communication, 一般情况下很少使用这个API,因为DDP和RPC已经足以在很多场景下使用。
      计算 distributed parameter averaging,参数平均的时候可能会用到这个API.

1. 必知概念

  • 分布式、并行

    • 分布式是指多台服务器的多块GPU(多机多卡);
    • 而并行一般指的是一台服务器的多个GPU(单机多卡)。
  • 模型并行、数据并行

    • 当模型很大,单张卡放不下时,需要将模型分成多个部分分别放到不同的卡上,每张卡输入的数据相同,这种方式叫做模型并行;
    • 而将不同的数据分配到不同的卡上,运行相同的模型,最后收集所有卡的运算结果来提高训练速度的方式叫数据并行。相比于模型并行,数据并行更为常用,以下我们主要讲述关于数据并行的内容。
  • 同步更新、异步更新

    • 同步更新指所有的GPU都计算完梯度后,累加到一起求均值进行参数更新,再进行下一轮的计算;
    • 而异步更新指单个GPU计算完梯度后,无需等待其他更新,立即更新参数并同步。同步更新速度取决于最慢的那个GPU,异步更新没有等待,但是会出现loss异常抖动等问题,一般常用的是同步更新。
  • group、world size、node、rank、local_rank

    • group指的是进程组,默认情况下只有一个主进程就只有一个组,即一个 world,当使用多进程时,一个 group 就有了多个 world;
    • world size表示全局进程个数;
    • node表示物理机器数量;
    • rank表示进程序号;
    • local_rank指进程内 GPU 编号。

    举个例子,三台机器,每台机器四张卡全部用上,那么有group=1,world size=12

    • 机器一:node=0 rank=0,1,2,3 local_rank=0,1,2,3 这里的node=0,rank=0的就是master
    • 机器二:node=1 rank=4,5,6,7 local_rank=0,1,2,3
    • 机器三:node=2 rank=8,9,10,11 local_rank=0,1,2,3

代码示例

1. DP(torch.nn.DataParallel)

DP(DataParallel)模式是很早就出现的、单机多卡的、参数服务器架构的多卡训练模式。其只有一个进程,多个线程(受到GIL限制)。master节点相当于参数服务器,其向其他卡广播其参数;在梯度反向传播后,各卡将梯度集中到master节点,master节点收集各个卡的参数进行平均后更新参数,再将参数统一发送到其他卡上,参与训练的 GPU 参数device_ids=gpus;用于汇总梯度的 GPU 参数output_device=gpus[0]。DP的使用比较简单,需要修改的代码量也很少,在Pytorch中的用法如下:

from torch.nn import DataParalleldevice = torch.device("cuda")
gpus = [0,1,2]
model = MyModel()
model = model.to(device)
model = DataParallel(model, device_ids=gpus, output_device=gpus[0])

device_ids中的第一个GPU(即device_ids[0])和model.cuda()或torch.cuda.set_device()中的第一个GPU序号应保持一致,否则会报错。此外如果两者的第一个GPU序号都不是0

model=torch.nn.DataParallel(model,device_ids=[2,3])
model.cuda(2)

那么程序可以在GPU2和GPU3上正常运行,但是还会占用GPU0的一部分显存(大约500M左右),这是由于pytorch本身的bug导致的。

使用的时候直接指定CUDA_VISIBLE_DEVICES,通过调整可见显卡的顺序指定加载模型对应的GPU,不要使用torch.cuda.set_device(),不要给.cuda()赋值,不要给torch.nn.DataParallel中device_ids赋值。比如想在GPU1,2,3中运行,其中GPU2是存放模型的显卡,那么直接设置

CUDA_VISIBLE_DEVICES=2,1,3

Pytorch官方的github提供了examples仓库:examples可以有很多例子进行学习

2. DDP(torch.nn.parallel.DistributedDataParallel)

此部分转自:知乎, blog
有疑问的地方,参考 官方文档

先简单介绍一下并行训练的大致过程:

pytorch利用torch.distributed进行分布式训练,distributed会在内部开辟多个进程,进程数与可用的GPU数一致,多个进程分别加载数据集的一部分,在每个GPU上实现加载部分数据集的前向与反向传播,多个GPU上的反向传播得到的梯度会通过gpu间的all_reduce实现平均,再在每个gpu上进行模型的参数更新,这样保证了不同GPU之间的模型参数一致,同时实现了更大batch_size的训练。

示例1

pytorch官网建议使用DistributedDataParallel来代替DataParallel, 据说是因为DistributedDataParallel比DataParallel运行的更快, 然后显存分屏的更加均衡. 而且DistributedDataParallel功能更加强悍, 例如分布式的模型(一个模型太大, 以至于无法放到一个GPU上运行, 需要分开到多个GPU上面执行). 只有DistributedDataParallel支持分布式的模型像单机模型那样可以进行多机多卡的运算.

分布式训练与单机多卡的区别:

[1] - DataLoader部分需要使用Sampler,保证不同GPU卡处理独立的子集.

[2] - 模型部分使用DistributedDataParallel.

from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallelRANK = int(os.environ['SLURM_PROCID'])  # 进程序号,用于进程间通信
LOCAL_RANK = int(os.environ['SLURM_LOCALID']) # 本地设备序号,用于设备分配.
GPU_NUM = int(os.environ['SLURM_NTASKS'])     # 使用的 GPU 总数.
IP = os.environ['SLURM_STEP_NODELIST'] #进程节点 IP 信息.
BATCH_SIZE = 16  # 单张 GPU 的大小.def dist_init(host_addr, rank, local_rank, world_size, port=23456):host_addr_full = 'tcp://' + host_addr + ':' + str(port)torch.distributed.init_process_group("nccl", init_method=host_addr_full,rank=rank, world_size=world_size)torch.cuda.set_device(local_rank)assert torch.distributed.is_initialized()if __name__ == '__main__':dist_init(IP, RANK, LOCAL_RANK, GPU_NUM)# DataSetdatasampler = DistributedSampler(dataset, num_replicas=GPU_NUM, rank=RANK)dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, sampler=datasampler)# model model = DistributedDataPrallel(model, device_ids=[LOCAL_RANK], output_device=LOCAL_RANK)

示例2

2.1 环境配置

import torch.distributed as dist
from torch.nn.parallel import DistributedDataParalleldef dist_setup(rank, world_size):os.environ['MASTER_ADDR'] = 'localhost'os.environ['MASTER_PORT'] = '12345'# initialize the process groupdist.init_process_group(backend='nccl', rank=rank, world_size=world_size)def dist_cleanup():dist.destroy_process_group()

首先要在环境变量中设置master ipport,便于进程或多机间的通信,由于本次是单机,故MASTER_ADDR写成localhost即可,如果是多机,则配置成主节点机器的ip

另外分布式环境需要dist.init_process_group()来启动,介绍一下其中主要的参数:

  • backend表示进程或节点间的通信方式,gpu训练用nccl比较块;
  • world_size表示启用的进程数量与可用的GPU数量一致,rank表示进程编号rank这个参数是由进程控制的,不用显性设置,后面可以看到

2.2 数据集 与 加载器构造

数据集datasets按正常数据集构造,如下:

from torch.utils.data import Datasetclass Datasets(Dataset):def __init__(self, data_list):self.data = data_listdef __len__(self):return len(self.data)def __getitem__(self, index):return self.data[index]

在构造加载器dataloader的时候,需要用到DistributedSampler:

sampler = torch.utils.data.DistributedSampler(Datasets, num_replicas=2,rank=dist.get_rank(), shuffle=True,drop_last=True)
loader = DataLoader(Datasets, batch_size=8, num_workers=4, pin_memory=True,sampler=sampler, shuffle=False, collate_fn=None)
  • 在构造sampler时,num_replicas表示数据要分成几个部分,这与world_size的值一致,表示每个进程上分数据集的一部分

  • rank是进程编号,这里需要让每个进程自己获取该进程的编号,并根据编号来获取该进程需要负责的部分数据;

  • 在sampler中设置shuffle为True时,Dataloader中shuffle就应关掉

  • 最后,这里的batch_size是指每个进程的batch大小,即每块GPU的batch大小,所以实际的batch_size = num_gpu * batch_size

2.3 模型

模型的写法不用变,原来怎么写现在就怎么写,这里只写了简单模型做展示

import torch.nn as nnclass Model(nn.Module):def __init__(self):super(ToyModel, self).__init__()self.net1 = nn.Linear(10, 10)self.relu = nn.ReLU()self.net2 = nn.Linear(10, 5)def forward(self, x):return self.net2(self.relu(self.net1(x)))

2.4 训练

前面配置好后,到了重点的训练部分,整体上还是原来训练步骤的写法,中间有一些细节地方需要调整。

# 首先构建整体的训练逻辑框架
def main(rank, world_size, *args):# rank,world_size必须作为参数传入,其他需要传入的参数可以放后面print(f"Running basic DDP example on rank {rank}.")# 启动分布式训练环境dist_setup(rank, world_size)# 设置随机种子,非必要# set_seed(config.seed)# 加载数据集datasets = Datasets(data_list)# 构造sampler和dataloadernum_tasks = dist.get_world_size()  # 获取进程数sampler = torch.utils.data.DistributedSampler(Datasets, num_replicas=num_tasks, rank=dist.get_rank(), shuffle=True,drop_last=True)loader = DataLoader(Datasets, batch_size=8, num_workers=4, pin_memory=True,sampler=sampler, shuffle=False, collate_fn=None)# 构造模型和优化器model = Model()optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-4)# 如果继续训练,加载保存的模型参数与优化器参数if init_checkpoint:checkpoint = torch.load(init_checkpoint, map_location='cpu')state_dict = checkpoint['model']model.load_state_dict(state_dict)model = model.to(rank)  # 由于优化器的device和模型的device一致,所以这里需要将模型转到GPU上optimizer.load_state_dict(checkpoint['optimizer'])model = model.to(rank)# 需要用DistributedDataParallel将model包装,实现分布式通信,即梯度平均# find_unused_parameters最好设成True,避免模型中有些不参与梯度回传的参数影响平均梯度的计算与回传model = DistributedDataParallel(model,device_ids=[rank], find_unused_parameters=True)for epoch in range(max_epoch):train_one_epoch(rank, model, dataloader, optimizer, epoch)# 保存节点,只在进程0上保存节点,所以设置rank==0if rank == 0:save_obj = {'model': model_without_ddp.state_dict(),'optimizer': optimizer.state_dict()}torch.save(save_obj, '/your/checkpoint/path')# 等待所有进程一轮训练结束,类似于joindist.barrier()# 训练结束后关闭分布式环境dist_cleanup()

这样主体的训练框架已构建完成,下面只剩train_one_epoch,里面也有一些细节需要注意。

def train_one_epoch(rank, model, dataloader, optimizer, epoch):model.train()# ddp_loss是为了收集不同进程返回的loss,# 以便我们记录并展示所有进程的平均loss,来看loss的下降趋势ddp_loss = torch.zeros(1).to(rank)# 每次epoch前调用sampler.set_epoch,会产生不同的随机采样dataloader.sampler.set_epoch(epoch)for i, batch in enumerate(dataloader):optimizer.zero_grad()logits = model(batch)# 伪代码loss = loss_func(logits, targets)loss.backward()optimizer.step()ddp_loss[0] += loss.item()if (i+1) % 10 == 0:# 用all_reduce收集所有进程上的某个参数的值,op表示收集操作,这里使用SUM来求所有loss的和dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)size = dist.get_world_size()batch_loss = ddp_loss[0].item() / (10 * size)  # 求平均if rank == 0:  # 只在进程0上打印print(f'*** loss: {batch_loss} ***')ddp_loss = torch.zeros(1).to(rank)

最后还有一步,启用多进程运行。pytorch distributed提供了两种多进程启用方法。

  • 一种是torch.multioricessing.spawn,
  • 另一种是torch.distributed.launch。

后面一种是通过命令行启动,这里没有深入研究,下面只介绍前一种的方法,前一种仍然是代码的形式,如下:

if __name__ == "__main__":n_gpus = torch.cuda.device_count() # 本机可用的GPU数量assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"# 设置可用GPU数量os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' # 按照PCI_BUS_ID顺序从0开始排列GPU设备 os.environ['CUDA_VISIBLE_DEVICES'] = '0,1' #设置当前的GPU设备为0,1号两个设备,名称依次为’/gpu:0','/gpu:1'。表示优先使用0号设备,然后使用1号设备。world_size = 2  # 进程数,要与cuda_visible_devices的数量一致# 不知道为啥没传rank,官方就是这样写的torch.multiprocessing.spawn(main, args=(world_size,), nprocs=world_size, join=True)

到此完成初步的pytorch单机多卡 数据并行训练,目前有些地方仍然不清楚具体逻辑,后面遇到问题时会继续深入探索。
模型并行 和 多机多卡分布式训练可阅读官方文档。

补充阅读:
https://www.it610.com/article/1490876534595018752.htm

pytorch快速上手(9)-----多GPU数据并行训练方法相关推荐

  1. 深度神经网络DNN的多GPU数据并行框架 及其在语音识别的应用

    http://www.csdn.net/article/2014-07-11/2820628-DNN 深度神经网络(Deep Neural Networks, 简称DNN)是近年来机器学习领域中的研究 ...

  2. 【Pytorch神经网络理论篇】 02 Pytorch快速上手(二)GPU与CPU张量切换+具有随机值的张量+张量的数学运算

    1 在GPU与CPU上定义张量 1.1 GPU与CPU的张量相互转化 import torch # 创建一个张量 a = torch.FloatTensor() # 将CPU上的张量在GPU所管理的内 ...

  3. 【Pytorch神经网络理论篇】 03 Pytorch快速上手(三)张量的数据操作

    1 张量的数据操作 1.1 torch.reshape()实现数据维度变化 import torch a = torch.tensor([[1,2],[3,4]]) print(torch.resha ...

  4. 【Pytorch神经网络理论篇】 01 Pytorch快速上手(一)概述+张量

    1 概述 Pytorch是基于Torch之上的python包,在底层主要通过张量的形式进行计算,Pytorch中的张量表示为同一数据类型的多位橘子. 1.1 基础数据类型的概述 1.标量:即具体的数字 ...

  5. pytorch快速上手-使用自动标注软件Openlabeling和yolov5快速完成目标检测

    安装 自行github下载: openlabeling yolov5 自动标注软件openlabeling 实际上就是标注软件里面,给你内嵌一个追踪算法,可以是光流的,也可以是边缘检测的 安装的时候记 ...

  6. pytorch快速上手(10)-----netron查看神经网络结构图

    部分参考自:https://zhuanlan.zhihu.com/p/431445882 文章目录 netron介绍 1. pytorch导出onnx格式模型文件 2. netron可视化 (1)ne ...

  7. python os.environ gpu_Tensorflow下如何实现多GPU数据并行训练?

    方法1 在 Python 代码中,采用以下指令设置需要的 GPU 型号,比如我用 0 号和 1 号 GPU 训练模型(前提是你已经安装了 CUDA 和 CUDNN): os.environ['CUDA ...

  8. PyTorch 分布式训练DDP 单机多卡快速上手

    PyTorch 分布式训练DDP 单机多卡快速上手 本文旨在帮助新人快速上手最有效的 PyTorch 单机多卡训练,对于 PyTorch 分布式训练的理论介绍.多方案对比,本文不做详细介绍,有兴趣的读 ...

  9. PyTorch Data Parrallel数据并行

    PyTorch Data Parrallel数据并行 • 可选择:数据并行处理 • 本文将学习如何用 DataParallel 来使用多 GPU. 通过 PyTorch 使用多个 GPU 非常简单.可 ...

最新文章

  1. JavaScript奇技淫巧44招(2)
  2. 初识遗传算法 蚁群算法
  3. mysql最高权限超级用户是_MySQL中,预设的、拥有最高权限超级用户的用户名为( )...
  4. jsr303 自定义消息_JSR 303从I18N属性文件加载消息
  5. xml配置文件推荐方式
  6. 消息中间件系列(四):消息队列MQ的特点、选型、及应用场景详解
  7. 怎样格式化电脑_硬盘数据销毁最安全的步骤是怎样的?有公司可以做吗
  8. html骨架标签 0907
  9. 注解形式控制器 数据验证,类型转换(3)
  10. sysbench压力工具报错:
  11. c语言程序设计课用电脑吗,C语言程序设计之简单计算器
  12. C++随记总结(1)----关于C++中的大小端、位段(惑位域)和内存对齐
  13. HDU2050 折线分割平面【组合】
  14. windows下编辑的shell复制到linux无法执行
  15. flask html 得到文本框 input的内容_你需要知道的HTML知识
  16. 第一篇 微信开发 准备工作(转载自walkingmanc的专栏)【转】
  17. 安卓开发--运行你的APP
  18. libed2k源码导读:(三)网络IO
  19. (离散)证明:单射 满射 同态 同构 的证明框架
  20. Linux下静态库(.a)和动态库(.so) 的生成与使用

热门文章

  1. 972信息检索 | 第四章 国内重要的综合性信息检索系统
  2. 欧尼酱讲JVM(06)——指点江山—程序计数器
  3. 数据可视化 为业务提供决策建议
  4. android 属性动画失效,日常爬坑-Android Transitions动画失效
  5. 新手坐高铁怎么找车厢_动车怎么找车厢和位置 动车的座位号是如何分布的
  6. 内容与标准为王:下一代互联网与下一代搜索
  7. SSH2框架实现注册发短信验证码实例
  8. 关于WirelessKey的一些说明
  9. 录音转文字怎么弄?分享三个快速录音转文字的方法
  10. 鼠标-键盘复用神器-->Synergy