1. Walkthrough of adding DDP to training code

https://blog.csdn.net/cdknight_happy/article/details/108262595
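For context, the pattern the linked walkthrough covers is the standard native-PyTorch DistributedDataParallel setup: one process per GPU, an NCCL process group, a DistributedSampler to shard the data, and the model wrapped in DDP. The sketch below is a minimal illustration of that pattern, not the linked author's code; the dummy TensorDataset, script name, and hyperparameters are placeholders.

import argparse
import torch
import torch.distributed as dist
import torch.nn as nn
import torchvision.models as models
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", default=0, type=int)  # filled in by torch.distributed.launch
    args = parser.parse_args()

    # One process per GPU: bind this process to its GPU, then join the process group.
    torch.cuda.set_device(args.local_rank)
    dist.init_process_group(backend="nccl", init_method="env://")

    model = models.resnet18().cuda()
    model = DDP(model, device_ids=[args.local_rank], output_device=args.local_rank)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
    criterion = nn.CrossEntropyLoss().cuda()

    # Dummy data just to make the sketch runnable; replace with a real Dataset.
    dataset = TensorDataset(torch.randn(512, 3, 224, 224), torch.randint(0, 1000, (512,)))
    sampler = DistributedSampler(dataset)          # shards the dataset across processes
    loader = DataLoader(dataset, batch_size=32, sampler=sampler,
                        num_workers=2, pin_memory=True)

    for epoch in range(2):
        sampler.set_epoch(epoch)                   # reshuffle the shards every epoch
        for x, y in loader:
            x = x.cuda(non_blocking=True)
            y = y.cuda(non_blocking=True)
            optimizer.zero_grad()
            loss = criterion(model(x), y)
            loss.backward()                        # DDP all-reduces gradients during backward
            optimizer.step()
        if args.local_rank == 0:
            print("epoch {} done, last loss {:.4f}".format(epoch, loss.item()))

if __name__ == "__main__":
    main()

Launch it with one process per GPU, e.g. python -m torch.distributed.launch --nproc_per_node=<num_gpus> ddp_minimal.py (the script name is arbitrary).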

2. Official apex example: apex + DDP + data-loading optimization

The experiment follows this reference code:

https://github.com/NVIDIA/apex/blob/master/examples/imagenet/main_amp.py

import argparse
import os
import shutil
import time

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

import numpy as np

try:
    from apex.parallel import DistributedDataParallel as DDP
    from apex.fp16_utils import *
    from apex import amp, optimizers
    from apex.multi_tensor_apply import multi_tensor_applier
except ImportError:
    raise ImportError("Please install apex from https://www.github.com/nvidia/apex to run this example.")

def fast_collate(batch, memory_format):
    imgs = [img[0] for img in batch]
    targets = torch.tensor([target[1] for target in batch], dtype=torch.int64)
    w = imgs[0].size[0]
    h = imgs[0].size[1]
    tensor = torch.zeros( (len(imgs), 3, h, w), dtype=torch.uint8).contiguous(memory_format=memory_format)
    for i, img in enumerate(imgs):
        nump_array = np.asarray(img, dtype=np.uint8)
        if(nump_array.ndim < 3):
            nump_array = np.expand_dims(nump_array, axis=-1)
        nump_array = np.rollaxis(nump_array, 2)
        tensor[i] += torch.from_numpy(nump_array)
    return tensor, targets

def parse():
    model_names = sorted(name for name in models.__dict__
                         if name.islower() and not name.startswith("__")
                         and callable(models.__dict__[name]))

    parser = argparse.ArgumentParser(description='PyTorch ImageNet Training')
    parser.add_argument('data', metavar='DIR',
                        help='path to dataset')
    parser.add_argument('--arch', '-a', metavar='ARCH', default='resnet18',
                        choices=model_names,
                        help='model architecture: ' +
                        ' | '.join(model_names) +
                        ' (default: resnet18)')
    parser.add_argument('-j', '--workers', default=4, type=int, metavar='N',
                        help='number of data loading workers (default: 4)')
    parser.add_argument('--epochs', default=90, type=int, metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('--start-epoch', default=0, type=int, metavar='N',
                        help='manual epoch number (useful on restarts)')
    parser.add_argument('-b', '--batch-size', default=256, type=int,
                        metavar='N', help='mini-batch size per process (default: 256)')
    parser.add_argument('--lr', '--learning-rate', default=0.1, type=float,
                        metavar='LR',
                        help='Initial learning rate.  Will be scaled by <global batch size>/256: '
                             'args.lr = args.lr*float(args.batch_size*args.world_size)/256.  '
                             'A warmup schedule will also be applied over the first 5 epochs.')
    parser.add_argument('--momentum', default=0.9, type=float, metavar='M',
                        help='momentum')
    parser.add_argument('--weight-decay', '--wd', default=1e-4, type=float,
                        metavar='W', help='weight decay (default: 1e-4)')
    parser.add_argument('--print-freq', '-p', default=10, type=int,
                        metavar='N', help='print frequency (default: 10)')
    parser.add_argument('--resume', default='', type=str, metavar='PATH',
                        help='path to latest checkpoint (default: none)')
    parser.add_argument('-e', '--evaluate', dest='evaluate', action='store_true',
                        help='evaluate model on validation set')
    parser.add_argument('--pretrained', dest='pretrained', action='store_true',
                        help='use pre-trained model')
    parser.add_argument('--prof', default=-1, type=int,
                        help='Only run 10 iterations for profiling.')
    parser.add_argument('--deterministic', action='store_true')
    parser.add_argument("--local_rank", default=0, type=int)
    parser.add_argument('--sync_bn', action='store_true',
                        help='enabling apex sync BN.')
    parser.add_argument('--opt-level', type=str)
    parser.add_argument('--keep-batchnorm-fp32', type=str, default=None)
    parser.add_argument('--loss-scale', type=str, default=None)
    parser.add_argument('--channels-last', type=bool, default=False)
    args = parser.parse_args()
    return args

def main():
    global best_prec1, args

    args = parse()

    print("opt_level = {}".format(args.opt_level))
    print("keep_batchnorm_fp32 = {}".format(args.keep_batchnorm_fp32), type(args.keep_batchnorm_fp32))
    print("loss_scale = {}".format(args.loss_scale), type(args.loss_scale))
    print("\nCUDNN VERSION: {}\n".format(torch.backends.cudnn.version()))

    cudnn.benchmark = True
    best_prec1 = 0
    if args.deterministic:
        cudnn.benchmark = False
        cudnn.deterministic = True
        torch.manual_seed(args.local_rank)
        torch.set_printoptions(precision=10)

    args.distributed = False
    if 'WORLD_SIZE' in os.environ:
        args.distributed = int(os.environ['WORLD_SIZE']) > 1

    args.gpu = 0
    args.world_size = 1

    if args.distributed:
        args.gpu = args.local_rank
        torch.cuda.set_device(args.gpu)
        torch.distributed.init_process_group(backend='nccl',
                                             init_method='env://')
        args.world_size = torch.distributed.get_world_size()

    assert torch.backends.cudnn.enabled, "Amp requires cudnn backend to be enabled."

    if args.channels_last:
        memory_format = torch.channels_last
    else:
        memory_format = torch.contiguous_format

    # create model
    if args.pretrained:
        print("=> using pre-trained model '{}'".format(args.arch))
        model = models.__dict__[args.arch](pretrained=True)
    else:
        print("=> creating model '{}'".format(args.arch))
        model = models.__dict__[args.arch]()

    if args.sync_bn:
        import apex
        print("using apex synced BN")
        model = apex.parallel.convert_syncbn_model(model)

    model = model.cuda().to(memory_format=memory_format)

    # Scale learning rate based on global batch size
    args.lr = args.lr*float(args.batch_size*args.world_size)/256.
    optimizer = torch.optim.SGD(model.parameters(), args.lr,
                                momentum=args.momentum,
                                weight_decay=args.weight_decay)

    # Initialize Amp.  Amp accepts either values or strings for the optional override arguments,
    # for convenient interoperation with argparse.
    model, optimizer = amp.initialize(model, optimizer,
                                      opt_level=args.opt_level,
                                      keep_batchnorm_fp32=args.keep_batchnorm_fp32,
                                      loss_scale=args.loss_scale)

    # For distributed training, wrap the model with apex.parallel.DistributedDataParallel.
    # This must be done AFTER the call to amp.initialize.  If model = DDP(model) is called
    # before model, ... = amp.initialize(model, ...), the call to amp.initialize may alter
    # the types of model's parameters in a way that disrupts or destroys DDP's allreduce hooks.
    if args.distributed:
        # By default, apex.parallel.DistributedDataParallel overlaps communication with
        # computation in the backward pass.
        # model = DDP(model)
        # delay_allreduce delays all communication to the end of the backward pass.
        model = DDP(model, delay_allreduce=True)

    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss().cuda()

    # Optionally resume from a checkpoint
    if args.resume:
        # Use a local scope to avoid dangling references
        def resume():
            if os.path.isfile(args.resume):
                print("=> loading checkpoint '{}'".format(args.resume))
                checkpoint = torch.load(args.resume, map_location = lambda storage, loc: storage.cuda(args.gpu))
                args.start_epoch = checkpoint['epoch']
                global best_prec1
                best_prec1 = checkpoint['best_prec1']
                model.load_state_dict(checkpoint['state_dict'])
                optimizer.load_state_dict(checkpoint['optimizer'])
                print("=> loaded checkpoint '{}' (epoch {})"
                      .format(args.resume, checkpoint['epoch']))
            else:
                print("=> no checkpoint found at '{}'".format(args.resume))
        resume()

    # Data loading code
    traindir = os.path.join(args.data, 'train')
    valdir = os.path.join(args.data, 'val')

    if(args.arch == "inception_v3"):
        raise RuntimeError("Currently, inception_v3 is not supported by this example.")
        # crop_size = 299
        # val_size = 320 # I chose this value arbitrarily, we can adjust.
    else:
        crop_size = 224
        val_size = 256

    train_dataset = datasets.ImageFolder(
        traindir,
        transforms.Compose([
            transforms.RandomResizedCrop(crop_size),
            transforms.RandomHorizontalFlip(),
            # transforms.ToTensor(), Too slow
            # normalize,
        ]))
    val_dataset = datasets.ImageFolder(valdir, transforms.Compose([
            transforms.Resize(val_size),
            transforms.CenterCrop(crop_size),
        ]))

    train_sampler = None
    val_sampler = None
    if args.distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
        val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset)

    collate_fn = lambda b: fast_collate(b, memory_format)

    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
        num_workers=args.workers, pin_memory=True, sampler=train_sampler, collate_fn=collate_fn)

    val_loader = torch.utils.data.DataLoader(
        val_dataset,
        batch_size=args.batch_size, shuffle=False,
        num_workers=args.workers, pin_memory=True,
        sampler=val_sampler,
        collate_fn=collate_fn)

    if args.evaluate:
        validate(val_loader, model, criterion)
        return

    for epoch in range(args.start_epoch, args.epochs):
        if args.distributed:
            train_sampler.set_epoch(epoch)

        # train for one epoch
        train(train_loader, model, criterion, optimizer, epoch)

        # evaluate on validation set
        prec1 = validate(val_loader, model, criterion)

        # remember best prec@1 and save checkpoint
        if args.local_rank == 0:
            is_best = prec1 > best_prec1
            best_prec1 = max(prec1, best_prec1)
            save_checkpoint({
                'epoch': epoch + 1,
                'arch': args.arch,
                'state_dict': model.state_dict(),
                'best_prec1': best_prec1,
                'optimizer' : optimizer.state_dict(),
            }, is_best)

class data_prefetcher():
    def __init__(self, loader):
        self.loader = iter(loader)
        self.stream = torch.cuda.Stream()
        self.mean = torch.tensor([0.485 * 255, 0.456 * 255, 0.406 * 255]).cuda().view(1,3,1,1)
        self.std = torch.tensor([0.229 * 255, 0.224 * 255, 0.225 * 255]).cuda().view(1,3,1,1)
        # With Amp, it isn't necessary to manually convert data to half.
        # if args.fp16:
        #     self.mean = self.mean.half()
        #     self.std = self.std.half()
        self.preload()

    def preload(self):
        try:
            self.next_input, self.next_target = next(self.loader)
        except StopIteration:
            self.next_input = None
            self.next_target = None
            return
        # if record_stream() doesn't work, another option is to make sure device inputs are created
        # on the main stream.
        # self.next_input_gpu = torch.empty_like(self.next_input, device='cuda')
        # self.next_target_gpu = torch.empty_like(self.next_target, device='cuda')
        # Need to make sure the memory allocated for next_* is not still in use by the main stream
        # at the time we start copying to next_*:
        # self.stream.wait_stream(torch.cuda.current_stream())
        with torch.cuda.stream(self.stream):
            self.next_input = self.next_input.cuda(non_blocking=True)
            self.next_target = self.next_target.cuda(non_blocking=True)
            # more code for the alternative if record_stream() doesn't work:
            # copy_ will record the use of the pinned source tensor in this side stream.
            # self.next_input_gpu.copy_(self.next_input, non_blocking=True)
            # self.next_target_gpu.copy_(self.next_target, non_blocking=True)
            # self.next_input = self.next_input_gpu
            # self.next_target = self.next_target_gpu

            # With Amp, it isn't necessary to manually convert data to half.
            # if args.fp16:
            #     self.next_input = self.next_input.half()
            # else:
            self.next_input = self.next_input.float()
            self.next_input = self.next_input.sub_(self.mean).div_(self.std)

    def next(self):
        torch.cuda.current_stream().wait_stream(self.stream)
        input = self.next_input
        target = self.next_target
        if input is not None:
            input.record_stream(torch.cuda.current_stream())
        if target is not None:
            target.record_stream(torch.cuda.current_stream())
        self.preload()
        return input, target

def train(train_loader, model, criterion, optimizer, epoch):
    batch_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # switch to train mode
    model.train()
    end = time.time()

    prefetcher = data_prefetcher(train_loader)
    input, target = prefetcher.next()
    i = 0
    while input is not None:
        i += 1
        if args.prof >= 0 and i == args.prof:
            print("Profiling begun at iteration {}".format(i))
            torch.cuda.cudart().cudaProfilerStart()

        if args.prof >= 0: torch.cuda.nvtx.range_push("Body of iteration {}".format(i))

        adjust_learning_rate(optimizer, epoch, i, len(train_loader))

        # compute output
        if args.prof >= 0: torch.cuda.nvtx.range_push("forward")
        output = model(input)
        if args.prof >= 0: torch.cuda.nvtx.range_pop()
        loss = criterion(output, target)

        # compute gradient and do SGD step
        optimizer.zero_grad()

        if args.prof >= 0: torch.cuda.nvtx.range_push("backward")
        with amp.scale_loss(loss, optimizer) as scaled_loss:
            scaled_loss.backward()
        if args.prof >= 0: torch.cuda.nvtx.range_pop()

        # for param in model.parameters():
        #     print(param.data.double().sum().item(), param.grad.data.double().sum().item())

        if args.prof >= 0: torch.cuda.nvtx.range_push("optimizer.step()")
        optimizer.step()
        if args.prof >= 0: torch.cuda.nvtx.range_pop()

        if i%args.print_freq == 0:
            # Every print_freq iterations, check the loss, accuracy, and speed.
            # For best performance, it doesn't make sense to print these metrics every
            # iteration, since they incur an allreduce and some host<->device syncs.

            # Measure accuracy
            prec1, prec5 = accuracy(output.data, target, topk=(1, 5))

            # Average loss and accuracy across processes for logging
            if args.distributed:
                reduced_loss = reduce_tensor(loss.data)
                prec1 = reduce_tensor(prec1)
                prec5 = reduce_tensor(prec5)
            else:
                reduced_loss = loss.data

            # to_python_float incurs a host<->device sync
            losses.update(to_python_float(reduced_loss), input.size(0))
            top1.update(to_python_float(prec1), input.size(0))
            top5.update(to_python_float(prec5), input.size(0))

            torch.cuda.synchronize()
            batch_time.update((time.time() - end)/args.print_freq)
            end = time.time()

            if args.local_rank == 0:
                print('Epoch: [{0}][{1}/{2}]\t'
                      'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                      'Speed {3:.3f} ({4:.3f})\t'
                      'Loss {loss.val:.10f} ({loss.avg:.4f})\t'
                      'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                      'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
                       epoch, i, len(train_loader),
                       args.world_size*args.batch_size/batch_time.val,
                       args.world_size*args.batch_size/batch_time.avg,
                       batch_time=batch_time,
                       loss=losses, top1=top1, top5=top5))

        if args.prof >= 0: torch.cuda.nvtx.range_push("prefetcher.next()")
        input, target = prefetcher.next()
        if args.prof >= 0: torch.cuda.nvtx.range_pop()

        # Pop range "Body of iteration {}".format(i)
        if args.prof >= 0: torch.cuda.nvtx.range_pop()

        if args.prof >= 0 and i == args.prof + 10:
            print("Profiling ended at iteration {}".format(i))
            torch.cuda.cudart().cudaProfilerStop()
            quit()

def validate(val_loader, model, criterion):
    batch_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # switch to evaluate mode
    model.eval()

    end = time.time()

    prefetcher = data_prefetcher(val_loader)
    input, target = prefetcher.next()
    i = 0
    while input is not None:
        i += 1

        # compute output
        with torch.no_grad():
            output = model(input)
            loss = criterion(output, target)

        # measure accuracy and record loss
        prec1, prec5 = accuracy(output.data, target, topk=(1, 5))

        if args.distributed:
            reduced_loss = reduce_tensor(loss.data)
            prec1 = reduce_tensor(prec1)
            prec5 = reduce_tensor(prec5)
        else:
            reduced_loss = loss.data

        losses.update(to_python_float(reduced_loss), input.size(0))
        top1.update(to_python_float(prec1), input.size(0))
        top5.update(to_python_float(prec5), input.size(0))

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        # TODO:  Change timings to mirror train().
        if args.local_rank == 0 and i % args.print_freq == 0:
            print('Test: [{0}/{1}]\t'
                  'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                  'Speed {2:.3f} ({3:.3f})\t'
                  'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                  'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                  'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
                   i, len(val_loader),
                   args.world_size * args.batch_size / batch_time.val,
                   args.world_size * args.batch_size / batch_time.avg,
                   batch_time=batch_time, loss=losses,
                   top1=top1, top5=top5))

        input, target = prefetcher.next()

    print(' * Prec@1 {top1.avg:.3f} Prec@5 {top5.avg:.3f}'
          .format(top1=top1, top5=top5))

    return top1.avg

def save_checkpoint(state, is_best, filename='checkpoint.pth.tar'):
    torch.save(state, filename)
    if is_best:
        shutil.copyfile(filename, 'model_best.pth.tar')

class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

def adjust_learning_rate(optimizer, epoch, step, len_epoch):
    """LR schedule that should yield 76% converged accuracy with batch size 256"""
    factor = epoch // 30

    if epoch >= 80:
        factor = factor + 1

    lr = args.lr*(0.1**factor)

    """Warmup"""
    if epoch < 5:
        lr = lr*float(1 + step + epoch*len_epoch)/(5.*len_epoch)

    # if(args.local_rank == 0):
    #     print("epoch = {}, step = {}, lr = {}".format(epoch, step, lr))

    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

def accuracy(output, target, topk=(1,)):
    """Computes the precision@k for the specified values of k"""
    maxk = max(topk)
    batch_size = target.size(0)

    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
        res.append(correct_k.mul_(100.0 / batch_size))
    return res

def reduce_tensor(tensor):
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.reduce_op.SUM)
    rt /= args.world_size
    return rt

if __name__ == '__main__':
    main()
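The script is driven by torch.distributed.launch, which sets the WORLD_SIZE environment variable and passes --local_rank to each process, so args.distributed becomes true automatically when more than one process is started. Assuming the file is saved as main_amp.py and the dataset directory contains train/ and val/ subfolders, a typical single-node launch looks roughly like this (GPU count, batch size, and paths are placeholders):

python -m torch.distributed.launch --nproc_per_node=<num_gpus> main_amp.py -a resnet50 -b 224 --workers 4 --opt-level O1 /path/to/imagenet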
