deepspeed PipeLine Parallelism 源码解析

  • basic concept
    • PipeDream abstract
      • 1F1B
    • 4 steps
  • Code comprehension in deepspeed
    • preparation
    • code
      • convert model
      • profile
      • compute partition
      • runtime

basic concept

  • 2 台机器(num_node=2),每个机器有 8 个 GPU(8 rank in each node) ,则有 2*8=16 个节点,可以开 16 个 进程(ranks_num=16)
  • DDP 的最佳实践,每一个进程一张卡
  • 以上述 DDP 的最佳实践为例,解释下述名词
    • world-size = 16
    • group default=1
    • rank = 0~15
    • local rank : node1-0~7 ; node2-0~7
    • nnodes : 有多少台机器
    • node-rank : 当前是哪一台机器
    • nproc_per_node:每台机器有多少进程

PipeDream abstract

Gpipe流水线其存在两个问题:硬件利用率低,内存占用大.worker 之间只能同时处理一个 minibatch,系统中只有一个minibatch是活动的,这极大限制了硬件利用率。假如一个batch被分为 n 个micro-batch,则需要缓存 n 份activation。

1F1B 策略可以解决缓存 activation 的份数问题,使得 activation 的缓存数量只跟 stage 数相关,从而进一步节省显存,可以训练更大的模型。

1F1B

  • 由于前向计算的 activation 需要等到对应的后向计算完成后才能释放(无论有没有使用 Checkpointing 技术),因此在流水并行下,如果想尽可能节省缓存 activation 的份数,就要尽量缩短每份 activation 保存的时间,也就是让每份 activation 都尽可能早的释放,所以要让每个 micro-batch 的数据尽可能早的完成后向计算,因此需要把后向计算的优先级提高,让 micro-batch 标号小的后向比 micro-batch 标号大的前向先做。因此,如果我们让最后一个 stage 在做完一次 micro-batch 的前向后,立马就做本 micro-batch 的后向,那么我们就能让其他的 stage 尽可能早的开始后向计算,这就是 1F1B 策略。

  • 1F1B(one-forward-one-backward)的调度模式会在每台worker机器上 交替进行小批次数据的前向后向计算,同时确保这些小批量在"后向传播"时可以路由到"前向传播"的相同worker

4 steps

  • profile 通过小批量数据的profile推理出DNN训练时间。
  • compute partition 依据profile结果确定所有层的运行时间,然后进行优化,优化器返回一个带注释的运算符图,每个模型层映射到一个阶段ID。
  • convert model 对运算符图执行BFS遍历,为每个阶段生成一个单独的torch.nn.Module代码。PipeDream对每个阶段中的运算符进行排序,以确保它们保持与原始PyTorch模型图的输入输出依赖关系的一致性。
  • runtime schedule PipeDream 运行时根据其1F1B-RR调度策略将每个阶段(包括复制阶段的副本)分配给单个工作进程。

本文以 alexnet 为例子,分析这四个步骤在 deepspeed 源码中的实现,其中实现方式与 pipeDream 有些许不同

  • profile 使用 最简单 parameter method,跟据每一层的参数大小推理出训练时间
    结果示例:
[23296, 0, 0, 307392, 0, 0, 663936, 0, 884992, 0, 590080, 0, 0, 0, 0, 0, 37752832, 0, 0, 16781312, 0, 40970]
  • compute partition 依据profile结果确定所有层的运行时间,然后返回一个 parts 数组,保存模型层的分区结果.此时每张卡(即每个 stage 将会保存对应 global 模型分区后的结果映射)
    结果示例:
self.parts= [0,19,22]self._local_start = 0
self._local_stop = 19self._local_start = 19
self._local_stop = 22
  • convert model 每张卡根据分区后的结果,为每个阶段生成一个单独的torch.nn.Module代码
  • runtime PipeDream 运行时根据其1F1B-RR调度策略将每个阶段(包括复制阶段的副本)分配给单个工作进程

Code comprehension in deepspeed

preparation

  • deepspeedExamples github
  • deepspeed github
  • pipeDream reference

code

  1. config 基础配置
    为了简化理解,配置为简单的 pp=2 dp=1 mp=0
    上述配置可以在 DeepSpeedExamples/pipeline_parallelism/ds_config.json 进行配置,其中 micro batch num=train_batch_size/train_micro_batch_size_per_gpu=2.
# DeepSpeedExamples/pipeline_parallelism/ds_config.json{"train_batch_size" : 256,"train_micro_batch_size_per_gpu" : 128,......}

启动 deepspeed 时配置超参数 -p 设置流水并行数,如果 micro batch num == pp num ,则此时是最佳实践配置

# DeepSpeedExamples/pipeline_parallelism/run.sh
deepspeed train.py --deepspeed_config=ds_config.json -p 2 --steps=1

快速运行一个分布式流水并行度=2的 alexnet 网络, 如果设置可用 cuda 数量为 2,此时数据并行度将为 0,因为仅有的两个 rank 将会被用来进行流水并行
Quick Start:

export CUDA_VISIBLE_DEVICES=0,1
sh run.sh
  1. DeepSpeedExamples/pipeline_parallelism/train.py

pp=2,因此,RANK 0 and RANK 1 的进程将会被同时启动,也就是说下面的 main 函数将会分别被 RANK 0 & RANK 1 调用。通过 os 函数 os.getenv('RANK') 可以查看当前函数调用所在的卡

if __name__ == '__main__':# __main__ function will be recall 4 times cause open four threadsargs = get_args()deepspeed.init_distributed(dist_backend=args.backend)args.local_rank = int(os.environ['LOCAL_RANK'])torch.cuda.set_device(args.local_rank)if args.pipeline_parallel_size == 0:train_base(args)else:train_pipe(args)

1.1. deepspeed.init_distributed(dist_backend=args.backend)
主要是为了初始化通信方法,通信方法可以参考 https://zhuanlan.zhihu.com/p/465967735,https://zhuanlan.zhihu.com/p/79030485
Create a torch backend object, initialize torch distributed, and assign to cdb

# miniconda3/lib/python3.9/site-packages/deepspeed/comm/comm.py
# Main DeepSpeed Comms. public API.
def init_distributed(dist_backend="nccl",auto_mpi_discovery=True,distributed_port=TORCH_DISTRIBUTED_DEFAULT_PORT,verbose=True,timeout=default_pg_timeout,init_method=None,dist_init_required=None,config=None):

1.2 train_pipe(args)

def train_pipe(args, part='parameters'):......net = AlexNet(num_classes=10)net = PipelineModule(layers=join_layers(net),loss_fn=torch.nn.CrossEntropyLoss(),num_stages=args.pipeline_parallel_size,partition_method=part,activation_checkpoint_interval=0)trainset = cifar_trainset(args.local_rank)engine, _, _, _ = deepspeed.initialize(args=args,model=net,model_parameters=[p for p in net.parameters() if p.requires_grad],training_data=trainset)for step in range(args.steps):loss = engine.train_batch()

1.2.0 layers=join_layers(net)

将 视觉上的直观 layers 保存为 feature, avgpool,classifier 为顺序的数组传入 pipeModule 中

def join_layers(vision_model):layers = [*vision_model.features,vision_model.avgpool,lambda x: torch.flatten(x, 1),*vision_model.classifier,]return layers

layers:

[Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2)), ReLU(inplace=True), MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False), Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2)), ReLU(inplace=True), MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False), Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)), ReLU(inplace=True), Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)), ReLU(inplace=True), Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)), ReLU(inplace=True), MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False), AdaptiveAvgPool2d(output_size=(6, 6)), <function join_layers.<locals>.<lambda> at 0x7fc8a6193550>, Dropout(p=0.5, inplace=False), Linear(in_features=9216, out_features=4096, bias=True), ReLU(inplace=True), Dropout(p=0.5, inplace=False), Linear(in_features=4096, out_features=4096, bias=True), ReLU(inplace=True), Linear(in_features=4096, out_features=10, bias=True)]

convert model

1.2.1 PipelineModule(layers=join_layers(net),…)

  • Setup world info
  • Initialize partition information

Setup world info:

#
# dist.new_group() 将 RANK 实例放入一个组中
self.world_group = dist.new_group(ranks=range(dist.get_world_size()))
self.global_rank = dist.get_rank(group=self.world_group)
self.world_size = dist.get_world_size(group=self.world_group)
self.local_rank = int(os.environ.get("LOCAL_RANK", None))
world_group=<torch._C._distributed_c10d.ProcessGroupNCCL object at 0x7fc8a61af7f0> global_rank =1 world_size=2 local_rank=1
world_group=<torch._C._distributed_c10d.ProcessGroupNCCL object at 0x7f0e9c8af970> global_rank =0 world_size=2 local_rank=0

Initialize partition information

        # self._layer_specs = list(layers)self._num_layers = len(self._layer_specs)self._local_start = 0self._local_stop = Noneself._partition_layers(method=partition_method)

profile

使用简单的 parameter method 通过迭代计算,使用 _count_layer_params() 方法计算出模型每一层的参数量
1.2.1.1 _partition_layers()

        elif method == 'parameters':param_counts = self._count_layer_params()para_len=len(param_counts)...# profile count paradef _count_layer_params(self):"""Count the trainable parameters in individual layers.This routine will only build one layer at a time.Returns:A list of the number of parameters in each layer."""param_counts = [0] * len(self._layer_specs)for idx, layer in enumerate(self._layer_specs):if isinstance(layer, LayerSpec):l = layer.build()params = filter(lambda p: p.requires_grad, l.parameters())param_counts[idx] = sum(p.numel() for p in params)elif isinstance(layer, nn.Module):params = filter(lambda p: p.requires_grad, layer.parameters())param_counts[idx] = sum(p.numel() for p in params)return param_counts

得到一个映射每层参数量的数组:

[23296, 0, 0, 307392, 0, 0, 663936, 0, 884992, 0, 590080, 0, 0, 0, 0, 0, 37752832, 0, 0, 16781312, 0, 40970]

compute partition

跟据每一层的参数量,使用 partition_balanced() 方法进行简单的 stage 划分,平衡每张卡的计算量:

            ...self.parts = ds_utils.partition_balanced(weights=param_counts,num_parts=num_stages)# compute partition miniconda3/lib/python3.9/site-packages/deepspeed/runtime/utils.pydef partition_balanced(weights, num_parts, eps=1e-3):num_items = len(weights)# First check for the trivial edge caseif num_items <= num_parts:return partition_uniform(num_items, num_parts)weights_ = prefix_sum_inc(weights)# Find the smallest bottleneck (weight of heaviest partition)bottleneck = _rb_partition_balanced(weights_, num_parts, eps=eps)# Now compute that partitioningparts, success = _lprobe(weights_, num_parts, bottleneck)print(f':::::::::::::::;part::::::::::::::{parts}::::::::::sucess:::::::{success}')assert successreturn parts

得到一个 global 的数组 保存分层的 index,例如下面的数组表示,stage 0 将运行 层0~层19,stage 1 将运行 层19~层22

self.parts= [0,19,22]

_set_bounds() 方法通过传入 rank 的 id(即 stage id)可以使得每张卡存储不同的私有的变量,以实现分区

            ...self._set_bounds(start=self.parts[stage_id], stop=self.parts[stage_id + 1])# def _set_boundsdef _set_bounds(self, start=None, stop=None):"""Manually define the range of layers that will be built on this process.These boundaries are treated as list slices and so start is inclusive and stop isexclusive. The default of None for both results in all layers being builtlocally."""self._local_start = startself._local_stop = stopprint(f'::::::::::::_local_start:;:::{self._local_start}')print(f':::::::::::_local_stop:::::::{self._local_stop}')...self.forward_funcs = []self.fwd_map = {}self.tied_modules = nn.ModuleDict()self.tied_weight_attrs = {}self._build()
self._local_start = 0
self._local_stop = 19self._local_start = 19
self._local_stop = 22::RANk0:::::::::::forward_funcs:::::[Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2)), ReLU(inplace=True), MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False), Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2)), ReLU(inplace=True), MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False), Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)), ReLU(inplace=True), Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)), ReLU(inplace=True), Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)), ReLU(inplace=True), MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False), AdaptiveAvgPool2d(output_size=(6, 6)), <function join_layers.<locals>.<lambda> at 0x7f6308dde550>, Dropout(p=0.5, inplace=False), Linear(in_features=9216, out_features=4096, bias=True), ReLU(inplace=True), Dropout(p=0.5, inplace=False)]::::::::;fwd_map:::::{'0': 0, '1': 1, '2': 2, '3': 3, '4': 4, '5': 5, '6': 6, '7': 7, '8': 8, '9': 9, '10': 10, '11': 11, '12': 12, '13': 13, '15': 15, '16': 16, '17': 17, '18': 18}:::::::tied_modules:::::ModuleDict():::::::::RANk1:::::::::::forward_funcs:::::[Linear(in_features=4096, out_features=4096, bias=True), ReLU(inplace=True), Linear(in_features=4096, out_features=10, bias=True)]::::::::;fwd_map:::::{'19': 0, '20': 1, '21': 2}:::::::tied_modules:::::ModuleDict():::::::

如何做到分区?
我们知道程序一开始便开启了两个 RANK 进程,即每个 RANK 都会运行 global 的代码.当每个 RANK 运行到 set_bound 函数时,传入的参数 stage_id 是通过 get_coord(global_rank) 得到的,即不同的 RANK 执行到这一步时,会得到不同的 stage_id 此时,不同 RANK 上保存的 self._local_start 将会不同.

再使用 _build 函数,根据每个 RANK 所保存的不同 self._local_start & stop map 一下所对应的 module forward 函数名字 得到 RANK 私有的 self.forward_funcs & self.fwd_map,之后便可以使用 self.module() 内存有不同的 module 数据,此时每张卡内已经存有不同的 model 结构了.之后初始化 engine 时,传入的便已经是分区好的model.

1.2.2 trainset = cifar_trainset(args.local_rank)

    ...dist.barrier()if local_rank != 0:dist.barrier()trainset = torchvision.datasets.CIFAR10(root=dl_path,train=True,download=True,transform=transform)if local_rank == 0:dist.barrier()return trainset

dist.barrier()
pytorch在分布式训练过程中,对于数据的读取是采用主进程预读取并缓存,然后其它进程从缓存中读取,不同进程之间的数据同步具体通过torch.distributed.barrier()实现。

在上面的代码示例中,如果执行 cifar_trainset() 函数的进程不是主进程,即rank不等于0,会执行相应的 torch.distributed.barrier(),设置一个阻塞栅栏,让此进程处于等待状态,等待所有进程到达栅栏处(包括主进程数据处理完毕);如果执行create_dataloader()函数的进程是主进程,其会直接去读取数据并处理,然后其处理结束之后会接着遇到torch.distributed.barrier(),此时,所有进程都到达了当前的栅栏处,这样所有进程就达到了同步,并同时得到释放。

1.2.3 deepspeed.initialize(…)

        engine = PipelineEngine(...)

1.2.4 loss = engine.train_batch()

    def train_batch(self, data_iter=None):self.module.train() #????????????????????self.timers('train_batch').start()sched = schedule.TrainSchedule(micro_batches=self.micro_batches,stages=self.num_stages,stage_id=self.stage_id)self._exec_schedule(sched)self.agg_train_loss = self._aggregate_total_loss()

runtime

1.2.4.1 schedule.TrainSchedule(micro_batches=self.micro_batches…)
runtime schedule 将会自动调用 steps 函数

    1. 首先根据传入的 micro batch size 和 batch size 得到 num micro batch,既可以得到每个 stage 有多少个 micro batch
    1. 根据计算的 num micro batch 和 1F1B 策略得到一个 stage 的步数,在此例子中,每个stage 的步数为 6.
    1. 为每个 stage 的步,标一个 index,为 micro batch id,并为这些 id 计算出 cmds。cmds 是计算出的每一步需要做的函数。例如 stage 0 的 step 0 ,需要 load data 和 进行第一步 forward。
    1. 接下来每个 stage 将跟据生成的 cmds,在各自的 rank 里串行的执行每一步。然后每个 stage 有事并行执行的。前面的计算就是为了不同 stage 之间可以同步进行,例如 stage 0 的 step 1 send activation 后,stage 1 step 2 需要执行玩 step 0 的 load 和 forward,并几乎没有延时地执行 receive activation。这样的话,就可以有效率的run 整个网络。
# miniconda3/lib/python3.9/site-packages/deepspeed/runtime/pipe/schedule.py
class TrainSchedule(PipeSchedule):def steps(self):....yield cmds

cmds result:

[RecvActivation(buffer_id=0), LoadMicroBatch(buffer_id=0), ForwardPass(buffer_id=0)]

图示:

其中该图展现了运行时的物理结构,分为两个 stage 进行流水并行,每个 stage 有 6 步,stage 内串行执行,stage 间并行执行。

  • 0 表示在一个 batch 内的 第一个 micro batch 的数据,他将在 rank 0 上通过使用 load 函数被加载,然后跟据生成的 nn.Module 模块执行 层0-19 的 Module 的 forward 函数,forward 结束后的 activation 将会由 step 1 发送给 stage 1 的 step 1.在 stage 1 的 0,也将使用 load 加载数据,然后接收 stage1 发送的前 19 层的 activation,在这个基础上用同样的数据完成模型 层19-22 的 forward 计算。
  • 根据 1f1b 策略,一个batch 的数据前向结束后,应该立即进行 backward,此时 stage 1 的 step 2 将对 层19-22 的参数进行反向传播,反向传播后产生的 gradient 参数将有 stage 1 的 step 3 发送给 stage 0 的 step 3. stage 0 的 step 3 接收到层 19-22 的 grad 后便可以进行 层0-19 的 反向传播,同时它还需要做 第二个 micro batch 前向 activation 的 发送操作。

深度学习大模型训练--分布式 deepspeed PipeLine Parallelism 源码解析相关推荐

  1. 【深度学习实战03】——YOLO tensorflow运行及源码解析

    本文章是深度学习实战系列第三讲文章,以运行代码+源码分析 为主: 转载请注明引用自:https://blog.csdn.net/c20081052/article/details/80260726 首 ...

  2. 深度学习中模型训练效果不好的原因以及防止过拟合的方法

    深度学习中模型训练效果不好的原因 1. 是否选择合适的损失函数 2. 是否选择了合适的Mini-batch size 3. 是否选择了合适的激活函数 4. 是否选择了合适的学习率 5. 优化算法是否使 ...

  3. 【深度学习】模型训练过程可视化思路(可视化工具TensorBoard)

    [深度学习]模型训练过程可视化思路(可视化工具TensorBoard) 文章目录 1 TensorBoard的工作原理 2 TensorFlow中生成log文件 3 启动TensorBoard,读取l ...

  4. 一文看尽深度学习中的20种卷积(附源码整理和论文解读)

    点击上方"3D视觉工坊",选择"星标" 干货第一时间送达 引言 卷积,是卷积神经网络中最重要的组件之一.不同的卷积结构有着不一样的功能,但本质上都是用于提取特征 ...

  5. 2021-06-26一文看尽深度学习中的20种卷积(附源码整理和论文解读)

    卷积,是卷积神经网络中最重要的组件之一.不同的卷积结构有着不一样的功能,但本质上都是用于提取特征.比如,在传统图像处理中,人们通过设定不同的算子来提取诸如边缘.水平.垂直等固定的特征.而在卷积神经网络 ...

  6. 【信号识别】基于matlab深度学习CNN信号调制分类【含Matlab源码 2066期】

    ⛄一.深度学习CNN信号调制分类概述 1 背景介绍 在通信信号处理领域, 特别是在非协作通信信号盲解调研究领域, 每时隙突发信号的调制方式不同, 必须进行信号的调制方式自动识别.信号的调制方式识别效果 ...

  7. 基于深度学习的恶意样本行为检测(含源码) ----采用CNN深度学习算法对Cuckoo沙箱的动态行为日志进行检测和分类...

    from:http://www.freebuf.com/articles/system/182566.html 0×01 前言 目前的恶意样本检测方法可以分为两大类:静态检测和动态检测.静态检测是指并 ...

  8. 基于python+pyqt+深度学习实现图像转素描【附部分源码】

    文章目录 前言 视频演示 一.ui配置 1.left_button.py源码 2.switch_btn.py源码 3.主页面重要代码 二.界面功能 1.初始化模型 2.初始化模型 3.相机采图 4.获 ...

  9. 碉堡了!程序员用深度学习写了个老板探测器(附源码)

    如果上班的时候想放松一下,或者直说想偷偷懒,看点和工作无关的网页,这时候万一老板突然出现在背后,会不会感到很难堪呢? 有的浏览器设置了boss按键,手快的人还可以切换屏幕,不过总会显得不自然,而且经常 ...

最新文章

  1. 【CV】吴恩达机器学习课程笔记第18章
  2. 数组方法深入扩展(遍历forEach,filter,reduce等)
  3. python缓存技术_Python中整数的缓存机制讲解
  4. leetcode-665-Non-decreasing Array
  5. 【Leetcode | 13】56. 合并区间
  6. capture 部分元器件编号_十大电子元器件及其相关基础知识
  7. 重温《数据库系统概论》【第一篇 基础篇】【第1章 绪论】
  8. EasyUI——DataGrid中嵌入Radio
  9. rup 裁剪_裁剪师是什么意思
  10. linux下c爬取天气的源码,一个在conky中实现获取本地天气的c源代码
  11. flutter 初视回味
  12. 关于脑电波的黑科技,离我们生活还有多远。
  13. 本周最新文献速递20220320
  14. ​华为回应出售手机业务传闻:假消息;微软将ChatGPT整合到更多工具中:不用写代码就能开发应用;苹果更新Mac产品线|极客头条...
  15. qt vs tools
  16. (Java)模拟肯德基点餐系统
  17. 打破 FOXMAIL 疯狂占用磁盘读写资源的魔障
  18. 关于TTMS影院系统的总结
  19. JAVA漏洞扫描工具之墨菲安全for IDEA
  20. 共享打印机提示“0x00000709”错误的解决方法

热门文章

  1. 展讯Sprd设置-电池-UI
  2. Android10.0 展讯平台解锁
  3. JavaScript和Jqurery
  4. 破窗理论:搜索引擎暗规则之三(转)
  5. opencv学习---计算图像的水平积分投影和垂直积分投影
  6. 2018_9_22 模拟赛
  7. java 连接teradata_java连接teradata | 学步园
  8. 主控开发(一)Jetson nano环境搭建
  9. Java小程序开发实例!字节跳动Java岗经典面试真题,实战解析
  10. 华硕(ASUS)M50S81VN-SL外接 Dell 2209wa出现水波纹