[深度学习] 分布式模式介绍(一)

[深度学习] 分布式Tensorflow介绍(二)

[深度学习] 分布式Pytorch介绍(三)

[深度学习] 分布式Horovod介绍(四)


 一  Pytorch 分布式简介

https://pytorch.org/docs/stable/distributed.html

torch.distributed 包支持

Pytorch 中通过 torch.distributed 包提供分布式支持,包括 GPUCPU 的分布式训练支持。Pytorch 分布式目前只支持 Linux.

By default, the Gloo and NCCL backends are built and included in PyTorch distributed (NCCL only when building with CUDA). MPI is an optional backend that can only be included if you build PyTorch from source. (e.g. building PyTorch on a host that has MPI installed.)

Backends

Which backend to use?

In the past, we were often asked: “which backend should I use?”.

  • Rule of thumb

    • Use the NCCL backend for distributed GPU training

    • Use the Gloo backend for distributed CPU training.

  • GPU hosts with InfiniBand interconnect

    • Use NCCL, since it’s the only backend that currently supports InfiniBand and GPU Direct.

  • GPU hosts with Ethernet interconnect

    • Use NCCL, since it currently provides the best distributed GPU training performance, especially for multiprocess single-node or multi-node distributed training. If you encounter any problem with NCCL, use Gloo as the fallback option. (Note that Gloo currently runs slower than NCCL for GPUs.)

  • CPU hosts with InfiniBand interconnect

    • If your InfiniBand has enabled IP over IB, use Gloo, otherwise, use MPI instead. We are planning on adding InfiniBand support for Gloo in the upcoming releases.

  • CPU hosts with Ethernet interconnect

    • Use Gloo, unless you have specific reasons to use MPI.

torch.distributed 的优势如下:

1. 每个进程对应一个独立的训练过程,且只对梯度等少量数据进行信息交换。

在每次迭代中,每个进程具有自己的 optimizer ,并独立完成所有的优化步骤,进程内与一般的训练无异。

在各进程梯度计算完成之后,各进程需要将梯度进行汇总平均,然后再由 rank=0 的进程,将其 broadcast 到所有进程。之后,各进程用该梯度来更新参数。由于各进程中的模型,初始参数一致 (初始时刻进行一次 broadcast),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。

而在 DataParallel 中,全程维护一个 optimizer,对各 GPU 上梯度进行求和,而在主 GPU 进行参数更新,之后再将模型参数 broadcast 到其他 GPU

相较于 DataParalleltorch.distributed 传输的数据量更少,因此速度更快,效率更高。

2. 每个进程包含独立的解释器和 GIL

由于每个进程拥有独立的解释器和 GIL,消除了来自单个 Python 进程中的多个执行线程,模型副本或 GPU 的额外解释器开销和 GIL-thrashing ,因此可以减少解释器和 GIL 使用冲突。这对于严重依赖 Python runtimemodels 而言,比如说包含 RNN 层或大量小组件的 models 而言,这尤为重要。

新的库的主要亮点有:

  • 新的 torch.distributed 是性能驱动的,并且对所有后端 (Gloo,NCCL 和 MPI) 完全异步操作
  • 显着的分布式数据并行性能改进,尤其适用于网络较慢的主机,如基于以太网的主机
  • 为torch.distributed  package中的所有分布式集合操作添加异步支持
  • 在Gloo后端添加以下CPU操作:send,recv,reduce,all_gather,gather,scatter
  • 在NCCL后端添加barrier操作
  • 为NCCL后端添加new_group支持

二 分布式训练介绍

分布式训练可以分为:

  1. 单机多卡,DataParallel(最常用,最简单)
  2. 单机多卡,DistributedDataParallel(较高级)
  3. 多机多卡,DistributedDataParallel(最高级)

Pytorch分布训练一开始接触的往往是DataParallel,这个wrapper能够很方便的使用多张卡,而且将进程控制在一个。唯一的问题就在于,DataParallel只能满足一台机器上gpu的通信,而一台机器一般只能装8张卡,对于一些大任务,8张卡就很吃力了,这个时候我们就需要面对多机多卡分布式训练这个问题了

DistributedDataParallel (DDP)在模块级别实现数据并行性。 它使用 Torch.distributed 程序包中的通信集合来同步梯度,参数和缓冲区。 并行性在流程内和跨流程均可用。 在一个过程中,DDP 将输入模块复制到device_ids中指定的设备,将输入沿批次维度分散,然后将输出收集到output_device,这与 DataParallel 相似。 在整个过程中,DDP 在正向传递中插入必要的参数同步,在反向传递中插入梯度同步。 用户可以将进程映射到可用资源,只要进程不共享 GPU 设备即可。 推荐的方法(通常是最快的方法)是为每个模块副本创建一个过程,即在一个过程中不进行任何模块复制。

DataParallelDistributedDataParallel之间的比较

在深入探讨之前,让我们澄清一下为什么尽管增加了复杂性,但还是考虑使用DistributedDataParallel而不是DataParallel

  1. 如果模型太大而无法容纳在单个 GPU 上,则必须使用模型并行将其拆分到多个 GPU 中。 DistributedDataParallel与模型并行一起使用; DataParallel目前没有。
  2. DataParallel是单进程,多线程,并且只能在单台机器上运行,而DistributedDataParallel是多进程,并且适用于单机和多机训练。 因此,即使在单机训练中,数据足够小以适合单机,DistributedDataParallel仍比DataParallel快。 DistributedDataParallel还预先复制模型,而不是在每次迭代时复制模型,并避免了全局解释器锁定。
  3. 如果您的两个数据都太大而无法容纳在一台计算机和上,而您的模型又太大了以至于无法安装在单个 GPU 上,则可以将模型并行(跨多个 GPU 拆分单个模型)与DistributedDataParallel结合使用。 在这种情况下,每个DistributedDataParallel进程都可以并行使用模型,而所有进程都将并行使用数据。

1:  torch.nn.parallel.DistributedDataParallel

这个从名字上就能看出来与DataParallel相类似,也是一个模型wrapper。这个包是实现多机多卡分布训练最核心东西,它可以帮助我们在不同机器的多个模型拷贝之间平均梯度。

2: torch.utils.data.distributed.DistributedSampler

在多机多卡情况下分布式训练数据的读取也是一个问题,不同的卡读取到的数据应该是不同的。dataparallel的做法是直接将batch切分到不同的卡,这种方法对于多机来说不可取,因为多机之间直接进行数据传输会严重影响效率。于是有了利用sampler确保dataloader只会load到整个数据集的一个特定子集的做法。DistributedSampler就是做这件事的。它为每一个子进程划分出一部分数据集,以避免不同进程之间数据重复

1.0的多机多卡的计算模型并没有采用主流的Parameter Server结构,而是直接用了Uber Horovod的形式,也是百度开源的RingAllReduce算法。

采用PS计算模型的分布式,通常会遇到网络的问题,随着worker数量的增加,其加速比会迅速的恶化,例如resnet50这样的模型,目前的TF在10几台机器的时候,加速比已经开始恶化的不可接受了。因此,经常要上RDMA、InfiniBand等技术,并且还带来了一波网卡的升级,有些大厂直接上了100GBs的网卡,有钱任性。而Uber的Horovod,采用的RingAllReduce的计算方案,其特点是网络通信量不随着worker(GPU)的增加而增加,是一个恒定值。简单看下图理解下,GPU 集群被组织成一个逻辑环,每个GPU有一个左邻居、一个右邻居,每个GPU只从左邻居接受数据、并发送数据给右邻居。即每次梯度每个gpu只获得部分梯度更新,等一个完整的Ring完成,每个GPU都获得了完整的参数。

这里引入了一个新的函数model = torch.nn.parallel.DistributedDataParallel(model)为的就是支持分布式模式

不同于原来在multiprocessing中的model = torch.nn.DataParallel(model,device_ids=[0,1,2,3]).cuda()函数,这个函数只是实现了在单机上的多GPU训练,根据官方文档的说法,甚至在单机多卡的模式下,新函数表现也会优于这个旧函数。

这里要提到两个问题:

  • 每个进程都有自己的Optimizer同时每个迭代中都进行完整的优化步骤,虽然这可能看起来是多余的,但由于梯度已经聚集在一起并跨进程平均,因此对于每个进程都是相同的,这意味着不需要参数广播步骤,从而减少了在节点之间传输张量tensor所花费的时间。
  • 另外一个问题是Python解释器的,每个进程都包含一个独立的Python解释器,消除了来自单个Python进程中的多个执行线程,模型副本或GPU的额外解释器开销和“GIL-thrashing”。 这对于大量使用Python运行时的模型尤其重要。

基本使用流程

Pytorch 中分布式的基本使用流程如下:

  1. 在使用 distributed 包的任何其他函数之前,需要使用 init_process_group 初始化进程组,同时初始化 distributed 包。
  2. 如果需要进行小组内集体通信,用 new_group 创建子分组
  3. 创建分布式并行模型 DDP(model, device_ids=device_ids)
  4. 为数据集创建 Sampler
  5. 使用启动工具 torch.distributed.launch 在每个主机上执行一次脚本,开始训练
  6. 使用 destory_process_group() 销毁进程组

train_dataset最好不要用自己写的sampler,否则还需要再实现一遍分布式的数据划分方式

单机多卡--DistributedDataParallel

前面的 DataParallel 并不是完整的分布式计算,只是将一些部分的计算(例如,前向和反向)放到了多张卡上,某些东西(例如,梯度)计算的时候仍然是「一卡有难,八方围观」,可能会将第一张卡撑爆,并且和 DDP 对比的话效率实在不高。

首先增加几个概念:

  1. world_size:总共有几个 Worker
  2. rank:这个 Worker 是全局第几个 Worker
  3. local_rank:这个 Worker 是这台机器上的第几个 Worker
  4. 每个 Worker 可以用一张或者多张卡,但习惯于一个 Worker 只用一张卡
import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import os
from torch.utils.data.distributed import DistributedSampler
# 1) 初始化
torch.distributed.init_process_group(backend="nccl")input_size = 5
output_size = 2
batch_size = 30
data_size = 90# 2) 配置每个进程的gpu
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)class RandomDataset(Dataset):def __init__(self, size, length):self.len = lengthself.data = torch.randn(length, size).to('cuda')def __getitem__(self, index):return self.data[index]def __len__(self):return self.lendataset = RandomDataset(input_size, data_size)
# 3)使用DistributedSampler
rand_loader = DataLoader(dataset=dataset,batch_size=batch_size,sampler=DistributedSampler(dataset))class Model(nn.Module):def __init__(self, input_size, output_size):super(Model, self).__init__()self.fc = nn.Linear(input_size, output_size)def forward(self, input):output = self.fc(input)print("  In Model: input size", input.size(),"output size", output.size())return outputmodel = Model(input_size, output_size)# 4) 封装之前要把模型移到对应的gpu
model.to(device)if torch.cuda.device_count() > 1:print("Let's use", torch.cuda.device_count(), "GPUs!")# 5) 封装model = torch.nn.parallel.DistributedDataParallel(model,device_ids=[local_rank],output_device=local_rank)for data in rand_loader:if torch.cuda.is_available():input_var = data.cuda()else:input_var = dataoutput = model(input_var)print("Outside: input size", input_var.size(), "output_size", output.size())

torch.distributed.launch 会给模型分配一个args.local_rank的参数,也可以通过torch.distributed.get_rank()获取进程id。

怎么将程序跑起来。这里也有两种方法:

1. 用 torch.distributed.launch

python -m torch.distributed.launch --nproc_per_node=4 main.py

2. 用 torch.multiprocessing:

import torch.multiprocessing as mpdef main(rank, your_custom_arg_1, your_custom_arg_2): # 这里第一个 rank 变量会被 mp.spawn 函数自动填充,可以充当 local_rank 来用pass # 将前面那一堆东西包装成一个 main 函数mp.spawn(main, nprocs=how_many_process, args=(your_custom_arg_1, your_custom_arg_2))

三 分布式初始化

torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='')

参数说明:

  • backend(str): 后端选择,包括上面那几种 gloo,nccl,mpi
  • init_method(str,optional): 用来初始化包的URL, 我理解是一个用来做并发控制的共享方式
  • world_size(int, optional): 参与这个工作的进程数
  • rank(int,optional): 当前进程的rank
  • group_name(str,optional): 用来标记这组进程名的

To enable backend == Backend.MPI, PyTorch needs to built from source on a system that supports MPI.

The same applies to NCCL as well.

四  init_method 三种方式

1. TCP initialization

tcp:// IP组播(要求所有进程都在同一个网络中)比较好理解,   以TCP协议的方式进行不同分布式进程之间的数据交流,需要设置一个端口,不同进程之间公用这一个端口,并且设置host的级别和host的数量。设计两个参数rank和world_size。其中rank为host的编号,默认0为主机,端口应该位于该主机上。world_size为分布式主机的个数。

用该方式,运行上面的代码可以使用如下指令:

在主机01上:

python mnsit.py --init-method tcp://10.172.1.2:22225 --rank 0 --world-size 2

在主机02上:

python mnsit.py --init-method tcp://10.172.1.2:22225 --rank 1 --world-size 2

这里没有设置backend参数,所以默认是gloo。22225是端口号,用一个没有没占用的就行。这两句指令的先后顺序没有要求,只有两条指令都输入,程序才会运行起来。

2. Shared file-system initialization

file:// 共享文件系统(要求所有进程可以访问单个文件系统)有共享文件系统可以选择

提供的第二种方式是文件共享,机器有共享的文件系统,故可以采用这种方式,也避免了基于TCP的网络传输。这里使用方式是使用绝对路径在指定一个共享文件系统下不存在的文件。

在主机01上:

python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 0 --world-size 2

在主机02上:

python mnsit.py --init-method file://PathToShareFile/MultiNode --rank 1 --world-size 2

这里相比于TCP的方式麻烦一点的是运行完一次必须更换共享的文件名,或者删除之前的共享文件,不然第二次运行会报错。

3. Environment variable initialization

env:// 环境变量(需要您手动分配等级并知道所有进程可访问节点的地址)默认是这个

MASTER_PORT - required; has to be a free port on machine with rank 0
MASTER_ADDR - required (except for rank 0); address of rank 0 node
WORLD_SIZE - required; can be set either here, or in a call to init function
RANK - required; can be set either here, or in a call to init function

五  torch.distributed.launch

torch.distributed包提供了一个启动实用程序torch.distributed.launch,此帮助程序可用于为每个节点启动多个进程以进行分布式训练,它在每个训练节点上产生多个分布式训练进程。

这个工具可以用作CPU或者GPU,如果被用于GPU,每个GPU产生一个进程Process

该工具既可以用来做单节点多GPU训练,也可用于多节点多GPU训练。如果是单节点多GPU,将会在单个GPU上运行一个分布式进程,据称可以非常好地改进单节点训练性能。如果用于多节点分布式训练,则通过在每个节点上产生多个进程来获得更好的多节点分布式训练性能。如果有Infiniband接口则加速比会更高。

在单节点分布式训练或多节点分布式训练的两种情况下,该工具将为每个节点启动给定数量的进程(--nproc_per_node)。如果用于GPU训练,则此数字需要小于或等于当前系统上的GPU数量(nproc_per_node),并且每个进程将在从GPU 0到GPU(nproc_per_node - 1)的单个GPU上运行。

1. Single-Node multi-process distributed training

python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVEYOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all otherarguments of your training script)

2. Multi-Node multi-process distributed training: (e.g. two nodes)

Node 1: (IP: 192.168.1.1, and has a free port: 1234)

python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE--nnodes=2 --node_rank=0 --master_addr="192.168.1.1"--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3and all other arguments of your training script)

Node 2:

python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE--nnodes=2 --node_rank=1 --master_addr="192.168.1.1"--master_port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3and all other arguments of your training script)

需要注意的地方:

  • 后端最好用“NCCL”,才能获取最好的分布式性能
  • 训练代码必须从命令行解析--local_rank=LOCAL_PROCESS_RANK
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()torch.cuda.set_device(arg.local_rank)
  • torch.distributed初始化方式
torch.distributed.init_process_group(backend='nccl',init_method='env://')
  • model
model = torch.nn.parallel.DistributedDataParallel(model,device_ids=[arg.local_rank],output_device=arg.local_rank)

训练代码中这样写(省略多余代码,只保留核心代码):

import torch.distributed as dist
# 这个参数是torch.distributed.launch传递过来的,我们设置位置参数来接受,local_rank代表当前程序进程使用的GPU标号
parser.add_argument("--local_rank", type=int, default=0) def synchronize():"""Helper function to synchronize (barrier) among all processes whenusing distributed training"""if not dist.is_available():returnif not dist.is_initialized():returnworld_size = dist.get_world_size()if world_size == 1:returndist.barrier()## WORLD_SIZE 由torch.distributed.launch.py产生 具体数值为 nproc_per_node*node(主机数,这里为1)
num_gpus = int(os.environ["WORLD_SIZE"]) if "WORLD_SIZE" in os.environ else 1is_distributed = num_gpus > 1if is_distributed:torch.cuda.set_device(args.local_rank)  # 这里设定每一个进程使用的GPU是一定的torch.distributed.init_process_group(backend="nccl", init_method="env://")synchronize()# 将模型移至到DistributedDataParallel中,此时就可以进行训练了
if is_distributed:model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.local_rank], output_device=args.local_rank,# this should be removed if we update BatchNorm statsbroadcast_buffers=False,)# 注意,在测试的时候需要执行 model = model.module 

WRITING DISTRIBUTED APPLICATIONS WITH PYTORCH

https://github.com/pytorch/examples/tree/master/imagenet 
这里,常规的操作就不多叙述了,主要讲一下和分布式相关的代码部分。

parser.add_argument('--world-size', default=2, type=int, help='number of distributed processes')
parser.add_argument('--dist-url', default='tcp://172.16.1.186:2222', type=str, help='url used to set up distributed training')
parser.add_argument('--dist-backend', default='gloo', type=str, help='distributed backend')
parser.add_argument('--dist-rank', default=0, type=int, help='rank of distributed processes')

这几个是必要的参数设置,其中最后一个是官网没有的

if args.distributed:dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,world_size=args.world_size,rank=args.dist_rank)

这个是分布式的初始化,同样,最后添加一个rank

model.cuda()
model = torch.nn.parallel.DistributedDataParallel(model)

这里,把我们平时使用的单机多卡,数据并行的API

model = torch.nn.DataParallel(model).cuda()

换掉即可。

if args.distributed:train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)

最后使用这个官方给的划分方法,把数据集划分即可。

五 MNIST 分布式例子

MNIST 分布式 (2台 GPU机器,每台一张GPU)

GPU1:   python mnist_dist.py --init-method=file:///home/workspace/share/1 --rank=0 --world-size=2

GPU2:   python mnist_dist.py --init-method=file:///home/workspace/share/1 --rank=1 --world-size=2

from __future__ import print_function
import argparse
import timeimport torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transformsimport torch.distributed as dist
import torch.utils.data
import torch.utils.data.distributedclass Net(nn.Module):def __init__(self):super(Net, self).__init__()self.conv1 = nn.Conv2d(1, 20, 5, 1)self.conv2 = nn.Conv2d(20, 50, 5, 1)self.fc1 = nn.Linear(4 * 4 * 50, 500)self.fc2 = nn.Linear(500, 10)def forward(self, x):x = F.relu(self.conv1(x))x = F.max_pool2d(x, 2, 2)x = F.relu(self.conv2(x))x = F.max_pool2d(x, 2, 2)x = x.view(-1, 4 * 4 * 50)x = F.relu(self.fc1(x))x = self.fc2(x)return F.log_softmax(x)def train(args, model, device, train_loader, optimizer, epoch):model.train()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = F.nll_loss(output, target)loss.backward()optimizer.step()if batch_idx % args.log_interval == 0:print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), len(train_loader.dataset),100. * batch_idx / len(train_loader), loss.item()))def test(args, model, device, test_loader):model.eval()test_loss = 0correct = 0for data, target in test_loader:data, target = data.to(device), target.to(device)output = model(data)test_loss += F.nll_loss(output, target, size_average=False).item()  # sum up batch losspred = output.data.max(1, keepdim=True)[1]  # get the index of the max log-probabilitycorrect += pred.eq(target.data.view_as(pred)).cpu().sum()test_loss /= len(test_loader.dataset)print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, len(test_loader.dataset),100. * correct / len(test_loader.dataset)))def main():# Training settingsparser = argparse.ArgumentParser(description='PyTorch MNIST Example')parser.add_argument('--batch-size', type=int, default=64, metavar='N',help='input batch size for training (default: 64)')parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',help='input batch size for testing (default: 1000)')parser.add_argument('--epochs', type=int, default=10, metavar='N',help='number of epochs to train (default: 10)')parser.add_argument('--lr', type=float, default=0.01, metavar='LR',help='learning rate (default: 0.01)')parser.add_argument('--momentum', type=float, default=0.5, metavar='M',help='SGD momentum (default: 0.5)')parser.add_argument('--no-cuda', action='store_true',help='disables CUDA training')parser.add_argument('--seed', type=int, default=1, metavar='S',help='random seed (default: 1)')parser.add_argument('--log-interval', type=int, default=500, metavar='N',help='how many batches to wait before logging training status')parser.add_argument('--init-method', type=str, default='tcp://127.0.0.1:23456')parser.add_argument('--rank', type=int)parser.add_argument('--world-size', type=int)args = parser.parse_args()use_cuda = not args.no_cuda and torch.cuda.is_available()print(args)# 初始化dist.init_process_group(init_method=args.init_method, backend="gloo", world_size=args.world_size, rank=args.rank,group_name="pytorch_test")torch.manual_seed(args.seed)if use_cuda:torch.cuda.manual_seed(args.seed)train_dataset = datasets.MNIST('./data', train=True, download=False,transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))]))# 分发数据train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}train_loader = torch.utils.data.DataLoader(train_dataset,batch_size=args.batch_size, shuffle=True, **kwargs)test_loader = torch.utils.data.DataLoader(datasets.MNIST('data', train=False, transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])),batch_size=args.test_batch_size, shuffle=True, **kwargs)device = torch.device("cuda" if use_cuda else "cpu")print(device)model = Net().to(device)if use_cuda:model = torch.nn.parallel.DistributedDataParallel(model) if use_cuda else torch.nn.parallel.DistributedDataParallelCPU(model)optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)total_time = 0for epoch in range(1, args.epochs + 1):# 设置epoch位置,这应该是个为了同步所做的工作train_sampler.set_epoch(epoch)start_cpu_secs = time.time()train(args, model, device, train_loader, optimizer, epoch)end_cpu_secs = time.time()print("Epoch {} of {} took {:.3f}s".format(epoch, args.epochs, end_cpu_secs - start_cpu_secs))total_time += end_cpu_secs - start_cpu_secstest(args, model, device, test_loader)print("Total time= {:.3f}s".format(total_time))if __name__ == '__main__':main()

其他单机例子(一台GPU机器一张GPU卡)

python mnist_no_dist.py

from __future__ import print_function
import argparse
import timeimport torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transformsclass Net(nn.Module):def __init__(self):super(Net, self).__init__()self.conv1 = nn.Conv2d(1, 20, 5, 1)self.conv2 = nn.Conv2d(20, 50, 5, 1)self.fc1 = nn.Linear(4 * 4 * 50, 500)self.fc2 = nn.Linear(500, 10)def forward(self, x):x = F.relu(self.conv1(x))x = F.max_pool2d(x, 2, 2)x = F.relu(self.conv2(x))x = F.max_pool2d(x, 2, 2)x = x.view(-1, 4 * 4 * 50)x = F.relu(self.fc1(x))x = self.fc2(x)return F.log_softmax(x, dim=1)def train(args, model, device, train_loader, optimizer, epoch):model.train()for batch_idx, (data, target) in enumerate(train_loader):data, target = data.to(device), target.to(device)optimizer.zero_grad()output = model(data)loss = F.nll_loss(output, target)loss.backward()optimizer.step()if batch_idx % args.log_interval == 0:print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(epoch, batch_idx * len(data), len(train_loader.dataset),100. * batch_idx / len(train_loader), loss.item()))def test(args, model, device, test_loader):model.eval()test_loss = 0correct = 0with torch.no_grad():for data, target in test_loader:data, target = data.to(device), target.to(device)output = model(data)test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch losspred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probabilitycorrect += pred.eq(target.view_as(pred)).sum().item()test_loss /= len(test_loader.dataset)print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, len(test_loader.dataset),100. * correct / len(test_loader.dataset)))def main():# Training settingsparser = argparse.ArgumentParser(description='PyTorch MNIST Example')parser.add_argument('--batch-size', type=int, default=64, metavar='N',help='input batch size for training (default: 64)')parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',help='input batch size for testing (default: 1000)')parser.add_argument('--epochs', type=int, default=10, metavar='N',help='number of epochs to train (default: 10)')parser.add_argument('--lr', type=float, default=0.01, metavar='LR',help='learning rate (default: 0.01)')parser.add_argument('--momentum', type=float, default=0.5, metavar='M',help='SGD momentum (default: 0.5)')parser.add_argument('--no-cuda', action='store_true', default=False,help='disables CUDA training')parser.add_argument('--seed', type=int, default=1, metavar='S',help='random seed (default: 1)')parser.add_argument('--log-interval', type=int, default=500, metavar='N',help='how many batches to wait before logging training status')# parser.add_argument('--save-model', action='store_true', default=False,#                     help='For Saving the current Model')args = parser.parse_args()print(args)use_cuda = not args.no_cuda and torch.cuda.is_available()torch.manual_seed(args.seed)kwargs = {'num_workers': 5, 'pin_memory': True} if use_cuda else {}train_loader = torch.utils.data.DataLoader(datasets.MNIST('./data', train=True, download=True,transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])),batch_size=args.batch_size, shuffle=True, **kwargs)test_loader = torch.utils.data.DataLoader(datasets.MNIST('./data', train=False, transform=transforms.Compose([transforms.ToTensor(),transforms.Normalize((0.1307,), (0.3081,))])),batch_size=args.test_batch_size, shuffle=True, **kwargs)device = torch.device("cuda" if use_cuda else "cpu")print(device)model = Net().to(device)optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)total_time = 0for epoch in range(1, args.epochs + 1):start_cpu_secs = time.time()train(args, model, device, train_loader, optimizer, epoch)end_cpu_secs = time.time()print("Epoch {} of {} took {:.3f}s".format(epoch, args.epochs, end_cpu_secs - start_cpu_secs))total_time += end_cpu_secs - start_cpu_secstest(args, model, device, test_loader)# print("Total time= {:.3f}s".format(total_time))# if (args.save_model):#     torch.save(model.state_dict(), "mnist_cnn.pt")if __name__ == '__main__':main()

Dataloader中的参数

如果你的选项刚好是最坏情况,优化这个有可能达到2倍左右的性能提升(经验值哈)

解释一下DataLoader中其中两个参数:

  • num_worker:
    数据集加载的时候,控制用于同时加载数据的线程数(默认为0,即在主线程读取) 存在最优值,你会看到运行的时候pytorch会新建恰等于这个值的数据读取线程,我猜,线程多于必要的时候,数据读取线程返回到主线程反而会因为线程间通信减慢数据。因此大了不好小了也不好。建议把模型,loss,优化器全注释了只跑一下数据流速度,确定最优值
  • pin_memory:
    是否提前申请CUDA内存(默认为False,但有说法除非数据集很小,否则在N卡上推荐总是打开)在MNIST这样的小数据集上好像是关闭比较好,到底多小算小说不清楚,建议自己试一下。

pin_memory就是锁页内存,创建DataLoader时,设置pin_memory=True,则意味着生成的Tensor数据最开始是属于内存中的锁页内存,这样将内存的Tensor转义到GPU的显存就会更快一些。
主机中的内存,有两种存在方式,一是锁页,二是不锁页,锁页内存存放的内容在任何情况下都不会与主机的虚拟内存进行交换(注:虚拟内存就是硬盘),而不锁页内存在主机内存不足时,数据会存放在虚拟内存中。显卡中的显存全部是锁页内存,当计算机的内存充足的时候,可以设置pin_memory=True。当系统卡住,或者交换内存使用过多的时候,设置pin_memory=False。因为pin_memory与电脑硬件性能有关,pytorch开发者不能确保每一个炼丹玩家都有高端设备,因此pin_memory默认为False。

如果机子的内存比较大,建议开启pin_memory=Ture,如果开启后发现有卡顿现象或者内存占用过高,此时建议关闭。

总之官方的默认值很有可能不是最好的,建议自己多试试。在MNIST这样的小数据集上,pin_memory关闭比较好。而且,num_workers需要调节,除了默认情况外,最快和最慢是有一定差距的,建议在自己的代码上只跑数据读取这一块,确定这两个参数的最优值。

参考:

https://www.w3cschool.cn/pytorch/pytorch-hv9o3bsy.html

  1. PyTorch 1.0 中文官方教程:使用 Amazon AWS 进行分布式训练
  2. https://pytorch.org/docs/stable/distributed.html#initialization
  3. pytorch 分布式训练 distributed parallel 笔记
  4. 使用PyTorch编写分布式应用程序
  5. https://zhuanlan.zhihu.com/p/76638962

[深度学习] 分布式Pytorch介绍(三)相关推荐

  1. [深度学习] 分布式Horovod介绍(四)

    [深度学习] 分布式模式介绍(一) [深度学习] 分布式Tensorflow介绍(二) [深度学习] 分布式Pytorch 1.0介绍(三) [深度学习] 分布式Horovod介绍(四) 实际应用中, ...

  2. [深度学习] 分布式模式介绍(一)

    [深度学习] 分布式模式介绍(一) [深度学习] 分布式Tensorflow介绍(二) [深度学习] 分布式Pytorch 1.0介绍(三) [深度学习] 分布式Horovod介绍(四) 一  分布式 ...

  3. [深度学习] 分布式Tensorflow 2.0 介绍(二)

    [深度学习] 分布式模式介绍(一) [深度学习] 分布式Tensorflow 2.0介绍(二) [深度学习] 分布式Pytorch 1.0介绍(三) [深度学习] 分布式Horovod介绍(四) 一 ...

  4. 深度学习分布式策略优化、显存优化、通信优化、编译优化综述

    综述 因为我个人最近在从事可能是AI领域对性能挑战最大的方向,自动驾驶领域,所以对整个深度学习训练的优化尤为关注,最近一直在学习相关内容,谨以此篇文章做一个总结. 我一直很看好深度学习训练优化这个方向 ...

  5. (d2l-ai/d2l-zh)《动手学深度学习》pytorch 笔记(2)前言(介绍各种机器学习问题)以及数据操作预备知识Ⅰ

    开源项目地址:d2l-ai/d2l-zh 教材官网:https://zh.d2l.ai/ 书介绍:https://zh-v2.d2l.ai/ 笔记基于2021年7月26日发布的版本,书及代码下载地址在 ...

  6. 程序如何在两个gpu卡上并行运行_深度学习分布式训练相关介绍 - Part 1 多GPU训练...

    本篇文章主要是对深度学习中运用多GPU进行训练的一些基本的知识点进行的一个梳理 文章中的内容都是经过认真地分析,并且尽量做到有所考证 抛砖引玉,希望可以给大家有更多的启发,并能有所收获 介绍 大多数时 ...

  7. 深度学习主流框架介绍(PyTorch、TensorFlow、Keras、Caffe、Theano、MXNET)

    深度学习主流框架介绍(PyTorch.TensorFlow.Keras.Caffe.Theano.MXNET) 1.Theano Theano是最早的深度学习框架之一,由 Yoshua Bengio ...

  8. (d2l-ai/d2l-zh)《动手学深度学习》pytorch 笔记(3)前言(介绍各种机器学习问题)以及数据操作预备知识Ⅲ(概率)

    开源项目地址:d2l-ai/d2l-zh 教材官网:https://zh.d2l.ai/ 书介绍:https://zh-v2.d2l.ai/ 笔记基于2021年7月26日发布的版本,书及代码下载地址在 ...

  9. 深度学习分布式方案(个人笔记)

    深度学习分布式方案 [ 关注三个问题] 1.将程序改为分布式,需要改动多少代码 2.分布式程序/任务要启动,程序是否复杂? 3.分布式模式提升了多少运行效率? [分布式并行架构] (一)PS架构(pa ...

最新文章

  1. Java 常用设计模式 -- Builder模式
  2. Redis和mysql数据怎么保持数据一致的?
  3. CoSENT:比Sentence-BERT更有效的句向量方案
  4. 前端开发-4-HTML-tableform表单控制 标签
  5. BZOJ 3907: 网格( 组合数 + 高精度 )
  6. 近期计算机视觉相关算法竞赛汇总—高额奖金等你来拿!
  7. 一个第三方Dart库导致的编译错误!
  8. Linux下dd查看磁盘的读写性能
  9. Oracle网格控制器OMA端安装Yast
  10. Hive SQL开窗函数详解
  11. windows 快捷调用
  12. delete trancate drop三者之间的区别
  13. 计算机病毒论文课题内容,计算机病毒防治学论文选题 计算机病毒防治论文题目怎样定...
  14. html5 fa图标库,axure官方制作FontAwesome图标元件库V5.5.0版
  15. 帆软(FineReport)报表学习——一个简单的报表
  16. 华硕ASUS手机平板官方刷机包raw格式解压修改工具
  17. 抓包工具--Fiddler
  18. java 读取Excel数据(POI)(一个sheet或者多个sheet)
  19. 百度工程师眼中的云原生可观测性追踪技术
  20. 四节传送带plc梯形图_基于PLC的四节传送带设计.ppt

热门文章

  1. svn提交错误file is scheduled for addition, but is missing
  2. shell 命令 链接文件 ln
  3. ​“统治”移动处理器市场的Arm为何明年Q1才发布AI处理器?
  4. fixed在微信下的BUG
  5. [搜索]Trie树的实现
  6. [C语言]一个很实用的服务端和客户端进行UDP通信的实例
  7. JavaWeb学习总结(二)——JSP中的九个内置对象
  8. Android学习笔记
  9. 利用UDEV服务解决RAC ASM存储设备名
  10. Microsoft System Center Configuration Manager 2007 工作流中文版