TF的实现分为了单机实现和分布式实现,在分布式实现中,需要实现的是对client,master,worker process不在同一台机器上时的支持。数据量很大的情况下,单机跑深度学习程序,过于耗时,所以需要TensorFlow分布式并行。

该实例是TF的经典入门实例手写字体识别MNIST基于分布式的实现,代码都加了中文注释,更加通俗易懂。

GitHub实例地址:https://github.com/TracyMcgrady6/Distribute_MNIST
欢迎大家star

Single-Device Execution

构建好图后,使用拓扑算法来决定执行哪一个节点,即对每个节点使用一个计数,值表示所依赖的未完成的节点数目,当一个节点的运算完成时,将依赖该节点的所有节点的计数减一。如果节点的计数为0,将其放入准备队列待执行。

单机多GPU训练

先简单介绍下单机的多GPU训练,然后再介绍分布式的多机多GPU训练。
单机的多GPU训练, tensorflow的官方已经给了一个cifar的例子,已经有比较详细的代码和文档介绍, 这里大致说下多GPU的过程,以便方便引入到多机多GPU的介绍。
单机多GPU的训练过程:

  1. 假设你的机器上有3个GPU;

  2. 在单机单GPU的训练中,数据是一个batch一个batch的训练。 在单机多GPU中,数据一次处理3个batch(假设是3个GPU训练), 每个GPU处理一个batch的数据计算。

  3. 变量,或者说参数,保存在CPU上

  4. 刚开始的时候数据由CPU分发给3个GPU, 在GPU上完成了计算,得到每个batch要更新的梯度。

  5. 然后在CPU上收集完了3个GPU上的要更新的梯度, 计算一下平均梯度,然后更新参数。

  6. 然后继续循环这个过程。

通过这个过程,处理的速度取决于最慢的那个GPU的速度。如果3个GPU的处理速度差不多的话, 处理速度就相当于单机单GPU的速度的3倍减去数据在CPU和GPU之间传输的开销,实际的效率提升看CPU和GPU之间数据的速度和处理数据的大小。

通俗解释

写到这里觉得自己写的还是不同通俗易懂, 下面就打一个更加通俗的比方来解释一下:

老师给小明和小华布置了10000张纸的乘法题并且把所有的乘法的结果加起来, 每张纸上有128道乘法题。 这里一张纸就是一个batch, batch_size就是128. 小明算加法比较快, 小华算乘法比较快,于是小华就负责计算乘法, 小明负责把小华的乘法结果加起来 。 这样小明就是CPU,小华就是GPU.

这样计算的话, 预计小明和小华两个人得要花费一个星期的时间才能完成老师布置的题目。 于是小明就招来2个算乘法也很快的小红和小亮。 于是每次小明就给小华,小红,小亮各分发一张纸,让他们算乘法, 他们三个人算完了之后, 把结果告诉小明, 小明把他们的结果加起来,然后再给他们没人分发一张算乘法的纸,依次循环,知道所有的算完。

这里小明采用的是同步模式,就是每次要等他们三个都算完了之后, 再统一算加法,算完了加法之后,再给他们三个分发纸张。这样速度就取决于他们三个中算乘法算的最慢的那个人, 和分发纸张的速度。

Multi-Device Execution

当系统到了分布式情况下时,事情就变得复杂了很多,还好前述调度用了现有的框架。那么对于TF来说,剩下的事情就是:

决定运算在哪个设备上运行
管理设备之间的数据传递

分布式多机多GPU训练

随着设计的模型越来越复杂,模型参数越来越多,越来越大, 大到什么程度?多到什么程度? 多参数的个数上百亿个, 训练的数据多到按TB级别来衡量。大家知道每次计算一轮,都要计算梯度,更新参数。 当参数的量级上升到百亿量级甚至更大之后, 参数的更新的性能都是问题。 如果是单机16个GPU, 一个step最多也是处理16个batch, 这对于上TB级别的数据来说,不知道要训练到什么时候。于是就有了分布式的深度学习训练方法,或者说框架。

参数服务器

在介绍tensorflow的分布式训练之前,先说下参数服务器的概念。
前面说道, 当你的模型越来越大, 模型的参数越来越多,多到模型参数的更新,一台机器的性能都不够的时候, 很自然的我们就会想到把参数分开放到不同的机器去存储和更新。
因为碰到上面提到的那些问题, 所有参数服务器就被单独拧出来, 于是就有了参数服务器的概念。 参数服务器可以是多台机器组成的集群, 这个就有点类似分布式的存储架构了, 涉及到数据的同步,一致性等等, 一般是key-value的形式,可以理解为一个分布式的key-value内存数据库,然后再加上一些参数更新的操作。 详细的细节可以去google一下, 这里就不详细说了。 反正就是当性能不够的时候,
几百亿的参数分散到不同的机器上去保存和更新,解决参数存储和更新的性能问题。
借用上面的小明算题的例子,小明觉得自己算加法都算不过来了, 于是就叫了10个小明过来一起帮忙算。

gRPC (google remote procedure call)

TensorFlow分布式并行基于gRPC通信框架,其中包括一个master创建Session,还有多个worker负责执行计算图中的任务。

gRPC首先是一个RPC,即远程过程调用,通俗的解释是:假设你在本机上执行一段代码num=add(a,b),它调用了一个过程 call,然后返回了一个值num,你感觉这段代码只是在本机上执行的, 但实际情况是,本机上的add方法是将参数打包发送给服务器,然后服务器运行服务器端的add方法,返回的结果再将数据打包返回给客户端.

结构

Cluster是Job的集合,Job是Task的集合。

即:一个Cluster可以切分多个Job,一个Job指一类特定的任务,每个Job包含多个Task,比如parameter server(ps)、worker,在大多数情况下,一个机器上只运行一个Task.

在分布式深度学习框架中,我们一般把Job划分为Parameter Server和Worker:

  • Parameter Job是管理参数的存储和更新工作.
  • Worker Job是来运行ops.

如果参数的数量太大,一台机器处理不了,这就要需要多个Tasks.

TF分布式模式

In-graph 模式

将模型的计算图的不同部分放在不同的机器上执行

In-graph模式和单机多GPU模型有点类似。 还是一个小明算加法, 但是算乘法的就可以不止是他们一个教室的小华,小红,小亮了。 可以是其他教师的小张,小李。。。。

In-graph模式, 把计算已经从单机多GPU,已经扩展到了多机多GPU了, 不过数据分发还是在一个节点。 这样的好处是配置简单, 其他多机多GPU的计算节点,只要起个join操作, 暴露一个网络接口,等在那里接受任务就好了。 这些计算节点暴露出来的网络接口,使用起来就跟本机的一个GPU的使用一样, 只要在操作的时候指定tf.device(“/job:worker/task:n”),
就可以向指定GPU一样,把操作指定到一个计算节点上计算,使用起来和多GPU的类似。 但是这样的坏处是训练数据的分发依然在一个节点上, 要把训练数据分发到不同的机器上, 严重影响并发训练速度。在大数据训练的情况下, 不推荐使用这种模式。

Between-graph 模式

数据并行,每台机器使用完全相同的计算图

Between-graph模式下,训练的参数保存在参数服务器, 数据不用分发, 数据分片的保存在各个计算节点, 各个计算节点自己算自己的, 算完了之后, 把要更新的参数告诉参数服务器,参数服务器更新参数。这种模式的优点是不用训练数据的分发了, 尤其是在数据量在TB级的时候, 节省了大量的时间,所以大数据深度学习还是推荐使用Between-graph模式。

同步更新和异步更新

in-graph模式和between-graph模式都支持同步和异步更新。

在同步更新的时候, 每次梯度更新,要等所有分发出去的数据计算完成后,返回回来结果之后,把梯度累加算了均值之后,再更新参数。 这样的好处是loss的下降比较稳定, 但是这个的坏处也很明显, 处理的速度取决于最慢的那个分片计算的时间。

在异步更新的时候, 所有的计算节点,各自算自己的, 更新参数也是自己更新自己计算的结果, 这样的优点就是计算速度快, 计算资源能得到充分利用,但是缺点是loss的下降不稳定, 抖动大。

在数据量小的情况下, 各个节点的计算能力比较均衡的情况下, 推荐使用同步模式;数据量很大,各个机器的计算性能掺差不齐的情况下,推荐使用异步的方式。

实例

tensorflow官方有个分布式tensorflow的文档,但是例子没有完整的代码, 这里写了一个最简单的可以跑起来的例子,供大家参考,这里也傻瓜式给大家解释一下代码,以便更加通俗的理解。

该例子是TF的入门实例手写字体识别MNIST基于分布式的实现,代码都加了中文注释,更加通俗易懂。

执行命令示例

ps 节点执行:

python distributed.py --job_name=ps --task_index=0

worker1 节点执行:

python distributed.py --job_name=worker --task_index=0

worker2 节点执行:

python distributed.py --job_name=worker --task_index=1

运行结果

worker0节点运行结果

worker1节点运行结果

实验代码

# encoding:utf-8
import math
import tempfile
import time
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_dataflags = tf.app.flags
IMAGE_PIXELS = 28
# 定义默认训练参数和数据路径
flags.DEFINE_string('data_dir', '/tmp/mnist-data', 'Directory  for storing mnist data')
flags.DEFINE_integer('hidden_units', 100, 'Number of units in the hidden layer of the NN')
flags.DEFINE_integer('train_steps', 10000, 'Number of training steps to perform')
flags.DEFINE_integer('batch_size', 100, 'Training batch size ')
flags.DEFINE_float('learning_rate', 0.01, 'Learning rate')
# 定义分布式参数
# 参数服务器parameter server节点
flags.DEFINE_string('ps_hosts', '192.168.32.145:22221', 'Comma-separated list of hostname:port pairs')
# 两个worker节点
flags.DEFINE_string('worker_hosts', '192.168.32.146:22221,192.168.32.160:22221','Comma-separated list of hostname:port pairs')
# 设置job name参数
flags.DEFINE_string('job_name', None, 'job name: worker or ps')
# 设置任务的索引
flags.DEFINE_integer('task_index', None, 'Index of task within the job')
# 选择异步并行,同步并行
flags.DEFINE_integer("issync", None, "是否采用分布式的同步模式,1表示同步模式,0表示异步模式")FLAGS = flags.FLAGSdef main(unused_argv):mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)if FLAGS.job_name is None or FLAGS.job_name == '':raise ValueError('Must specify an explicit job_name !')else:print 'job_name : %s' % FLAGS.job_nameif FLAGS.task_index is None or FLAGS.task_index == '':raise ValueError('Must specify an explicit task_index!')else:print 'task_index : %d' % FLAGS.task_indexps_spec = FLAGS.ps_hosts.split(',')worker_spec = FLAGS.worker_hosts.split(',')# 创建集群num_worker = len(worker_spec)cluster = tf.train.ClusterSpec({'ps': ps_spec, 'worker': worker_spec})server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)if FLAGS.job_name == 'ps':server.join()is_chief = (FLAGS.task_index == 0)# worker_device = '/job:worker/task%d/cpu:0' % FLAGS.task_indexwith tf.device(tf.train.replica_device_setter(cluster=cluster)):global_step = tf.Variable(0, name='global_step', trainable=False)  # 创建纪录全局训练步数变量hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],stddev=1.0 / IMAGE_PIXELS), name='hid_w')hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name='hid_b')sm_w = tf.Variable(tf.truncated_normal([FLAGS.hidden_units, 10],stddev=1.0 / math.sqrt(FLAGS.hidden_units)), name='sm_w')sm_b = tf.Variable(tf.zeros([10]), name='sm_b')x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])y_ = tf.placeholder(tf.float32, [None, 10])hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)hid = tf.nn.relu(hid_lin)y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))cross_entropy = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))opt = tf.train.AdamOptimizer(FLAGS.learning_rate)train_step = opt.minimize(cross_entropy, global_step=global_step)# 生成本地的参数初始化操作init_opinit_op = tf.global_variables_initializer()train_dir = tempfile.mkdtemp()sv = tf.train.Supervisor(is_chief=is_chief, logdir=train_dir, init_op=init_op, recovery_wait_secs=1,global_step=global_step)if is_chief:print 'Worker %d: Initailizing session...' % FLAGS.task_indexelse:print 'Worker %d: Waiting for session to be initaialized...' % FLAGS.task_indexsess = sv.prepare_or_wait_for_session(server.target)print 'Worker %d: Session initialization  complete.' % FLAGS.task_indextime_begin = time.time()print 'Traing begins @ %f' % time_beginlocal_step = 0while True:batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)train_feed = {x: batch_xs, y_: batch_ys}_, step = sess.run([train_step, global_step], feed_dict=train_feed)local_step += 1now = time.time()print '%f: Worker %d: traing step %d dome (global step:%d)' % (now, FLAGS.task_index, local_step, step)if step >= FLAGS.train_steps:breaktime_end = time.time()print 'Training ends @ %f' % time_endtrain_time = time_end - time_beginprint 'Training elapsed time:%f s' % train_timeval_feed = {x: mnist.validation.images, y_: mnist.validation.labels}val_xent = sess.run(cross_entropy, feed_dict=val_feed)print 'After %d training step(s), validation cross entropy = %g' % (FLAGS.train_steps, val_xent)sess.close()if __name__ == '__main__':tf.app.run()

参考:
http://blog.csdn.net/luodongri/article/details/52596780
http://blog.csdn.net/u012436149/article/details/53140869
http://blog.csdn.net/stdcoutzyx/article/details/51645396
TensorFlow实战[黄文坚 唐源 著]

TensorFlow分布式全套(原理,部署,实例)相关推荐

  1. Tensorflow分布式训练原理

    以下文章摘录自: <机器学习观止--核心原理与实践> 京东: https://item.jd.com/13166960.html 当当:http://product.dangdang.co ...

  2. Hadoop 部署实例

    Linux下的Hadoop–分布式模式的部署实例修改浏览权限 | 删除 Linux下的Hadoop--分布式模式的部署 选用软件版本: 1. jdk 1.6.0.10 2. hadoop-0.19.1 ...

  3. 白话tensorflow分布式部署和开发

    2019独角兽企业重金招聘Python工程师标准>>> 白话tensorflow分布式部署和开发 博客分类: 深度学习 关于tensorflow的分布式训练和部署, 官方有个英文的文 ...

  4. Tensorflow 分布式原理理解

    原贴:https://yq.aliyun.com/articles/620563?utm_content=m_1000010596 另附Tenssorflow白皮书 摘要: 1. Tensorflow ...

  5. TensorFlow与PyTorch模型部署性能比较

    TensorFlow与PyTorch模型部署性能比较 前言 2022了,选 PyTorch 还是 TensorFlow?之前有一种说法:TensorFlow 适合业界,PyTorch 适合学界.这种说 ...

  6. redis 槽点重新分配 集群_5000+字硬核干货!Redis 分布式集群部署实战

    原理: Redis集群采用一致性哈希槽的方式将集群中每个主节点都分配一定的哈希槽,对写入的数据进行哈希后分配到某个主节点进行存储. 集群使用公式(CRC16 key)& 16384计算键key ...

  7. 《深入理解分布式事务》第七章 XA 强一致性分布式事务原理

    <深入理解分布式事务>第七章 XA 强一致性分布式事务原理 文章目录 <深入理解分布式事务>第七章 XA 强一致性分布式事务原理 一.X/Open DTP 模型与 XA 规范 ...

  8. 分布式定时任务原理以及解决方案-指定时间执行定时任务

    分布式定时任务原理以及实现 一.单机指定时间执行定时任务实现方式 Timer运行机制 ScheduledThreadPoolExecutor的运行机制 原理图 Leader/Follower模式 Ti ...

  9. RocketMQ分布式事务原理介绍

    RocketMQ实现分布式事务原理 1.知识准备 在系统架构从单体到分布式.SOA.微服务的发展过程中,因为流量的增多出现了大量消息堆积问题的需求,在这种背景下,阿里开发出rocketmq来解决该问题 ...

最新文章

  1. ML基石_8_NoiseAndError
  2. 大数据时代如何赢得财务人才
  3. Win下PHP环境Eclipse PDT+XAMPP+XDebug部署
  4. cat--创建多维数组
  5. 一道关于Java并发的面试题
  6. python3.8新特性 逻辑表达式_Python3.8正式发布!新特性解析在这里
  7. C++ 简单的语音合成(TTS,即文字转语音)类
  8. POJ - 3734 Blocks 指数生成函数
  9. Ubuntu 14.04 安装 MongoDB
  10. 2015必须推荐的Android框架,猿必读系列!
  11. 消费者生产者代码之---一步一步带你写
  12. VS2017的内存分析
  13. adobe flash builder 4.6最新能用的序列号
  14. 3dmax全局材质灯光细分插件_【3D脚本插件】如何利用插件快速修改全局灯光材质细分...
  15. 小米路由器r2d_小米路由器二代R2D怎样设置无线中继模式
  16. SIMULINK模型自动生成Verilog代码
  17. Excel 2010 SQL应用117 分组统计之GROUP BY 与First
  18. 2022TGRS/云检测:用于遥感图像云检测的无监督域不变特征学习Unsupervised Domain-Invariant Feature Learning for Cloud Detection
  19. 图解系统(六)——调度算法
  20. 如何应对面试官问你职业规划问题

热门文章

  1. 《TensorFlow+Keras深度学习人工智能实践应用》林大贵著第六章第七章代码
  2. 第四届全国大学生计算机应用能力与信息素养大赛,全国大学生计算机应用能力与信息素养大赛...
  3. Unity AVPro video 开始播放,播放完成事件监听
  4. JAVA仿QQ登录界面
  5. 二是方便其“飞鸽下载”人看
  6. 计算机视觉中的不适定问题(ill-posed problem)
  7. OKCoin徐明星:区块链技术解读及应用实践
  8. 下载 packet tracer 出现Sorry, we can‘t find a NetAcad account associated with this Cisco account.
  9. 听计算机课评语与建议,听评课评语及建议
  10. ARM:嵌入式系统之硬件总复习