在使用TensorFlow进行异步计算时,队列是一种强大的机制。

正如TensorFlow中的其他组件一样,队列就是TensorFlow图中的节点。这是一种有状态的节点,就像变量一样:其他节点可以修改它的内容。具体来说,其他节点可以把新元素插入到队列后端(rear),也可以把队列前端(front)的元素删除。

为了感受一下队列,让我们来看一个简单的例子。我们先创建一个“先入先出”的队列(FIFOQueue),并将其内部所有元素初始化为零。然后,我们构建一个TensorFlow图,它从队列前端取走一个元素,加上1之后,放回队列的后端。慢慢地,队列的元素的值就会增加。

Enqueue 、  EnqueueManyDequeue 都是特殊的节点。他们需要获取队列指针,而非普通的值,如此才能修改队列内容。我们建议您将它们看作队列的方法。事实上,在Python API中,它们就是队列对象的方法(例如 q.enqueue(...) )。

现在你已经对队列有了一定的了解,让我们深入到细节...

队列使用概述

队列,如FIFOQueueRandomShuffleQueue,在TensorFlow的张量异步计算时都非常重要。

例如,一个典型的输入结构:是使用一个RandomShuffleQueue来作为模型训练的输入:

  • 多个线程准备训练样本,并且把这些样本推入队列。
  • 一个训练线程执行一个训练操作,此操作会从队列中移除最小批次的样本(mini-batches)。

这种结构具有许多优点,正如在Reading data how to中强调的,同时,Reading data how to也概括地描述了如何简化输入管道的构造过程。

TensorFlow的Session对象是可以支持多线程的,因此多个线程可以很方便地使用同一个会话(Session)并且并行地执行操作。然而,在Python程序实现这样的并行运算却并不容易。所有线程都必须能被同步终止,异常必须能被正确捕获并报告,回话终止的时候, 队列必须能被正确地关闭。

所幸TensorFlow提供了两个类来帮助多线程的实现:tf.Coordinator和 tf.QueueRunner。从设计上这两个类必须被一起使用。Coordinator类可以用来同时停止多个工作线程并且向那个在等待所有工作线程终止的程序报告异常。QueueRunner类用来协调多个工作线程同时将多个张量推入同一个队列中。

Coordinator

Coordinator类用来帮助多个线程协同工作,多个线程同步终止。 其主要方法有:

  • should_stop():如果线程应该停止则返回True。
  • request_stop(<exception>): 请求该线程停止。
  • join(<list of threads>):等待被指定的线程终止。

首先创建一个Coordinator对象,然后建立一些使用Coordinator对象的线程。这些线程通常一直循环运行,一直到should_stop()返回True时停止。 任何线程都可以决定计算什么时候应该停止。它只需要调用request_stop(),同时其他线程的should_stop()将会返回True,然后都停下来。

# 线程体:循环执行,直到`Coordinator`收到了停止请求。
# 如果某些条件为真,请求`Coordinator`去停止其他线程。
def MyLoop(coord):while not coord.should_stop():...do something...if ...some condition...:coord.request_stop()# Main code: create a coordinator.
coord = Coordinator()# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord)) for i in xrange(10)]# Start the threads and wait for all of them to stop.
for t in threads: t.start()
coord.join(threads)

显然,Coordinator可以管理线程去做不同的事情。上面的代码只是一个简单的例子,在设计实现的时候不必完全照搬。Coordinator还支持捕捉和报告异常, 具体可以参考Coordinator class的文档。

QueueRunner

QueueRunner类会创建一组线程, 这些线程可以重复的执行Enquene操作, 他们使用同一个Coordinator来处理线程同步终止。此外,一个QueueRunner会运行一个closer thread,当Coordinator收到异常报告时,这个closer thread会自动关闭队列。

您可以使用一个queue runner,来实现上述结构。 首先建立一个TensorFlow图表,这个图表使用队列来输入样本。增加处理样本并将样本推入队列中的操作。增加training操作来移除队列中的样本。

example = ...ops to create one example...
# Create a queue, and an op that enqueues examples one at a time in the queue.
queue = tf.RandomShuffleQueue(...)
enqueue_op = queue.enqueue(example)
# Create a training graph that starts by dequeuing a batch of examples.
inputs = queue.dequeue_many(batch_size)
train_op = ...use 'inputs' to build the training part of the graph...

在Python的训练程序中,创建一个QueueRunner来运行几个线程, 这几个线程处理样本,并且将样本推入队列。创建一个Coordinator,让queue runner使用Coordinator来启动这些线程,创建一个训练的循环, 并且使用Coordinator来控制QueueRunner的线程们的终止。

# Create a queue runner that will run 4 threads in parallel to enqueue
# examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
# Run the training loop, controlling termination with the coordinator.
for step in xrange(1000000):if coord.should_stop():breaksess.run(train_op)
# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(threads)

异常处理

通过queue runners启动的线程不仅仅只处理推送样本到队列。他们还捕捉和处理由队列产生的异常,包括OutOfRangeError异常,这个异常是用于报告队列被关闭。 使用Coordinator的训练程序在主循环中必须同时捕捉和报告异常。 下面是对上面训练循环的改进版本。

try:for step in xrange(1000000):if coord.should_stop():breaksess.run(train_op)
except Exception, e:# Report exceptions to the coordinator.coord.request_stop(e)# Terminate as usual.  It is innocuous to request stop twice.
coord.request_stop()
coord.join(threads)

TensorFlow 教程 --进阶指南--3.5线程和队列相关推荐

  1. TensorFlow 教程 --进阶指南--3.1总览

    综述 Overview Variables: 创建,初始化,保存,和恢复 TensorFlow Variables 是内存中的容纳 tensor 的缓存.这一小节介绍了用它们在模型训练时(during ...

  2. TensorFlow 教程 --进阶指南--3.4数据读取

    数据读取 TensorFlow程序读取数据一共有3种方法: 供给数据(Feeding): 在TensorFlow程序运行的每一步, 让Python代码来供给数据. 从文件读取数据: 在TensorFl ...

  3. TensorFlow 教程 --进阶指南--3.10共享变量

    你可以在怎么使用变量中所描述的方式来创建,初始化,保存及加载单一的变量.但是当创建复杂的模块时,通常你需要共享大量变量集并且如果你还想在同一个地方初始化这所有的变量,我们又该怎么做呢.本教程就是演示如 ...

  4. TensorFlow 教程 --进阶指南--3.7自定义数据读取

    基本要求: 熟悉 C++ 编程. 确保下载 TensorFlow 源文件, 并可编译使用. 我们将支持文件格式的任务分成两部分: 文件格式: 我们使用 Reader Op来从文件中读取一个 recor ...

  5. TensorFlow 教程 --进阶指南--3.9TensorBoard: 图表可视化

    TensorFlow 图表计算强大而又复杂,图表可视化在理解和调试时显得非常有帮助. 下面是一个运作时的可式化例子. "一个TensorFlow图表的可视化") 一个TensorF ...

  6. TensorFlow 教程 --进阶指南--3.8使用GPU

    支持的设备 在一套标准的系统上通常有多个计算设备. TensorFlow 支持 CPU 和 GPU 这两种设备. 我们用指定字符串strings 来标识这些设备. 比如: "/cpu:0&q ...

  7. TensorFlow 教程 --进阶指南--3.6增加一个新 Op

    预备知识: 对 C++ 有一定了解. 已经下载 TensorFlow 源代码并有能力编译它. 如果现有的库没有涵盖你想要的操作, 你可以自己定制一个. 为了使定制的 Op 能够兼容原有的库 , 你必须 ...

  8. TensorFlow 教程 --进阶指南--3.3可视化学习

    TensorBoard:可视化学习 TensorBoard 涉及到的运算,通常是在训练庞大的深度神经网络中出现的复杂而又难以理解的运算. 为了更方便 TensorFlow 程序的理解.调试与优化,我们 ...

  9. TensorFlow 教程 --进阶指南--3.2变量:创建、初始化、保存和加载

    变量:创建.初始化.保存和加载 当训练模型时,用变量来存储和更新参数.变量包含张量 (Tensor)存放于内存的缓存区.建模时它们需要被明确地初始化,模型训练后它们必须被存储到磁盘.这些变量的值可在之 ...

最新文章

  1. 比特币寒冬中,你更应该关注企业区块链!
  2. PTA 基础编程题目集 6-7 统计某类完全平方数 C语言
  3. 谷歌DeepMind联合创始人被架空
  4. SQL优化|Java面试题
  5. 【FI】-【AM】跨工厂/事业部/利润中心的固定资产调拨
  6. 【概率论】1-2:计数方法(Counting Methods)
  7. 前端学习(2472):发布文章功能介绍
  8. 有了解过Elasticsearch的性化搜索方案吗?
  9. 51nod1255【贪心-栈的应用】
  10. jenkins--master/slave模式---master是容器版---slave是非容器版
  11. 学习笔记-极客时间 玩转 git 三剑客 课程记录
  12. 【主流Nivida显卡深度学习/强化学习/AI算力汇总】
  13. win7如何设置通电自动开机_win7系统如何自动开机|win7自动开机的方法
  14. Python连接MySQL数据库locahost无法连接的问题
  15. cms三次标记浮动垃圾是如何产生的
  16. 员工信息表,增删查改程序
  17. 插入排序超详细讲解C语言
  18. 华为鸿蒙第一期公测,华为鸿蒙开启第二轮公测,新增7款机型,有你的吗?
  19. Flink 1.12.2 源码浅析 : JobGraph
  20. 如何让外网访问本地WEB服务器

热门文章

  1. 【C语言】----宏定义,预处理宏
  2. 【c语言】指针数组和数组指针-解释和用法
  3. 【LeetCode】剑指 Offer 44. 数字序列中某一位的数字
  4. 微信小程序|开发实战篇之九-image-picker图片选择器组件及其子组件
  5. jenkins pipline 用法收集
  6. SSL证书检查吊销状态
  7. linux设备驱动归纳总结(八):1.总线、设备和驱动【转】
  8. Powershell实例小结(服务管理)
  9. property属性[Python]
  10. windows安装ruby on rails