[源码解析] 模型并行分布式训练 Megatron (4) — 如何设置各种并行

文章目录

  • [源码解析] 模型并行分布式训练 Megatron (4) --- 如何设置各种并行
    • 0x00 摘要
    • 0x01 前文回顾
    • 0x02 初始化
      • 2.1 全局变量
      • 2.2 初始化代码
    • 0x03 切分样例
      • 3.1 注释
      • 3.2 切分情况
      • 3.3 切分策略
      • 3.4 实验
    • 0x04 起始状态
      • 4.1 GPU 状况
      • 4.2 符号说明
      • 4.3 初始分组
    • 0x05 Tensor model-parallel
      • 5.1 分组
      • 5.2 使用
    • 0x06 Pipe-parallel
      • 6.1 分组
      • 6.2 使用
        • 6.2.1 上下游rank
        • 6.2.2 world size
    • 0x07 Data-parallel
      • 7.1 分组
      • 7.2 如何使用
    • 0x08 模型组
    • 0x09 如何把模型分到GPU
    • 0xEE 个人信息
    • 0xFF 参考

0x00 摘要

NVIDIA Megatron 是一个基于 PyTorch 的分布式训练框架,用来训练超大Transformer语言模型,其通过综合应用了数据并行,Tensor并行和Pipeline并行来复现 GPT3,值得我们深入分析其背后机理。

本系列大概有 5 篇文章,通过论文和源码和大家一起学习研究。本文将看看 Megatron 如何处理设置并行。

本系列其他文章为:

[源码解析] 模型并行分布式训练Megatron (1) — 论文 & 基础

[源码解析] 模型并行分布式训练Megatron (2) — 整体架构

[源码解析] 模型并行分布式训练 Megatron (3) —模型并行实现

另外,本文深度借鉴了 [细读经典]Megatron论文和代码详细分析(2),强烈推荐有 Transformer 困惑的朋友深入阅读作者专栏。

0x01 前文回顾

前文我们对模型并行的原理和代码进行了分析,对于给定的模型,现在还需要解决几个问题:

  • 如何把模型切分给节点,比如哪个节点负责哪些层。
  • 数据并行,模型并行,流水线并行这几种并行之中,每个节点分别属于哪个部分?
  • 如何避免流水线带来的问题。

我们接下来就仔细分析一下。

0x02 初始化

initialize_model_parallel 方法用来设置模型并行,所以我们接下来就具体分析。

2.1 全局变量

因为前文_initialize_distributed之中调用了torch.distributed.init_process_group 初始化分布式环境,所以我们知道,每个进程都有自己的 gloabl rank 和 local rank,都有自己的全局变量。

主要变量如下(具体例子可以结合 initialize_model_parallel 之中的注释来看):

  • _TENSOR_MODEL_PARALLEL_GROUP :当前 rank 所属于的Intra-layer model parallel group,就是tensor 并行进程组。

    • 假如每一层分为两个tensor,则 _TENSOR_MODEL_PARALLEL_GROUP 例子为:[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]。
  • _PIPELINE_MODEL_PARALLEL_GROUP :当前 rank 所属于的Intra-layer model parallel group,就是流水线进程组。
    • 假如流水线深度为4,则例子为 [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]。
  • _MODEL_PARALLEL_GROUP :当前 rank 所属于的模型并行进程组,包括了以上两组。
    • 针对我们例子,就是完整模型被复制了两份,其 GPU 节点具体是[0, 1, 4, 5, 8, 9, 12, 13],[2, 3, 6, 7, 10, 11, 14, 15]
  • _EMBEDDING_GROUP : 嵌入对应的进程组。
  • _DATA_PARALLEL_GROUP :当前 rank 所属于的Data parallel group。
    • 假如数据并行度数为2,则例子为[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]。

具体如下:

# Intra-layer model parallel group that the current rank belongs to.
_TENSOR_MODEL_PARALLEL_GROUP = None
# Inter-layer model parallel group that the current rank belongs to.
_PIPELINE_MODEL_PARALLEL_GROUP = None
# Model parallel group (both intra- and pipeline) that the current rank belongs to.
_MODEL_PARALLEL_GROUP = None
# Embedding group.
_EMBEDDING_GROUP = None
# Data parallel group that the current rank belongs to.
_DATA_PARALLEL_GROUP = None_VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = None
_VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = None
_PIPELINE_MODEL_PARALLEL_SPLIT_RANK = None# These values enable us to change the mpu sizes on the fly.
_MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE = None
_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = None
_MPU_TENSOR_MODEL_PARALLEL_RANK = None
_MPU_PIPELINE_MODEL_PARALLEL_RANK = None# A list of ranks that have a copy of the embedding.
_EMBEDDING_GLOBAL_RANKS = None# A list of global ranks for each pipeline group to ease calculation of the source
# rank when broadcasting from the first or last pipeline stage.
_PIPELINE_GLOBAL_RANKS = None

2.2 初始化代码

我们首先把 initialize_model_parallel 代码摘录出来。initialize_model_parallel 作用就是对模型进行分组,然后初始化进程组相关的各种全局变量。

def initialize_model_parallel(tensor_model_parallel_size_=1,pipeline_model_parallel_size_=1,virtual_pipeline_model_parallel_size_=None,pipeline_model_parallel_split_rank_=None):"""Initialize model data parallel groups.Arguments:tensor_model_parallel_size: number of GPUs used for tensor model parallelism.pipeline_model_parallel_size: number of GPUs used for pipeline model parallelism.virtual_pipeline_model_parallel_size: number of virtual stages (interleavedpipeline).pipeline_model_parallel_split_rank: for models with both encoder and decoder,rank in pipeline with split point.Let's say we have a total of 16 GPUs denoted by g0 ... g15 and weuse 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelizethe model pipeline. The present function willcreate 8 tensor model-parallel groups, 4 pipeline model-parallel groupsand 8 data-parallel groups as:8 data_parallel groups:[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]8 tensor model-parallel groups:[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]4 pipeline model-parallel groups:[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]Note that for efficiency, the caller should make sure adjacent ranksare on the same DGX box. For example if we are using 2 DGX-1 boxeswith a total of 16 GPUs, rank 0 to 7 belong to the first box andranks 8 to 15 belong to the second box."""if torch.distributed.get_rank() == 0:print('> initializing tensor model parallel with size {}'.format(tensor_model_parallel_size_))print('> initializing pipeline model parallel with size {}'.format(pipeline_model_parallel_size_))# Get world size and rank. Ensure some consistencies.world_size = torch.distributed.get_world_size()tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size)pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size)ensure_divisibility(world_size,tensor_model_parallel_size * pipeline_model_parallel_size)data_parallel_size = world_size // (tensor_model_parallel_size *pipeline_model_parallel_size)num_tensor_model_parallel_groups = world_size // tensor_model_parallel_sizenum_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_sizenum_data_parallel_groups = world_size // data_parallel_sizeif virtual_pipeline_model_parallel_size_ is not None:global _VIRTUAL_PIPELINE_MODEL_PARALLEL_RANKglobal _VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE_VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = 0_VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = virtual_pipeline_model_parallel_size_if pipeline_model_parallel_split_rank_ is not None:global _PIPELINE_MODEL_PARALLEL_SPLIT_RANK_PIPELINE_MODEL_PARALLEL_SPLIT_RANK = pipeline_model_parallel_split_rank_rank = torch.distributed.get_rank()# Build the data-parallel groups.global _DATA_PARALLEL_GROUPall_data_parallel_group_ranks = []for i in range(pipeline_model_parallel_size):start_rank = i * num_pipeline_model_parallel_groupsend_rank = (i + 1) * num_pipeline_model_parallel_groupsfor j in range(tensor_model_parallel_size):ranks = range(start_rank + j, end_rank,tensor_model_parallel_size)all_data_parallel_group_ranks.append(list(ranks))group = torch.distributed.new_group(ranks)if rank in ranks:_DATA_PARALLEL_GROUP = group# Build the model-parallel groups.global _MODEL_PARALLEL_GROUPfor i in range(data_parallel_size):ranks = [data_parallel_group_ranks[i]for data_parallel_group_ranks in all_data_parallel_group_ranks]group = torch.distributed.new_group(ranks)if rank in ranks:_MODEL_PARALLEL_GROUP = group# Build the tensor model-parallel groups.global _TENSOR_MODEL_PARALLEL_GROUPfor i in range(num_tensor_model_parallel_groups):ranks = range(i * tensor_model_parallel_size,(i + 1) * tensor_model_parallel_size)group = torch.distributed.new_group(ranks)if rank in ranks:_TENSOR_MODEL_PARALLEL_GROUP = group# Build the pipeline model-parallel groups and embedding groups# (first and last rank in each pipeline model-parallel group).global _PIPELINE_MODEL_PARALLEL_GROUPglobal _PIPELINE_GLOBAL_RANKSglobal _EMBEDDING_GROUPglobal _EMBEDDING_GLOBAL_RANKSfor i in range(num_pipeline_model_parallel_groups):ranks = range(i, world_size,num_pipeline_model_parallel_groups)group = torch.distributed.new_group(ranks)if rank in ranks:_PIPELINE_MODEL_PARALLEL_GROUP = group_PIPELINE_GLOBAL_RANKS = ranks# Setup embedding group (to exchange gradients between# first and last stages).if len(ranks) > 1:embedding_ranks = [ranks[0], ranks[-1]]if pipeline_model_parallel_split_rank_ is not None and \pipeline_model_parallel_split_rank_ not in embedding_ranks:embedding_ranks = [ranks[0],ranks[pipeline_model_parallel_split_rank_],ranks[-1]]else:embedding_ranks = ranksgroup = torch.distributed.new_group(embedding_ranks)if rank in embedding_ranks:_EMBEDDING_GROUP = groupif rank in ranks:_EMBEDDING_GLOBAL_RANKS = embedding_ranks

0x03 切分样例

我们使用注释内容来进行学习如何切分模型,如何把多种并行模式组合在一起。

3.1 注释

initialize_model_parallel 的注释值得我们深入学习,具体如下:

Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
the model pipeline. The present function will
create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
and 8 data-parallel groups as:8 data_parallel groups:[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]8 tensor model-parallel groups:[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]4 pipeline model-parallel groups:[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
Note that for efficiency, the caller should make sure adjacent ranks
are on the same DGX box. For example if we are using 2 DGX-1 boxes
with a total of 16 GPUs, rank 0 to 7 belong to the first box and
ranks 8 to 15 belong to the second box.

从注释可以知道如下信息:

  • 假定目前有16个GPU,属于两个node,rank 0 ~7 属于第一个节点,rank 8 ~ 15 属于第二个节点。

  • create 8 tensor model-parallel groups, 4 pipeline model-parallel groups,这说明将一个完整模型切分如下:

    • 沿着行横向切了一刀:tensor_model_parallel_size = 16 / 8 = 2,就是2个 GPUs 来进行模型张量并行。
    • 沿着列纵向切了三刀:pipeline_model_parallel_size = 16 /4 = 4,就是4个GPUs 进行流水线并行。
    • 因此,一个模型分为8块,每一块放在一个GPU之上,就是8个GPU。而通过如下计算可以知 16 GPUs / 8 GPUs = 2 models。即,16张卡可以放置两个完整模型。
  • 因为张量模型并行组大小是2,即16个GPU被分成8组,则这8组内容是 [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]。

  • 因为流水线并行组大小是4,即16个GPU被分成4组,则这4组内容是[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]。

  • 因为数据并行组大小是2,16个GPU被分成8组,则这8组内容是[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]。

  • 以上这些进程组都是通过 torch.distributed.new_group 来完成,这样组内进程之间就知道哪些进程是在同一个组内,是在一起训练的,也知道怎么通信。

3.2 切分情况

模型原始图如下

模型切分之后如下,一共被分成8块。其中,第一层被切分为 A,B,所以 A,B 之间就是 Tensor Model parallel。后面 C,D 之间也是 Tensor Model parallel,把两层都做了切分,依次类推。

我们的目标就是用代码来看看如何生成注释里面的各种模型组

3.3 切分策略

我们接下来看看具体切分的策略,也就是GPU分配策略。切分需要综合考虑多种情况,首先看看模型并行的通信状况。

  • 张量并行:通信发生在每层的前向传播和后向传播过程之中,通信类型是all-reduce,不但单次通信数据量大,并且通信频繁。
  • 流水线并行:通信在流水线阶段相邻的切分点之上,通信类型是P2P通信,单词通信数据量较少但是比较频繁,而且因为流水线的特点,会产生GPU空闲时间,这里称为流水线气泡(Bubble)。

我们接下来看看各种并行机制的对比。

  • Tensor versus Pipeline Parallelism. 张量模型的并行性在节点内是最好的,因为它会减少通信量。另一方面,流水线模型并行使用更便宜的点对点通信,可以跨节点执行,而不会限制整个计算。然而,流水线并行性会在流水线气泡中花费大量时间,因此,应限制流水线级的总数,以便流水线中的microbatches数量是流水线深度的合理倍数。当张量并行大小等于单个节点中的GPU数量时会达到峰值性能。
  • Pipeline versus Data Parallelism. 对于每个batch size,吞吐量随着流水线并行规模的增加而降低。流水线模型并行应该主要用于支持不适合单个 worker 的大型模型训练。而数据并行应该用于扩大训练规模。
  • Tensor versus Data Parallelism. 接下来看看数据和张量模型的并行性对性能的影响。在较大的批处理量和微批处理量为1的情况下,数据并行通信并不频繁;张量模型并行需要对批处理中的每个微批进行all-to-all通信。这种all-to-all的通信主导了端到端的训练时间,特别是当通信需要在多GPU节点上进行时。此外,随着张量模型并行规模的增加,我们在每个GPU上执行较小的矩阵乘法(因为会把模型张量进行切分),这降低了每个GPU的利用率。

最后看看结论

  • Tensor模型并行被用于intra-node transformer 层,因为张量并行计算密集且是耗费大量带宽,这样会在HGX based系统上高效运行。
  • Pipeline 模型并行主要被用于inter-node transformer 层,因为Pipeline 并行的通信带宽占用少,其可以有效利用集群中多网卡设计。
  • 数据并行则在前两者基础之上进行加持,使得训练可以扩展到更大规模和更快的速度。我们应该注意到,尽管数据并行可以带来高效的扩展,但我们不能单独使用数据并行来处理训练超大模型,因为 a)内存容量不足,b)数据并行的扩展限制。

3.4 实验

我们接下来做一个实验看看。

import torchworld_size = 16
tensor_model_parallel_size = 2 # 2 GPUs to parallelize the model tensor
pipeline_model_parallel_size = 4 # 4 GPUs to parallelize the model pipeline
data_parallel_size = world_size // (tensor_model_parallel_size *pipeline_model_parallel_size) # 2
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size # 8
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size # 4
num_data_parallel_groups = world_size // data_parallel_size # 8# Build the data-parallel groups.
print("------ Build the data-parallel groups -----")
all_data_parallel_group_ranks = []
for i in range(pipeline_model_parallel_size):start_rank = i * num_pipeline_model_parallel_groupsend_rank = (i + 1) * num_pipeline_model_parallel_groupsfor j in range(tensor_model_parallel_size):ranks = range(start_rank + j, end_rank,tensor_model_parallel_size)all_data_parallel_group_ranks.append(list(ranks))
print(all_data_parallel_group_ranks)# Build the model-parallel groups.
print("------ Build the model-parallel groups -----")
for i in range(data_parallel_size):ranks = [data_parallel_group_ranks[i]for data_parallel_group_ranks in all_data_parallel_group_ranks]print(list(ranks))# Build the tensor model-parallel groups.
print("------ Build the tensor model-parallel groups -----")
for i in range(num_tensor_model_parallel_groups):ranks = range(i * tensor_model_parallel_size,(i + 1) * tensor_model_parallel_size)print(list(ranks))# Build the pipeline model-parallel groups and embedding groups
# (first and last rank in each pipeline model-parallel group).
print("------ Build the pipeline model-parallel groups -----")
for i in range(num_pipeline_model_parallel_groups):ranks = range(i, world_size,num_pipeline_model_parallel_groups)print(list(ranks))

输出如下。需要注意,这里都是 GPU 的序列号,[0,2] 就是 [g0, g2]:

------ Build the data-parallel groups -----
[[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]
------ Build the model-parallel groups -----
[0, 1, 4, 5, 8, 9, 12, 13]
[2, 3, 6, 7, 10, 11, 14, 15]
------ Build the tensor model-parallel groups -----
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]
------ Build the pipeline model-parallel groups -----
[0, 4, 8, 12]
[1, 5, 9, 13]
[2, 6, 10, 14]
[3, 7, 11, 15]我们对比一下注释,发现代码打印结果可以和注释对应上:Let's say we have a total of 16 GPUs denoted by g0 ... g15 and weuse 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelizethe model pipeline. The present function willcreate 8 tensor model-parallel groups, 4 pipeline model-parallel groupsand 8 data-parallel groups as:8 data_parallel groups:[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]8 tensor model-parallel groups:[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]4 pipeline model-parallel groups:[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]

我们接下来会进行具体分析。

0x04 起始状态

4.1 GPU 状况

从注释中可以看到:

Note that for efficiency, the caller should make sure adjacent ranks are on the same DGX box. For example if we are using 2 DGX-1 boxes with a total of 16 GPUs, rank 0 to 7 belong to the first box and ranks 8 to 15 belong to the second box.

意思就是:调用者需要确保相邻的rank在同一个节点上,我们例子有两个Node,其中第一个Node拥有 GPU 0 ~ 7,就是 rank 0 ~ 7,第二个Node是 GPU 8~15,就是 rank 8 ~ 15。

具体如下,这里每行4个GPU,是因为 4 GPUs to parallelize the model pipeline,所以流水线每个stage是4个GPU

4.2 符号说明

下面是论文之中提到的一些符号,这里有必要再取出来温习一下: