文章目录

  • 简要概览
    • DistributedDataParallel与分布式RPC框架联合使用
    • 参数解析
      • torch.nn.parallel.DistributedDataParallel
      • join函数解析
      • no_sync函数解析
  • 源码解析
  • 实例
  • 参考

简要概览

  pytorch官方提供的分布式数据并行类为:

torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False)

  与torch.nn.DataParallel一样,torch.nn.parallel.DistributedDataParallel都是基于torch.distributed的。将数据依据batch_size划分,然后分布到不同的GPU上运行,之后再汇总。反向传播过程中,每个节点的梯度都被平均了。

  实例化这个类之前要求torch.distributed已经被初始化了,通过调用torch.distributed.init_process_group()实现。

  为了在一台主机上使用NGPU的话,需要生成N个进程,确保每个进程只在000~N−1N-1N−1的单个GPU上运行,这可以通过为每个进程设置CUDA_VISIBLE_DEVICES或者调用以下代码来实现:

>>> torch.cuda.set_device(i)

  其中i从000到N−1N-1N−1。在每个进程中参考以下设置来构建模块:

>>> torch.distributed.init_process_group(
>>>     backend='nccl', world_size=N, init_method='...'
>>> )
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)

  为了在每个节点上生成多个进程,可以采用torch.distributed.launch或者torch.multiprocessing.spawn的方式。

注意:nccl后端是目前使用GPU时速度最快、强烈推荐的后端。这适用于单节点和多节点分布式训练。

注意:这个模块还支持混合精度分布式训练。这意味着你的模型可以有不同类型的参数,比如fp16fp32的混合类型,对这些混合类型的参数进行梯度还原就可以了。

注意:如果您在一个进程上使用torch.save来checkpoint模块,而在其他进程上使用 torch.load来恢复模块,请确保每个进程都正确配置了map_location。如果没有map_locationtorch.load会将模块恢复到模块保存的设备上。

注意:当一个模型在batch_size=NM个节点上进行训练时,与在batch_size=M*N的单个节点上训练的同一个模型相比,如果损失是在一个batch中的各个实例之间进行加总(而不是像往常一样进行平均),那么梯度将小M倍(因为不同节点之间的梯度是平均的)。当你想获得一个与本地训练对应的数学上等价的训练过程时,你应该考虑到这一点。但在大多数情况下,你可以只把一个分布式数据并行封装模型、一个数据并行封装模型和一个单GPU上的普通模型视为相同的模型(例如在同等批次大小的情况下使用相同的学习率)。

注意:参数是不会在进程间广播的。模块在梯度上执行all-reduce step(全还原步骤),并假设它们将在所有进程中被优化器以同样的方式修改。Buffers(如BatchNorm统计)在每次迭代时都会从rank 0的进程中的模块广播到系统中的所有其他副本。

DistributedDataParallel与分布式RPC框架联合使用

  如果您将DistributedDataParallel与分布式 RPC 框架结合使用,您应该始终使用torch.distributed.autograd.backward来计算梯度,并使用torch.distributed.optim.DistributedOptimizer来优化参数。

>>> import torch.distributed.autograd as dist_autograd
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>> from torch.distributed.rpc import RRef
>>>
>>> t1 = torch.rand((3, 3), requires_grad=True)
>>> t2 = torch.rand((3, 3), requires_grad=True)
>>> rref = rpc.remote("worker1", torch.add, args=(t1, t2))
>>> ddp_model = DDP(my_model)
>>>
>>> # Setup optimizer
>>> optimizer_params = [rref]
>>> for param in ddp_model.parameters():
>>>     optimizer_params.append(RRef(param))
>>>
>>> dist_optim = DistributedOptimizer(
>>>     optim.SGD,
>>>     optimizer_params,
>>>     lr=0.05,
>>> )
>>>
>>> with dist_autograd.context() as context_id:
>>>     pred = ddp_model(rref.to_here())
>>>     loss = loss_func(pred, loss)
>>>     dist_autograd.backward(context_id, loss)
>>>     dist_optim.step()

警告:构造函数、前向方法、输出(或本模块输出的函数)的分化是分布式的同步点。要考虑到这一点,以防不同进程可能执行不同的代码。

警告:module假设在创建模型时,所有参数都已注册。也就是说之后不应添加或删除任何参数。缓冲区也是如此。

警告:module假设所有分布式进程的module参数在模型中的注册顺序是相同的。module将按照模型中注册参数的相反顺序进行梯度还原。换句话说,用户有责任保证每个分布式进程的模型完全相同,从而保证参数注册顺序完全相同。

警告:module允许参数具有non-rowmajor-contiguous strides(非行主连续步长)。例如,您的模型可能包含一些参数,其torch.memory_formattorch.contiguous_format,而其他参数的格式是torch.channels_last。但是,不同进程中的相应参数必须具有相同的跨度。

警告:这个模块不能和torch.autograd.grad()一起使用(也就是说,只有在参数的.grad属性中要累积grad的时候,它才会工作)。

警告:如果你打算将这个模块与nccl后端或gloo后端(that uses Infiniband),以及使用多个workerDataLoader一起使用,请将多处理启动方法改为forkerver(仅Python 3)或spwn。不幸的是,Gloo(that uses Infiniband)和NCCL2不是fork安全的,如果你不改变这个设置,你很可能会遇到死锁。

警告:在用DistributedDataParallel包装好模型后,你千万不要试图改变模型的参数。因为,当用DistributedDataParallel封装你的模型时, DistributedDataParallel的构造函数会在构造时对模型本身的所有参数注册额外的梯度还原函数。如果之后你改变了模型的参数,梯度重函数不再匹配正确的参数集。

警告:gradient_as_bucket_view模式还不能与自动混合精度(Automatic Mixed Precision AMP)一起使用。AMP会维护用于取消缩放梯度的隐蔽梯度,当gradient_as_bucket_view=True时,这些隐蔽的梯度将指向通信设备。当gradient_as_bucket_view=True时,在第一次迭代中,这些隐藏的梯度将指向通信桶。在下一次迭代中,通信桶会发生变化,因此这些隐藏的梯度也会意外地发生变化,这可能会导致错误的结果。

参数解析

torch.nn.parallel.DistributedDataParallel

torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False)
  1. module:需要被并行的module。
  2. device_ids:输入参数为一个int类型的list。当单个CUDA设备的时候需要提供这个参数,对于单个设备module,第i个module被复制到device_ids[i]上。对于多个设备的module,或者是CPU的module的时候,device_ids需要为None或者为空list,前向传播输入的数据需要放置到正确的设备上。(对于单设备modules默认为所有可用的devices)。
  3. output_device:单设备模块的输出设备位置,对于多设备modules或者是CPU的modules来说,这个参数需要为None,module本身决定输出位置。(对于单设备来说,默认为device_ids[0])
  4. broadcast_buffers:布尔类型的变量,前向传播时,使得module的buffers同步的控制变量,默认为True。
  5. process_group:用于分布式数据all-reduction的进程组,如果为None,默认的进程组由torch.distributed.init_process_group创建。
  6. bucket_cap_mb:DistributedDataParallel会将bucket 参数放置到多个bucket中,这样梯度在反向计算的过程中就能够覆盖。bucket_cap_mb控制bucket的大小,默认为25。
  7. find_unused_parameters:布尔类型的变量。遍历所有tensor中的 autograd graph,包含了被封装的前向传播的返回值。graph中没有收到梯度的参数会被预先标记为可以被还原。前向传播之后的输出都必须计算loss和梯度,如果没有,这个封装函数会等待autograd产生梯度参数。Any outputs derived from module parameters that are otherwise unused can be detached from the autograd graph using torch.Tensor.detach. (default: False)
  8. check_reduction:这个参数已经被废弃。
  9. gradient_as_bucket_view:布尔类型的变量。This is a prototype feature and subject to changes. When set to True, gradients will be views pointing to different offsets of allreduce communication buckets. This can reduce peak memory usage, where the saved memory size will be equal to the total gradients size. Moreover, it avoids the overhead of copying between gradients and allreduce communication buckets. When gradients are views, detach_() cannot be called on the gradients. If hitting such errors, please fix it by referring to the zero_grad() function in torch/optim/optimizer.py as a solution.

  主要的使用方法为:

>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> net = torch.nn.DistributedDataParallel(model, pg)

join函数解析

  • join(divide_by_initial_world_size=True, enable=True)

  一个与 torch.nn.parallel.DistributedDataParallel 实例结合使用的context manager,以便能够在参与进程间进行不均匀输入的训练。

  这个context manager将跟踪已经加入的 DDP 进程,并通过插入collective communication操作与非加入的 DDP 进程创建的操作相匹配来 "shadow "前向和后向传递。这将确保每个集体调用都有一个已经加入的DDP进程的对应调用,防止在各进程输入不均的情况下进行训练时发生挂起或错误。

  一旦所有DDP进程加入,context manager将向所有进程广播与最后加入的进程相对应的模型,以确保所有进程的模型是相同的(这是DDP保证的)。

  要使用这一点来实现各进程间输入不均匀的训练,只需将此context manager封装在训练循环中即可。无需进一步修改模型或数据加载。

警告:此module只在多进程单设备上使用,意味着一个单个进程在一个GPU上运行。

警告:This module currently does not support custom distributed collective operations in the forward pass, such as SyncBatchNorm or other custom defined collectives in the model’s forward pass.

  此函数有两个参数:divide_by_initial_world_sizeenable

  1. divide_by_initial_world_size

  如果为True,将用DDP训练launched时,会将初始world_size除以梯度。如果为False,将在all reduce过程中计算有效世界大小(number of ranks that have not depleted their inputs yet),并将梯度除以该大小。设置 divide_by_initial_world_size=True,以确保每个输入样本(包括不均匀的输入)在对全局梯度的贡献度上具有同等权重。这是通过将梯度除以初始world_size来实现的,即使我们遇到不均匀的输入。如果你把这个设置为False,我们会把梯度除以剩余的节点数。这确保了在较小的world_size上进行训练的平等性,尽管这也意味着不均匀的输入会对全局梯度做出更大的贡献。通常情况下,当训练作业的最后几个输入是不均匀的时候,你会希望将此设置为True。在极端的情况下,如果输入的数量有很大的差异,将此设置为False可能会得到更好的结果。

  1. enable

  是否启用不均匀输入检测。默认为True

>>>  import torch
>>>  import torch.distributed as dist
>>>  import os
>>>  import torch.multiprocessing as mp
>>>  import torch.nn as nn
>>>  # On each spawned worker
>>>  def worker(rank):
>>>      dist.init_process_group("nccl", rank=rank, world_size=2)
>>>      torch.cuda.set_device(rank)
>>>      model = nn.Linear(1, 1, bias=False).to(rank)
>>>      model = torch.nn.parallel.DistributedDataParallel(
>>>          model, device_ids=[rank], output_device=rank
>>>      )
>>>      # Rank 1 gets one more input than rank 0.
>>>      inputs = [torch.tensor([1]).float() for _ in range(10 + rank)]
>>>      with model.join():
>>>          for _ in range(5):
>>>              for inp in inputs:
>>>                  loss = model(inp).sum()
>>>                  loss.backward()
>>>  # Without the join() API, the below synchronization will hang
>>>  # blocking for rank 1's allreduce to complete.
>>>  torch.cuda.synchronize(device=rank)

no_sync函数解析

  一个context manager,用于禁用跨DDP进程的gradient synchronizations(梯度同步)。在此context中,梯度将在module变量上累积,随后在退出context的第一个前向-后向通道中进行同步。

>>> ddp = torch.nn.DistributedDataParallel(model, pg)
>>> with ddp.no_sync():
>>>   for input in inputs:
>>>     ddp(input).backward()  # no synchronization, accumulate grads
>>> ddp(another_input).backward()  # synchronize grads

看到这我哭了啊,这跟我想要的东西不太一样,哎,没动力看下去了,以后再说这个吧。

源码解析

实例

  • GETTING STARTED WITH DISTRIBUTED DATA PARALLEL
  • 【分布式训练】单机多卡的正确打开方式(三):PyTorch

  pytorch官方在github上提供了examples仓库,包含了各种深度学习任务的模型和相关示例代码,这里 我们以Pytorch官方仓库里的ResNet50的分布式训练源码为例,简单讲解下pytorch分布式训练相关方法和参数。

  分布式训练的第一步是需要设置分布式进程组,设置多机通信后端、本机ip端口号、节点总数、本机编号等信息。 (源码129行):

dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,world_size=args.world_size, rank=args.rank)

将上述分布式相关参数,传递到torch.distributed.init_process_group并初始化用于训练的进程组; 初始化进程组之前,我们首先看下main.py的相关参数设置: 相关参数 源码第59行:

parser.add_argument('--world-size', default=-1, type=int,help='number of nodes for distributed training')
parser.add_argument('--rank', default=-1, type=int,help='node rank for distributed training')
parser.add_argument('--dist-url', default='tcp://224.66.41.62:23456', type=str,help='url used to set up distributed training')
parser.add_argument('--dist-backend', default='nccl', type=str,help='distributed backend')
parser.add_argument('--seed', default=None, type=int,help='seed for initializing training. ')
parser.add_argument('--gpu', default=None, type=int,help='GPU id to use.')
parser.add_argument('--multiprocessing-distributed', action='store_true',help='Use multi-processing distributed training to launch ''N processes per node, which has N GPUs. This is the ''fastest way to use PyTorch for either single node or ''multi node data parallel training')

–world-size 表示分布式训练中,机器节点总数
–rank 表示节点编号(n台节点即:0,1,2,…,n-1)
–multiprocessing-distributed 是否开启多进程模式(单机、多机都可开启)
–dist-url 本机的ip,端口号,用于多机通信
–dist-backend 多机通信后端,默认使用nccl

创建模型 分布式进程组初始化完成后,需要将模型通过DDP进行包装。

源码153行:

model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) # 通过DDP接口创建一个多机model实例。

数据切分和DataLoader 准备好模型后,需要准备分布式训练所需的数据集,在分布式训练任务中(数据并行)多机的Dataloader和普通dataloader也有所区别,需要用DistributedSampler包装后再通过torch.utils.data.DataLoader实例化成Dataloader。

源码217行:

train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) 通过DistributedSampler创建一个wapper,将数据集放入其中,再通过 torch.utils.data.DataLoader 创建可用于多机的Dataloader;

if args.distributed:train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)else:train_sampler = Nonetrain_loader = torch.utils.data.DataLoader(train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),num_workers=args.workers, pin_memory=True, sampler=train_sampler)

其余部分,和正常的单机版训练差异不大,此处就不赘述了。

完整的利用ResNet50训练ImageNet的示例可参考:Pytorch官方仓库
分布式训练速度测评及结果,可以参考DLPerf:PyTorch ResNet50 v1.5测评

  • https://pytorch.org/docs/master/notes/ddp.html#ddp

参考

  • Distributed Data Papallel

  • DLPerf—分布式深度学习最佳入门(踩坑)指南

  • pytorch DistributedDataParallel多卡并行训练

  • Pytorch distributed 多卡并行载入模型

  • 和nn.DataParallel说再见

PyTorch并行与分布式(四)Distributed Data Papallel相关推荐

  1. PyTorch多卡分布式训练:DistributedDataParallel (DDP) 简要分析

    ©作者 | 伟大是熬出来的 单位 | 同济大学 研究方向 | 机器阅读理解 前言 因为课题组发的卡还没有下来,先向导师问了实验室的两张卡借用.之前都是单卡训练模型,正好在这个机会实践以下单机多卡训练模 ...

  2. Pytorch 并行训练(DP, DDP)的原理和应用

    Pytorch 并行训练(DP, DDP)的原理和应用 1. 前言 并行训练可以分为数据并行和模型并行. 模型并行 模型并行主要应用于模型相比显存来说更大,一块 device 无法加载的场景,通过把模 ...

  3. pytorch多GPU分布式训练代码编写

    本文主要讲述单机单卡.单机多卡的简单使用方法: 文章目录 单机单卡 单机多卡 DP DDP 单机单卡 单机单卡就是一台机器上只有一张卡,是最简单的训练方式 对于单机单卡,我们所需要做的就是把模型和数据 ...

  4. [并行与分布式程序设计] Flynn分类法 和 并行算法的评价指标

    并行与分布式程序设计 Flynn's taxonomy SISD SIMD MISD MIMD 并行算法性能的评价指标 加速比 Amdahl's Law 效率 可扩展性 三级目录 Flynn's ta ...

  5. PyTorch学习笔记(四):PyTorch基础实战

    PyTorch实战:以FashionMNIST时装分类为例: 往期学习资料推荐: 1.Pytorch实战笔记_GoAI的博客-CSDN博客 2.Pytorch入门教程_GoAI的博客-CSDN博客 本 ...

  6. 一步步读懂Pytorch Chatbot Tutorial代码(四) - 为模型准备数据

    文章目录 自述 有用的工具 代码出处 目录 头大 代码及说明 Prepare Data for Models 重点关注 indexesFromSentence zeroPadding binaryMa ...

  7. 分布式事务(Distributed Transactions)概述

    分布式事务是分布式领域必须要面对的问题,同时也是衡量一个分布式系统成熟度的重要指标.那么什么是分布式事务,哪些场景会涉及到分布式事务,如何实现分布式事务?本文将重点讨论以上问题. 分布式事务定义 分布 ...

  8. ExtJS4 API文档阅读(四)——Data

    2019独角兽企业重金招聘Python工程师标准>>> ExtJS4 API文档阅读(四)--Data 数据 Data包负责加载和保存你应用程序中的所有数据,由41个类构成,其中有三 ...

  9. PyTorch框架学习十四——学习率调整策略

    PyTorch框架学习十四--学习率调整策略 一._LRScheduler类 二.六种常见的学习率调整策略 1.StepLR 2.MultiStepLR 3.ExponentialLR 4.Cosin ...

  10. 并行与分布式、集群、网格计算、云计算的概念

    转自:http://blog.163.com/litianyichuanqi@126/blog/static/1159794412012387453794/ 一.并行计算与分布式计算 并行计算:并行计 ...

最新文章

  1. Python Django 文件下载代码示例
  2. 浅谈php中使用websocket
  3. 【Python】使用 eval 实现反射
  4. 富士施乐3065扫描教程_全面支持IT国产化 富士施乐70款机型获统信UOS兼容认证
  5. form 窗体增加边框_C#控件美化之路(13):美化Form窗口(上)
  6. python3中argparse模块
  7. 2009年5月软件设计师考试试题解析全国首发
  8. 个人计算机系统构造方式,计算机系统构造概要归纳.doc
  9. day 029 缓冲区和粘包 day 30 粘包的解决
  10. golang(7 方法重写)
  11. JUC编程入门(高并发)
  12. Hosts 文件切换工具
  13. NSURLRequest 使用(网络文摘)
  14. photoshop cc 2014(附完整软件和方法)
  15. [ASP调试]小旋风Web服务器使用
  16. 关于Pearson相关系数的显著性p值如何计算以及背后原因的思考
  17. mysql 右连接(right join)
  18. 清算(清分)与结算的区别
  19. c# 计算圆锥的体积_急求用c#计算圆柱体和圆锥体的体积的代码,下面是要求:...
  20. 5G与WiFi6空口技术对比

热门文章

  1. 1016: [JSOI2008]最小生成树计数 - BZOJ
  2. C#的排序算法以及随机产生不重复数字的几个Demo
  3. 与IDE相关的Attribute属性(C#)
  4. android 显示文章内容,在Android中,RecyclerView在重新创建后没有显示任何内容
  5. 计算机网络网络层实例例题
  6. ELK详解(九)——Logstash多日志收集实战
  7. Linux SSHD服务安装与维护详解(二)——SSHD调优和fail2ban联动
  8. maomao的现在与未来
  9. Arcgis api for js 3.x 离线开发(1)
  10. Broadleaf概念