[源码解析] 深度学习流水线并行 PipeDream(6)— 1F1B策略

文章目录

  • [源码解析] 深度学习流水线并行 PipeDream(6)--- 1F1B策略
    • 0x00 摘要
    • 0x01 流水线比较
      • 1.1 普通流水线
      • 1.2 Gpipe流水线
      • 1.3 1F1B流水线
        • 1.3.1 思路
        • 1.3.2 图示
    • 0x02 PipeDream 实现
      • 2.1 总体逻辑
      • 2.2 权重问题
      • 2.3 Weight Stashing
      • 2.4 Vertical Sync
      • 2.5 缓冲区
    • 0x03 代码
      • 3.1 总体代码
      • 3.2 训练函数
      • 3.3 前向传播
      • 3.4 反向传播
      • 3.5 Weight Stashing
    • 0xEE 个人信息
    • 0xFF 参考

0x00 摘要

在前文中,我们介绍了PipeDream的总体架构,Profile阶段,计算分区阶段,模型转换阶段,运行时引擎和通信模块,本文是 PipeDream 系列最后一篇,介绍 1F1B 策略,这是 PipeDream 最大的贡献。

流水线并行其他文章链接如下:

[源码解析] 深度学习流水线并行Gpipe(1)—流水线基本实现

[源码解析] 深度学习流水线并行GPipe (2) ----- 梯度累积

[源码解析] 深度学习流水线并行 GPipe(3) ----重计算

[源码解析] 深度学习流水线并行之PipeDream(1)— Profile阶段

[源码解析] 深度学习流水线并行 PipeDream(2)— 计算分区

[源码解析] 深度学习流水线并行 PipeDream(3)— 转换模型

[ 源码解析] 深度学习流水线并行 PipeDream(4)— 运行时引擎

[源码解析] 深度学习流水线并行 PipeDream(5)— 通信模块

0x01 流水线比较

首先,我们比较一下目前分析过的各个流水线。

1.1 普通流水线

DNN模型组成的基本单位是层。PipeDream将DNN的这些层划分为多个阶段——每个阶段(stage)由模型中的一组连续层组成。PipeDream把模型的不同的阶段部署在不同的机器上,每个阶段可能有不同的replication。该阶段对本阶段中所有层执行向前和向后传递。PipeDream将包含输入层的阶段称为输入阶段,将包含输出层的阶段称为输出阶段。

在最简单的情况下,和传统的模型并行训练中一样,系统中只有一个minibatch是活动的。上图就显示了一个计算时间线,该示例有四台机器和一个管道,可以认为是一个最普通的流水线

  • 在正向阶段,每个阶段对本阶段中的层的minibatch执行正向传递,并将结果发送到下一阶段。输出级在完成前向传递后,计算minibatch的损失。
  • 在后向阶段,每个阶段形成后向通道,逐一将损失传播到前一阶段。

1.2 Gpipe流水线

因为PipeDream是基于Gpipe进行改进,所以我们也要基于 Gpipe 看看其问题所在。

Gpipe 的流水线并行训练图如下:

  • 将被训练的这些层划分为多个阶段,每个阶段包含模型之中一组连续的层。
  • 把输入数据minibatch进行分片,分成 m 个microbatches,像 allreduce 一样,计算完一些就传给下个节点,最后同步更新参数。
  • GPipe使用现有的技术,如梯度累积来优化内存效率,通过丢弃前向传播和后向传播之间的activation存储来交换内存,在后向传递需要activation时再重新计算它们。

Gpipe的流水线有几个问题:

  • 过多流水线刷新导致空闲时间的增加。
  • 如果m很小,Gpipe可能会由于重新计算开销和频繁的管道刷新而降低硬件效率,所以 m 一般都设置的较大。
  • 于是需要缓存 m 份 activation导致内存增加。原因是每个microbatch前向计算的中间结果activation都要被其后向计算所使用,所以需要在内存中缓存。

1.3 1F1B流水线

PipeDream 的 1F1B(One Forward pass followed by One Backward pass)策略就可以解决缓存 activation 的份数问题,使得 activation 的缓存数量只跟阶段(stage)数相关,从而进一步节省显存。

Pipeline的并行方式是把模型的不同层放到不同机器(节点)上,顺序地进行前向计算和反向计算。

PipeDream的目标是:以最小化总体训练时间的方式将流水线并行,模型并行性和数据并行性结合起来。然而,要使这种方法对大型DNN模型有效,获得流水线并行化训练的潜在收益,PipeDream 必须克服三个主要挑战:

  1. 如何跨可用计算资源自动划分工作(模型的层)。
  2. 在确保训练任务向前推进的同时,如何调度计算以最大化吞吐量。
  3. 面对流水线带来的异步性,如何确保训练有效。

其中 1F1B 就对应了后面两个挑战。

1.3.1 思路

我们剖析下1F1B策略的思路。

终极目的是:减少activation 的缓存数量,降低显存占用,从而可以训练更大的模型。

目前困境是:即便使用了Checkpointing 技术,前向计算的 activation 也需要等到对应的后向计算完成之后才能释放。

解决思路是:努力减少每个 activation 的保存时间,即这就需要每个 micro-batch 数据尽可能早的完成后向计算让,从而让每个 activation 尽可能早释放。

注意:PipeDream中,最后使用的是minibatch这个单词,所以我们可以认为PipeDream的minibatch就是 Gpipe的 micro-batch,从这里开始,都使用 minibatch。

解决方案是:

  • 让最后一个 stage(下图中的 Machine 4) 在做完一次 minibatch 的前向传播之后,就立即做本minibatch 的后向传播,那么就可以让其他 stage 尽可能早的开始后向传播计算,这就是 1F1B 策略。有点类似于把整体同步变成了众多小数据块上的异步,而且众多小数据块都是大家独立更新。
  • 在 1F1B 的稳定状态下,会在每台机器上严格交替的进行前向计算/后向计算,这样使得每个GPU上都会有一个minibatch数据正在处理,从而保证资源的高利用率(整个pipeline比较均衡,可忽略的流水线暂停,没有流水线 flush,能确保以固定周期执行每个阶段上的参数更新)
  • 面对流水线带来的异步性,1F1B 使用不同版本的权重来确保训练的有效性。
  • PipeDream 又扩展了 1F1B,对于使用数据并行的stage,采用 round-robin的调度模式将任务分配在同一个stage的各个设备上,保证了一个batch的数据的前向传播计算和后向传播计算发生在同一台机器上,这就是 1F1B-RR(one-forward-noe-backward-round-robin)。

实际上,1F1B策略就是把一个batch的同步变为了众多小数据(minibatch)上的异步,计算完一个minibatch就立刻反向,一个minibatch的反向结束之后就更新对应worker的梯度。所有worker一起跑起来。可以理解为从 BSP 执行变成了 ASP 执行。

1.3.2 图示

下图是实施了 1F1B 的流水线。

  • 把一个 batch 分成多个mini batches,比如把一个 batch 分成 1,2,3,4 这4个mini batches。
  • 把多个 mini batches 逐一插入到流水线。
  • Machine 1 先计算 蓝色 1 的前向传播,然后把蓝色 1 发送给 Machine 2 继续计算。
  • Machine 2 接着计算 蓝色 2 的前向传播,然后把蓝色 1 发给 Machine 2 继续计算。
  • 当蓝色 1 由上至下遍历了 Machine 1 ~ 4,则完成了全部前向传播,于是开始进行反向传播,对应了第一个绿色 1,然后逆向传递到 Machine 3 ~ 1。
  • 当数据 1 完成了全部反向传播,即绿色 1 来到了 Machine 1。
  • 每个机器在完成自己 mini batch 的反向传播之后,会在本地进行梯度更新。
  • Machine 和 Machine 之间只传送模型的一个子集,这样计算和通讯可以并行。

需要注意,下图给出了初始阶段和稳定阶段,我们后续讲解中会提到这两个阶段。

0x02 PipeDream 实现

首先给出一个包含4个GPU的示例图,图内也给出了其中一个GPU(Mach. 3)的时间流示例。这里计算和梯度/激活通讯是有部分重叠的。

2.1 总体逻辑

我们以一次训练为例,结合下图来说明。

需要介绍一个名词 NOAM,活动小批次数目。

NUM_OPT_ACTIVE_MINIBATCHES (NOAM) = ⌈ (# machines) / (# machines in the input stage) ⌉

其意义是:基于我们的算法生成的分区,为了在稳定状态下保持流水线满负荷,每个输入级副本所允许的最小批处理数

上图显示了管道的相应计算时间线,每个流水线有4个阶段在不同机器上运行,所以此配置的NOAM为 4。

我们具体再分析下运行步骤。

  • 在训练开始的启动阶段(图上的Startup State),输入的stage的先读入足够多minibatch的数据(就是NOAM个),以保证pipeline在稳定阶段时,各个设备上都有相应的工作在处理。对于上图,就是输入阶段发送四个小批次传播到输出阶段。
  • 一旦输出阶段完成第一个小批次的前向传播(就是Machine 4 第一个蓝色1),它就对同一个小批次执行后向传播(就是Machine 4 的第一个绿色 1)。
  • 然后开始交替执行后续小批次的前向传播和后向传播(就是 Machine 4 的 2前,2后,3前,3后…)。
  • 当反向传播过程开始传播到管道中的早期阶段时(就是Work 3 ~ Work 1),每个阶段开始在不同小批次的正向和反向过程之间交替进行。
  • 在稳定状态下,每台机器都忙着对一个小批次进行正向传播或反向传播。

2.2 权重问题

Pipeline的训练模式会引入两种参数不一致性,因为实际是ASP计算,没有协调会越干越乱:

  • 在一个原生的PipeDream流水线中,每个阶段的前向传播都是使用某一个版本的参数来执行,而其后向传播则是使用不同版本的参数来执行的,即同一个minibatch的前向传播和后向传播使用的参数不一致。例如上图所示:

    • 当 minibatch 5 进入到 worker 1 时,它的前向传播逻辑在 minibatch 1 的后向传播计算之后执行,即它前向传播计算时候使用的参数是 minibatch 1 后向传播计算之后更新的参数。
    • 但是 minibatch 5 后向传播逻辑是在 “minibatch 2, minibatch 3, minibatch 4” 执行完后才开始计算,即此时使用的参数是"minibatch 1, minibatch 2, minibatch 3, minibatch 4" 后向传播计算之后更新的参数。
    • 这就导致 minibatch 5 的前向计算和后向计算时候,使用的参数不一致。即,第一行 Machine 1,蓝色 5 号 和 绿色 5 号 计算时候,必须都使用 绿色 1 号之后更新的参数。
  • 同一个minibatch在不同stage做同样操作(同样做前向操作,或者同样做后向传播)使用的参数版本不一致。同样如上图所示:
    • 对于 minibatch 5 在 worker 1 上的前向计算部分(蓝色5),他的前向逻辑在 minibatch 1 的后向计算以后执行。
    • 但是 minibatch 5 在 worker 2 上的前向计算部分(蓝色5),是在 “minibatch 1, minibatch 2” 的后向计算结束后才执行。
    • 这就导致了 minibatch 5 在两个stage上前向计算使用的参数版本不一致。

为解决这两个问题,PipeDream 分别采用了 weight stashing 和 Vertical Sync 两种技术

  • Weight stashing : 为权重维护多个版本,每个active minibatch都有一个版本。每个stage 都用最新版本的权重进行前向计算,处理输入的minibatch。计算前向传播之后,会将这份参数保存下来用于同一个minibatch的后向计算。Weight stashing确保在一个阶段内,相同版本的模型参数被用于给定小批量的向前和向后传播,但是不能保证跨阶段间,一个给定的小批次使用模型参数的一致性
  • Vertical Sync : 每个minibatch进入pipeline时都使用输入stage最新版本的参数,并且参数的版本号会伴随该minibatch数据整个生命周期,在各个阶段都是用同一个版本的参数(而不是Weight stashing那样都使用最新版本的参数),从而实现了stage间的参数一致性

2.3 Weight Stashing

我们以下图为例:

Worker 1, work 2 … 各自有自己的权重, 记为 W1W_1W1​,W2W_2W2​ … 即,图上的 Wi(j)W_i^{(j)}Wi(j)​,下标 i 表示 第 i 个 worker,上标 ( j ) 表示第 j 个minibatch。

在一个阶段(每一个 worker)中:

  • 每次向后传播都会导致权重更新,下一次向前传使用最新版本的可用权重。就是说,每个 worker 的权重,在出现一个新的绿色后向传播之后会被更新。接下来的新操作应该基于这个新权重。
  • 计算前向传播之后,会将这份前向传播使用的权重保存下来用于同一个 minibatch 的后向计算。
  • Weight stashing确保在一个阶段内,相同版本的模型参数被用于给定小批量的向前和向后传播。

我们以上图为例:

Worker 1 第一行的蓝色 5 依赖于 它前面同一行的绿色 1。Worker 1 所在行的第一个绿色 1 结束时,代表了 minibatch 1 完成了本次流水线的 4 次前向传播,4次后向传播。所以是一个新版本的 weight 1,就是W1(1)W_1^{(1)}W1(1)​。因此,Work 1 的两个 minibatch 5(蓝色前向和绿色后向)都应该基于新版本 W1(1)W_1^{(1)}W1(1)​ 计算。因此需要记录下来 新版本 W1(1)W_1^{(1)}W1(1)​。

Worker 2 第二行的蓝色 5 依赖于它前面同一行的绿色 2。同理,Worker 1 的第一个绿色 2 结束时,代表了 minibatch 2 完成了本次流水线的 4 次前向传播,4次后向传播。所以是一个新版本的 weight 2。此时的 minibatch 6 的前向和图上未标出的绿色后向都应该基于 新版本的 weight 2 计算,因此需要记录下来 新版本 W2(2)W_2^{(2)}W2(2)​。

对于 worker 3,从它的角度看,它本身的权重应该执行两次前向,两次后向(worker 4一次,然后 worker 3 第二次)。当执行 minibatch 5 的前向传播时候,W3(3)W_3^{(3)}W3(3)​已经更新(被minibatch 3 的绿色更新),所以需要记录下来 W3(3)W_3^{(3)}W3(3)​,为了以后 minibatch 5 的后向更新使用。

依次类推,worker 1 需要记录 W1(1)W_1^{(1)}W1(1)​, W1(2)W_1^{(2)}W1(2)​,W1(3)W_1^{(3)}W1(3)​,W1(4)W_1^{(4)}W1(4)​,… 的每一个新版本。就是 worker 1 对应 minibatch 1,2,3,4 的各个权重。

2.4 Vertical Sync

目前问题是:worker 1 上 minibath 5 的前向计算用的是 1 后向传播之后的参数,但worker 2 上计算 minibath 5 是用 2 后向传播之后的参数,最后汇总的时候不就又乱了?

Vertical Sync机制是:每个进入管道的 minibatch(bib_ibi​)都与其进入流水线输入阶段时候的最新权重版本w(i−x)w^{(i-x)}w(i−x)相联系。当小批次在流水线前向传播阶段前进时候,这个版本信息随着激活值和梯度一起流动。在所有阶段中,bib_ibi​ 的前向传播使用保存的w(i−x)w^{(i-x)}w(i−x)来计算,而不是Weight stashing那样都使用最新版本的参数。在使用保存的 w(i−x)w^{(i-x)}w(i−x)来计算后向传播之后,每个阶段独立应用权重更新,创建最新权重w(i)w^{(i)}w(i),然后删除w(i−x)w^{(i-x)}w(i−x)。

用下面图来说明:

强制所有worker在计算 minibatch 5 的时候都用本worker做 minibatch 1 反向传播之后的参数,具体来说就是:

对于 worker 2,使用本阶段绿色1(1反向传播之后,更新的本阶段权重)来做 5 的前向传播。

同理,对于 worker 3,使用本阶段绿色1(1反向传播之后,更新的本阶段权重)来做 5 的前向传播。对于 worker 4,使用本阶段绿色1(1反向传播之后,更新的本阶段权重)来做 5 的前向传播。

但是,这样同步会导致很多计算浪费无用。比如5更新时用的1的权重,但2/3/4后向传播的权重都白白计算了,所以默认不使用Vertical Sync。这样虽然每层不完全一致,但是由于weight stashing的存在,所有的参数都是有效的。

2.5 缓冲区

这里对缓冲区的处理再做一下说明。

**参数状态。**对于每个阶段,PipeDream主要维护与GPU内存中直接分配给该阶段的层相关的所有参数。每个层的参数分别存储,每个层分配一个唯一的ID。如果没有复制该阶段,PipeDream将更新应用到存储在GPU内存中的参数数据的最新版本,当所提供的GPU缓冲区中的权重更新可用时。如果复制了stage,则将权重更新复制到主机内存,然后发送到参数服务器。当新版本的参数可用时,作为权重存储方案的一部分,不会立即丢弃以前的版本。参数数据只有在使用较新参数的向后传递被格式化后才会被丢弃。

**中间状态。**每个层的中间数据也被分配了一个唯一的blob ID。当从前一级(或在输入级的情况下从磁盘)接收中间数据时,PipeDream将中间数据复制到GPU内存,并在工作队列中放置一个指向相关缓冲区的指针。在关联的minibatch完成该阶段的向后传递之前,forward传递的中间数据不会被丢弃。当ML工作人员完成使用后,以及如果需要,在将其发送到下一阶段之后,来自向后传递的中间数据就被释放。由于向前和向后传递中对中间数据的要求不同,PipeDream中的stage通常管理来自向前传递的多个版本的中间数据,而只管理来自当前运行的向后传递的单个版本的中间数据。

0x03 代码

3.1 总体代码

我们用 runtime/translation/main_with_runtime.py 来分析。

下面省略部分次要代码。

使用 runtime 的总体逻辑可以如下文件为例 :runtime/translation/main_with_runtime.py。主要逻辑是:

  • 解析输入参数。
  • 加载,生成模型。
  • 依据模块来构建模型。
  • 依据参数进行配置比如输入大小,batch size等。
  • 遍历模型的每个层(跳过最后loss层)。
    • 遍历每层的输入,构建输入张量。
    • 通过调用stage对应的forward函数,构建出输出。
    • 遍历每层的输出,设置其类型和形状 。
  • 构建输出值张量类型。
  • 加载配置文件。
  • 构建一个 StageRuntime。
  • 建立 optimizer,这里 optimizer,使用了AdamWithWeightStashing 或者 SGDWithWeightStashing,所以就是使用了 weight stashing。
  • 加载 dataset。
  • 进行训练,保存checkpoint。

总体代码如下:

def main():# 解析输入参数global args, best_prec1args = parser.parse_args()# Special case handling for GNMT modell2_promote()torch.cuda.set_device(args.local_rank)# build tokenizertokenizer = Tokenizer(os.path.join(args.data_dir, config.VOCAB_FNAME))# define loss functioncriterion = build_gnmt_criterion(vocab_size=tokenizer.vocab_size, padding_idx=config.PAD, smoothing=0.1)# create stages of the model# 加载,生成模型module = importlib.import_module(args.module)args.arch = module.arch()# 依据模块来构建模型model = module.model(criterion)# 依据参数进行配置比如输入大小,batch size等input_size = [args.max_length_train, args.batch_size]training_tensor_shapes = {"input0": input_size, "input1": [args.batch_size],"input2": input_size, "target": [args.max_length_train * args.batch_size],"target_length": [args.batch_size]}dtypes = {"input0": torch.int64, "input1": torch.int64, "input2": torch.int64,"target": torch.int64, "target_length": torch.int32}inputs_module_destinations = {"input0": 0, "input1": 0, "input2": 0}target_tensor_names = {"target", "target_length"}# 遍历模型的每个层(跳过最后loss层)for module_id, (stage, inputs, outputs) in enumerate(model[:-1]):  # Skip last layer (loss).input_tensors = []# 遍历每层的输入,构建输入张量for module_input in inputs:if module_input in inputs_module_destinations:inputs_module_destinations[module_input] = module_idinput_tensor = torch.ones(tuple(training_tensor_shapes[module_input]),dtype=dtypes[module_input])#.cuda()input_tensors.append(input_tensor)#stage.cuda()# PyTorch should not maintain metadata for a backward pass on# synthetic inputs. Without the following line, the runtime is# as much as 1.5x slower in a full DP configuration.with torch.no_grad():# 通过调用stage对应的forward函数,构建出输出output_tensors = stage(*tuple(input_tensors))if not type(output_tensors) is tuple:output_tensors = [output_tensors]# 遍历每层的输出,设置其类型和形状    for output, output_tensor in zip(outputs,list(output_tensors)):# output 是 ['out2', 'out1']training_tensor_shapes[output] = list(output_tensor.size())dtypes[output] = output_tensor.dtype# 构建输出值张量类型           eval_tensor_shapes = {}for key in training_tensor_shapes:eval_tensor_shapes[key] = tuple(training_tensor_shapes[key])training_tensor_shapes[key] = tuple(training_tensor_shapes[key])# 加载配置文件configuration_maps = {'module_to_stage_map': None,'stage_to_rank_map': None,'stage_to_depth_map': None}if args.config_path is not None:json_config_file = json.load(open(args.config_path, 'r'))configuration_maps['module_to_stage_map'] = json_config_file.get("module_to_stage_map", None)configuration_maps['stage_to_rank_map'] = json_config_file.get("stage_to_rank_map", None)configuration_maps['stage_to_rank_map'] = {int(k): v for (k, v) in configuration_maps['stage_to_rank_map'].items()}configuration_maps['stage_to_depth_map'] = json_config_file.get("stage_to_depth_map", None)# 构建一个 StageRuntimer = runtime.StageRuntime(model=model, distributed_backend=args.distributed_backend,fp16=args.fp16, loss_scale=args.loss_scale,training_tensor_shapes=training_tensor_shapes,eval_tensor_shapes=eval_tensor_shapes,training_tensor_dtypes=dtypes,inputs_module_destinations=inputs_module_destinations,target_tensor_names=target_tensor_names,configuration_maps=configuration_maps,master_addr=args.master_addr,rank=args.rank, local_rank=args.local_rank,num_ranks_in_server=args.num_ranks_in_server,verbose_freq=args.verbose_frequency,model_type=runtime.TRANSLATION,enable_recompute=args.recompute)# stage needed to determine if current stage is the first stage# num_stages needed to determine if current stage is the last stage# num_ranks needed to determine number of warmup_minibatches in case of pipeliningargs.stage = r.stageargs.num_stages = r.num_stagesargs.num_ranks = r.num_ranksif not is_first_stage():args.synthetic_data = True# define optimizerif args.no_input_pipelining:num_versions = 1else:# number of versions is the total number of machines following the current# stage, shared amongst all replicas in this stagenum_versions = r.num_warmup_minibatches + 1# if specified, resume from checkpointif args.resume:checkpoint_file_path = "%s.%d.pth.tar" % (args.resume, r.stage)assert os.path.isfile(checkpoint_file_path)print("=> loading checkpoint '{}'".format(checkpoint_file_path))checkpoint = torch.load(checkpoint_file_path)args.start_epoch = checkpoint['epoch']best_prec1 = checkpoint['best_prec1']r.load_state_dict(checkpoint['state_dict'])print("=> loaded checkpoint '{}' (epoch {})".format(checkpoint_file_path, checkpoint['epoch']))# TODO: make this configurable by args# 建立 optimizer,使用了AdamWithWeightStashing 或者 SGDWithWeightStashinguse_adam_optimizer = Trueif use_adam_optimizer:optimizer = adam.AdamWithWeightStashing(modules=r.modules(), master_parameters=r.master_parameters,model_parameters=r.model_parameters, loss_scale=args.loss_scale,num_versions=num_versions, lr=args.lr, betas=(0.9,0.999),weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency,macrobatch=args.macrobatch)else:optimizer = sgd.SGDWithWeightStashing(modules=r.modules(), master_parameters=r.master_parameters,model_parameters=r.model_parameters, loss_scale=args.loss_scale,num_versions=num_versions, lr=args.lr, momentum=args.momentum,weight_decay=args.weight_decay, verbose_freq=args.verbose_frequency)if args.resume:optimizer.load_state_dict(checkpoint['optimizer'])cudnn.benchmark = True# 加载 datasettrain_dataset = LazyParallelDataset(src_fname=os.path.join(args.data_dir, config.SRC_TRAIN_FNAME),tgt_fname=os.path.join(args.data_dir, config.TGT_TRAIN_FNAME),tokenizer=tokenizer,min_len=args.min_length_train,max_len=args.max_length_train,sort=False,max_size=None)val_dataset = ParallelDataset(src_fname=os.path.join(args.data_dir, config.SRC_VAL_FNAME),tgt_fname=os.path.join(args.data_dir, config.TGT_VAL_FNAME),tokenizer=tokenizer,min_len=args.min_length_train,max_len=args.max_length_train,sort=True)distributed_sampler = Falseif configuration_maps['stage_to_rank_map'] is not None:num_ranks_in_first_stage = len(configuration_maps['stage_to_rank_map'][0])if num_ranks_in_first_stage > 1:distributed_sampler = True# TODO: fix random seedstrain_loader = train_dataset.get_loader(batch_size=args.batch_size, seeds=range(args.epochs),batch_first=False, shuffle=True,bucketing=not args.no_bucketing, num_workers=args.workers,world_size=r.num_ranks_in_first_stage,rank=r.rank_in_stage if r.stage == 0 else 0)val_loader = val_dataset.get_loader(batch_size=args.batch_size, batch_first=False,shuffle=True, num_workers=args.workers,world_size=r.num_ranks_in_first_stage,seeds=range(args.epochs),rank=r.rank_in_stage if r.stage == 0 else 0)# if checkpoint is loaded, start by running validationif args.resume:assert args.start_epoch > 0validate(val_loader, r, args.start_epoch-1)# 进行训练,保存checkpointfor epoch in range(args.start_epoch, args.epochs):if distributed_sampler:train_loader.sampler.set_epoch(epoch)adjust_learning_rate(optimizer, epoch, args.epochs, r, args.lr_policy)# train or run forward pass only for one epochif args.forward_only:validate(val_loader, r, epoch)else:train(train_loader, r, optimizer, epoch)# evaluate on validation setprec1 = validate(val_loader, r, epoch)if r.stage != r.num_stages: prec1 = 0# remember best prec@1 and save checkpointbest_prec1 = max(prec1, best_prec1)should_save_checkpoint = args.checkpoint_dir_not_nfs or r.rank_in_stage == 0if args.checkpoint_dir and should_save_checkpoint:save_checkpoint({'epoch': epoch + 1,'arch': args.arch,'state_dict': r.state_dict(),'best_prec1': best_prec1,'optimizer' : optimizer.state_dict(),'tokenizer': tokenizer.get_state()}, args.checkpoint_dir, r.stage, epoch)

3.2 训练函数

我们下面看看训练函数 train 代码

  • 首先进入启动热身阶段,需要一直执行到 输出完成第一个小批次的前向传播,对应上图的 Startup State。
  • 然后开始交替执行后续小批次的前向传播和后向传播,从此时开始,进入到上图的 Steady State,在每个阶段之中,对于每一个小批次:
    • 实施前向传播,目的是把minibatch推送到下游worker。这就是 1F
    • 如果是最后阶段,则更新损失。
    • 梯度清零。
    • 加载保存的权重。
    • 后向传播。这就是 1B
    • 恢复最新权重。目前在本step内,就完成了 1F1B。
    • 进行下一次step。
  • 最后是剩余的后向传播,对应着热身阶段的前向传播。
def train(train_loader, r, optimizer, epoch):batch_time = AverageMeter()losses = AverageMeter()top1 = AverageMeter()top5 = AverageMeter()# switch to train moden = r.num_iterations(loader_size=len(train_loader))if args.num_minibatches is not None:n = min(n, args.num_minibatches)r.train(n)if not is_first_stage(): train_loader = Noner.set_loader(train_loader)end = time.time()epoch_start_time = time.time()if args.no_input_pipelining:num_warmup_minibatches = 0else:num_warmup_minibatches = r.num_warmup_minibatches# start num_warmup_minibatches forward passes# 启动热身阶段,需要一直执行到 输出完成第一个小批次的前向传播,对应上图的Start State。for i in range(num_warmup_minibatches):r.run_forward() # 前向传播,就是1F# 开始交替执行后续小批次的前向传播和后向传播,从此时开始,进入到上图的 Steady State。for i in range(n - num_warmup_minibatches):# perform forward passr.run_forward() #前向传播,就是1Fif is_last_stage(): # 最后阶段# measure accuracy and record lossoutput, target, loss, num_tokens = r.output, r.target, r.loss.item(), r.num_tokens()losses.update(loss, num_tokens) # 更新损失# measure elapsed timebatch_time.update(time.time() - end)end = time.time()epoch_time = (end - epoch_start_time) / 3600.0full_epoch_time = (epoch_time / float(i+1)) * float(n)else:# print log,省略# perform backward passif args.fp16:r.zero_grad() # 梯度清零else:optimizer.zero_grad() # 梯度清零optimizer.load_old_params() # 加载 stash weightr.run_backward() # 后向传播,就是1Boptimizer.load_new_params() # 恢复新的weightoptimizer.step() # 下一次训练,同时更新参数# finish remaining backward passes# 最后剩余的后向传播,对应着热身阶段的前向传播for i in range(num_warmup_minibatches):optimizer.zero_grad()optimizer.load_old_params() # 加载 stash weightr.run_backward() # 后向传播,就是1Boptimizer.load_new_params() # 恢复新的weightoptimizer.step() # 下一次训练# wait for all helper threads to completer.wait()

上面参数的 r 是 StageRuntime 类型,所以我们看看其中的run_forward和run_backward。

3.3 前向传播

以下是 StageRuntime 类的 run_forward 方法 和 _run_forward 方法,这两个方法完成了前向传播。

   def run_forward(self, recompute_step=False):"""Run forward pass."""# Receive tensors from previous worker.self.receive_tensors_forward() # 接受上一阶段的张量tensors = self.tensors[-1]# Run forward pass.self._run_forward(tensors) # 进行本阶段前向传播计算# Send tensors forward.self.send_tensors_forward() # 发送给下一阶段self.forward_stats.reset_stats()self.forward_minibatch_id += 1def _run_forward(self, tensors):# Perform forward pass through model (self.modules_with_dependencies already# has modules in topological order).# 得到module和对应的输入,输出modules = self.modules_with_dependencies.modules()all_input_names = self.modules_with_dependencies.all_input_names()all_output_names = self.modules_with_dependencies.all_output_names()# 遍历模块for i, (module, input_names, output_names) in \enumerate(zip(modules, all_input_names, all_output_names)):if i == (len(modules) - 1) and self.is_criterion: # 如果是计算损失# If layer is criterion (loss).if self.model_type == SPEECH_TO_TEXT:output = tensors["output"].transpose(0, 1).float()output_sizes = tensors["output_sizes"].cpu()target = tensors["target"].cpu()target_sizes = tensors["target_length"].cpu()input0_size = tensors["input0_size"].cpu()module_outputs = [module(output, target, output_sizes, target_sizes) / input0_size[0]]else:module_outputs = [module(tensors[input_name],tensors["target"])for input_name in input_names]module_outputs = [sum(module_outputs)]else:# 中间层# If layer is non-criterion.module_outputs = module(*[tensors[input_name]for input_name in input_names])if not isinstance(module_outputs, tuple):module_outputs = (module_outputs,)module_outputs = list(module_outputs)# 把计算结果放入tensors之中,这样后续就知道如何发送    for (output_name, module_output) in zip(output_names, module_outputs):tensors[output_name] = module_outputself.output = tensors[input_names[0]]# 如果是最后阶段,则做处理if self.is_criterion and self.model_type == TRANSLATION:loss_per_batch = tensors[output_names[0]] * tensors[self.criterion_input_name].size(1)loss_per_token = loss_per_batch / tensors["target_length"][0].item()self.loss = loss_per_tokenelif self.is_criterion:self.loss = tensors[output_names[0]]else:self.loss = 1

3.4 反向传播

运行引擎的 run_backward 完成了后向计算。

    def run_backward(self):# Receive input gradients needed for backward pass.self.receive_tensors_backward() # 从反向计算图上一层接受梯度# Backward pass through modules in reverse order.inputs = {}outputs = {}input_gradients = {}output_gradients = {}# Get input and output names spanning all modules in this stage.all_input_names_set = set()all_output_names_set = set()# 得到module和对应的输入,输出modules = self.modules_with_dependencies.modules()all_input_names = self.modules_with_dependencies.all_input_names()all_output_names = self.modules_with_dependencies.all_output_names()for (input_names, output_names) in zip(all_input_names, all_output_names):for input_name in input_names:all_input_names_set.add(input_name)for output_name in output_names:all_output_names_set.add(output_name)tensors = self.tensors.pop(0)# Set inputs, outputs, and output_gradients.# Only set outputs/output_gradients for tensors that are not inputs of# other modules in this stage.# Similarly, only set inputs for tensors that are not outputs of other# modules in this stage.for (module, input_names, output_names) in \zip(reversed(modules), reversed(all_input_names), reversed(all_output_names)):for output_name in output_names:if output_name not in all_input_names_set:if output_name not in self.gradients:output_gradients[output_name] = Noneelse: # 计算梯度记录在这里output_gradients[output_name] = self.gradients[output_name]if tensors[output_name].requires_grad:outputs[output_name] = tensors[output_name]for input_name in input_names:if input_name not in all_output_names_set:inputs[input_name] = tensors[input_name]# Hook to record input gradients.def hook_wrapper(input_name):def hook(input_gradient):input_gradients[input_name] = input_gradientreturn hookfor input_name in inputs:if input_name != "input0" and input_name != "input1" and input_name != "input2" \and inputs[input_name].requires_grad:inputs[input_name].register_hook(hook_wrapper(input_name))if "loss" in outputs:outputs["loss"] *= self.loss_scale# Perform backward pass.# 进行反向传播,output_gradients # outputs 就是要计算梯度的张量,output_gradients就是计算出来的梯度torch.autograd.backward(tuple([outputs[output_name] for output_name in outputs]),grad_tensors=tuple([output_gradients[output_name]for output_name in outputs]))# Input tensors don't need gradients.for input_name in inputs:if not inputs[input_name].requires_grad:self.gradients[input_name] = inputs[input_name]continueif input_name != "input0" and input_name != "input1" and input_name != "input2" and input_name != "input":self.gradients[input_name] = input_gradients[input_name]# Send output gradients.self.send_tensors_backward() # 发送梯度(self.gradients)给反向图的下一层if self.verbose_freq > 0 and self.backward_minibatch_id % self.verbose_freq == 0:self.backward_stats.print_stats()self.backward_stats.reset_stats()self.backward_minibatch_id += 1

我们借助前文的图,再加深一下印象。

发送逻辑:

 StageRuntime            CommunicationHandler              send_helper_thread+                           +                                 +|                           |                                 || 1                         |                                 |v                           |                                 |run_backward                     |                                 ||                           |                                 || 2                         |                                 ||                           |                    wait on backward_send_queuesv                  3        v                                 |
send_tensors_backward +--------> send                               ||                                 ||                                 ||  4                              |v               5                 vbackward_send_queues.add(tensor) +----> tensor = queue.remove()notify              ||| 6v_send|| 7|vdist.send

接受逻辑:

    StageRuntime             CommunicationHandler           recv_helper_thread+                            +                            +|                            |                            || 1                          |                            ||                            |                            | 4v                            |                            vrun_backward                       |                         _recv|                            |                            ||                            |                            ||                            |                            | 5|                            |                            || 2                          |                            v|                            |                  dist.recv / dist.broadcast|                            |                            |v                  3         v                            |
receive_tensors_backward +--------->  recv                          |+                            |                            ||                            |                            ||                            |                            ||                            |                            ||                            v                            ||                 backward_receive_queues.remove()        ||                            |                            ||                            |                            ||                            |                            ||                            |                            ||               wait on backward_receive_queues           ||                            |                            ||                            |                            ||                            |                            ||                            |                 6          v|                  backward_receive_queues <-------+ queue.add(tensor)|                            |               notify|                            |  7v                  3 return  |
gradients[output_name] <---------------+

3.5 Weight Stashing

Weight Stashing 是由OptimizerWithWeightStashing实现的。

下面省略了很多次要代码,训练时候调用了 load_old_params 和 load_new_params。

class OptimizerWithWeightStashing(torch.optim.Optimizer):"""Wrapper class that adds weight stashing to a vanilla torch.optim.Optimizer.Arguments:- optim_name: the name of optimizer, required to create the correspondingbase_optimizer (torch.optim.{optim_name}).- optimizer_args: the keyword arguments passed to base_optimizer."""def __init__(self, optim_name, modules, master_parameters, model_parameters,loss_scale, num_versions, verbose_freq=0, macrobatch=False,**optimizer_args):self.modules = modulesself.master_parameters = master_parametersself.model_parameters = model_parameters  # model_parameters is None if not fp16.self.loss_scale = loss_scale# Only need at most 2 versions if using macrobatching.if macrobatch:num_versions = min(2, num_versions) self.num_versions = num_versionsself.base_optimizer = getattr(torch.optim, optim_name)(master_parameters, **optimizer_args)self.latest_version = Version()self.current_version = Version()self.initialize_queue()self.verbose_freq = verbose_freqself.batch_counter = 0# If macrobatching, push and pop versions at the right rate.if macrobatch:self.update_interval = self.num_versionselse:self.update_interval = 1def initialize_queue(self):self.queue = deque(maxlen=self.num_versions)for i in range(self.num_versions):self.queue.append(self.get_params(clone=True))self.buffered_state_dicts = self.queue[0][0] # stash weght变量def load_old_params(self):if self.num_versions > 1:self.set_params(*self.queue[0]) #找到最初的旧weightdef load_new_params(self):if self.num_versions > 1:self.set_params(*self.queue[-1]) # 加载最新的weightdef zero_grad(self): # 用来resetif self.batch_counter % self.update_interval == 0:self.base_optimizer.zero_grad()def step(self, closure=None):"""Performs a single optimization step.Arguments:closure (callable, optional): A closure that reevaluates the modeland returns the loss."""# 每 update_interval个 steps更新一次梯度if self.batch_counter % self.update_interval != self.update_interval - 1:self.batch_counter += 1return None# 省略代码self.latest_version = self.latest_version.incr() # 因为多训练了一步,所以增加版本号if self.num_versions > 1:self.buffered_state_dicts = self.queue[0][0] self.queue.append(self.get_params(clone=False)) # 把新的变量存进去self.batch_counter += 1return loss

0xEE 个人信息

★★★★★★关于生活和技术的思考★★★★★★

微信公众账号:罗西的思考

如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。

0xFF 参考

lingvo框架走读笔记

Tensorflow实现先累加多个minibatch计算的梯度,再反向传播

用tensorflow2实现梯度累积

十倍模型计算时间仅增20%:OpenAI开源梯度替换插件

PipeDream: Fast and Efficient Pipeline Parallel DNN Training

论文解读系列第五篇:微软斯坦福等PipeDream快速训练大规模神经网络

https://cs231n.github.io/neural-networks-3/#gradcheck

https://www.cnblogs.com/geekfx/p/14182048.html

训练时显存优化技术——OP合并与gradient checkpoint

Pytorch笔记04-自定义torch.autograd.Function

PyTorch教程之Autograd

pytorch的自定义拓展之(三)——torch.autograd.Function的简单定义与案例

pytorch的自定义拓展之(二)——torch.autograd.Function完成自定义层

PyTorch 源码解读之 torch.autograd:梯度计算详解

再谈反向传播(Back Propagation)

CS231n课程笔记翻译:反向传播笔记

Pytorch 分布式训练

torch.distributed

GPT-3模型为何难以复现?这也许是分布式AI框架的最优设计

苏起冬 - pipedream

[源码解析] 深度学习流水线并行 PipeDream(6)--- 1F1B策略相关推荐

  1. [源码解析] 深度学习流水线并行 PipeDream(3)--- 转换模型

    [源码解析] 深度学习流水线并行 PipeDream(3)- 转换模型 文章目录 [源码解析] 深度学习流水线并行 PipeDream(3)--- 转换模型 0x00 摘要 0x01 前言 1.1 改 ...

  2. [源码解析] 深度学习流水线并行GPipe (2) ----- 梯度累积

    [源码解析] 深度学习流水线并行GPipe (2) ----- 梯度累积 文章目录 [源码解析] 深度学习流水线并行GPipe (2) ----- 梯度累积 0x00 摘要 0x01 概述 1.1 前 ...

  3. [源码解析] 深度学习流水线并行Gpipe(1)---流水线基本实现

    [源码解析] 深度学习流水线并行Gpipe(1)-流水线基本实现 文章目录 [源码解析] 深度学习流水线并行Gpipe(1)---流水线基本实现 0x00 摘要 0x01 概述 1.1 什么是GPip ...

  4. [源码解析] 深度学习分布式训练框架 horovod (11) --- on spark --- GLOO 方案

    [源码解析] 深度学习分布式训练框架 horovod (11) - on spark - GLOO 方案 文章目录 [源码解析] 深度学习分布式训练框架 horovod (11) --- on spa ...

  5. [源码解析] 深度学习分布式训练框架 horovod (10) --- run on spark

    [源码解析] 深度学习分布式训练框架 horovod (10) - run on spark 文章目录 [源码解析] 深度学习分布式训练框架 horovod (10) --- run on spark ...

  6. 【 线性模型 Linear-Model 数学原理分析以及源码实现 深度学习 Pytorch笔记 B站刘二大人(1/10)】

    线性模型 Linear-Model 数学原理分析以及源码实现 深度学习 Pytorch笔记 B站刘二大人(1/10) 数学原理分析 线性模型是我们在初级数学问题中所遇到的最普遍也是最多的一类问题 在线 ...

  7. 【 非线性回归 Logistics-Regression 模块实现与源码解读 深度学习 Pytorch笔记 B站刘二大人(5/10)】

    非线性回归 Logistics-Regression 模块实现与源码解读 深度学习 Pytorch笔记 B站刘二大人(5/10) 数学推导 什么是logistics函数 在定义上Logistic函数或 ...

  8. vue+django 微博舆情系统源码、深度学习+舆情扩散消失分析、舆情紧急等级、属地分析、按话题、情感预测、话题评论获取、提取观点、正面负面舆情、按区域检测舆情

    项目背景 315又马上要到了,现在有开始对食品安全话题的关注地提升了,因此,本文系统对微博的食品安全话题进行分析,有如下的功能 1.展示当前食品安全事件相关的热点信息以及提供根据食品关键词,食品安全类 ...

  9. 生成对抗网络入门详解及TensorFlow源码实现--深度学习笔记

    生成对抗网络入门详解及TensorFlow源码实现–深度学习笔记 一.生成对抗网络(GANs) 生成对抗网络是一种生成模型(Generative Model),其背后最基本的思想就是从训练库里获取很多 ...

最新文章

  1. 密码设置Android设备管理
  2. 这个「化学家」登上Nature封面:工作007,8天完成近700次实验,还设计出新催化剂...
  3. (-215:Assertion failed) dst.data == (uchar*)dst_ptr in function 'cvShowImage'
  4. 手把手玩转win8开发系列课程(2)
  5. Android之6.0上的重要变化(二)
  6. c语言self用法,C/C++知识点之Self Numbers C语言 UVA640
  7. jQuery修改alert ,confirm的样式
  8. Linux设备开机卡主
  9. as3 primitives
  10. BZOJ1185[HNOI2007] 最小矩形覆盖
  11. 怎么更改计算机上的限制应用,图文详解通过修改win10系统组策略实现限制指定应用程序的运行-系统操作与应用 -亦是美网络...
  12. python caffe框架_Caffe(卷积神经网络框架)配置-Windows篇
  13. 用mapgis数据转成arcgis中shape格式的方法
  14. 《程序员思维训练》读书小记
  15. 在项目中发现哪些经典bug?什么原因导致的?
  16. 内存压力测试工具Memtester
  17. android 日历动态图标,android 日历图标显示星期
  18. 功率谱密度的相关推导以及Python实现
  19. 安卓系统加速_谷歌与安卓合作开发预警系统,安卓手机将成“迷你地震仪”
  20. 启蒙教师周志华亲自讲解,机器学习视频课上线了

热门文章

  1. 从字符串中提取IP子串(C语言)
  2. 盘点2017企业服务领域最受关注的100家厂商(BPM平台篇)
  3. 考取PMP认证有用吗?
  4. 百度Echarts设置markPoint展示样式
  5. 西门子bop20显示电流_SIEMENS/西门子BOP20基本操作员面板使用方法说明
  6. 时间计算题100道_2019消防工程师考试易考6大类型计算题
  7. numpy.arctan, math.atan, math.atan2的区别
  8. HbuilderX导入项目运行到微信小程序代码杂乱问题
  9. 计算机教室报损登记簿,瑞安市第四中学
  10. 瑞幸咖啡第四季营收24亿:同比增80.7% 门店总数超6000家