Queues, Threads, and Reading Data

输入管线

如果训练数据量较小,Tesnsorflow会把数据一次性加载到内存当中。如果数据过于庞大,Tesorflow需要把存储到硬盘中的数据进行有关处理,分批次加载数据块到内存当中。这些硬盘中的数据可能需要格式化、乱序等的操作。当数据很大的时候,训练过程与处理数据的过程需要用多线程操作,以节约时间。为了使输入输出更加高效,Tensorflow有自己专用的数据格式,处理到大批次数据时,最好转化成专有格式。

TFRecord

TFRecord是一种二进制数据文件,它忽略掉原来的输入数据格式(图片、语音、文字等),统一它们的格式,并把数据进行序列化处理。序列化操作是基于协议缓冲区(protobuf)的,这与平台、编程语言等无关。以下内容主要来自这篇博客,需要使用Google才能访问,在这里就是翻译一下。
第一部分讲述了怎样使用Numpy获取原生数据;第二部分讲述了在没有图模型的情况下,仅仅借助于Tensorflow的内置函数,把数据集转换成TFRecord格式;第三部分讲述了定义一个用于读取二进制文件的模型,并且以随机的方式进行输入。

使用Numpy获取原始数据

在这里我们把图片转化成原始数据,并恢复数据,并检查恢复后的图片是否与原图片一致

# 注意下面的代码在Tensorflow可能无法运行,我这里无法通过
# 就当是一个举例吧
import numpy as np
import skimage.io as iocat_img = io.imread('cat.jpg')
io.imshow(cat_img)

图片显示:

# 使用 ndarray.tostring() 函数, 把图片转化成string编码
cat_string = cat_img.tostring()# 把string转化回原来的图片,注意:dtype需要显式地声明,否则报错
# 重构图片的是一维类型,我们需要整个图片的尺寸来恢复原始图片
reconstructed_cat_1d = np.fromstring(cat_string, dtype=np.uint8)# 在这里重新塑造图片形状
reconstructed_cat_img = reconstructed_cat_1d.reshape(cat_img.shape)# 检测两个图片是否一致
print(np.allclose(cat_img, reconstructed_cat_img))
# 结果输出True

创建一个.tfrecord文件,并且在不用图模型的情况下读取

在使用之前,需要了解一下protocol buffer的定义,至少要有一个了解,否则有很多细节的东西不能很好的理解。给出官网:https://developers.google.com/protocol-buffers/ (需要Google)
tf.train.Example的定义:

syntax = "proto3";import "tensorflow/core/example/feature.proto";
option cc_enable_arenas = true;
option java_outer_classname = "ExampleProtos";
option java_multiple_files = true;
option java_package = "org.tensorflow.example";package tensorflow;message Example {Features features = 1;
};message SequenceExample {Features context = 1;FeatureLists feature_lists = 2;
};

tf.train.Features的定义:

syntax = "proto3";
option cc_enable_arenas = true;
option java_outer_classname = "FeatureProtos";
option java_multiple_files = true;
option java_package = "org.tensorflow.example";package tensorflow;// Containers to hold repeated fundamental values.
message BytesList {repeated bytes value = 1;
}
message FloatList {repeated float value = 1 [packed = true];
}
message Int64List {repeated int64 value = 1 [packed = true];
}// Containers for non-sequential data.
message Feature {// Each feature can be exactly one kind.oneof kind {BytesList bytes_list = 1;FloatList float_list = 2;Int64List int64_list = 3;}
};message Features {// Map from feature name to feature.map<string, Feature> feature = 1;
};
message FeatureList {repeated Feature feature = 1;
};message FeatureLists {// Map from feature name to feature list.map<string, FeatureList> feature_list = 1;
};

以上是基于protocol buffer协议的一个数据格式定义, 这是使用TFRecord数据格式的基础。
下面,我们将展示在没有图模型参与的情况下的小批量数据写入和读取操作。

# 定义数据的路径
filename_pairs = [
('/home/dpakhom1/tf_projects/segmentation/VOCdevkit/VOCdevkit/VOC2012/JPEGImages/2007_000032.jpg',
'/home/dpakhom1/tf_projects/segmentation/VOCdevkit/VOCdevkit/VOC2012/SegmentationClass/2007_000032.png'),
('/home/dpakhom1/tf_projects/segmentation/VOCdevkit/VOCdevkit/VOC2012/JPEGImages/2007_000039.jpg',
'/home/dpakhom1/tf_projects/segmentation/VOCdevkit/VOCdevkit/VOC2012/SegmentationClass/2007_000039.png'),
('/home/dpakhom1/tf_projects/segmentation/VOCdevkit/VOCdevkit/VOC2012/JPEGImages/2007_000063.jpg',
'/home/dpakhom1/tf_projects/segmentation/VOCdevkit/VOCdevkit/VOC2012/SegmentationClass/2007_000063.png')]from PIL import Image
import numpy as np
import skimage.io as io
import tensorflow as tf# 定义的两个特征写入函数
def _bytes_feature(value):return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))def _int64_feature(value):return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))# 写入的文件名
tfrecords_filename = 'pascal_voc_segmentation.tfrecords'# 定义的TFRecordWriter
writer = tf.python_io.TFRecordWriter(tfrecords_filename)# 用于保存图片的初始信息
original_images = []# 对于路径中的每一个图片,img_path存储路径,annotation_path存储图片编号
for img_path, annotation_path in filename_pairs:# 转换成Numpy数组的格式img = np.array(Image.open(img_path))annotation = np.array(Image.open(annotation_path))# 保存原始图片的高度和宽度height = img.shape[0]width = img.shape[1]# 把数据追加到原始数组中original_images.append((img, annotation))# 转换成string类型img_raw = img.tostring()annotation_raw = annotation.tostring()# 确定protocol buffer的格式,用于写入`.tfrecord`文件example = tf.train.Example(features=tf.train.Features(feature={'height': _int64_feature(height),'width': _int64_feature(width),'image_raw': _bytes_feature(img_raw),'mask_raw': _bytes_feature(annotation_raw)}))# 数据序列化之后写入writer.write(example.SerializeToString())
writer.close()
# 用于存储重构图片的数据
reconstructed_images = []
# 确定存储的路径
record_iterator = tf.python_io.tf_record_iterator(path=tfrecords_filename)for string_record in record_iterator:# 获取数据,example = tf.train.Example()example.ParseFromString(string_record)height = int(example.features.feature['height'].int64_list.value[0])width = int(example.features.feature['width'].int64_list.value[0])img_string = (example.features.feature['image_raw'].bytes_list.value[0])annotation_string = (example.features.feature['mask_raw'].bytes_list.value[0])# 恢复数据成一维的img_1d = np.fromstring(img_string, dtype=np.uint8)二维展开图像reconstructed_img = img_1d.reshape((height, width, -1))# 获取路径annotation_1d = np.fromstring(annotation_string, dtype=np.uint8)# Annotations don't have depth (3rd dimension)reconstructed_annotation = annotation_1d.reshape((height, width))reconstructed_images.append((reconstructed_img, reconstructed_annotation))

最后以MNIST数据集为例子:

import tensorflow as tf
from tensorflow.contrib.learn.python.learn.datasets import mnist
import ossave_dir = "/home/erick/Desktop/mnist"# Download data to save_dir
data_sets = mnist.read_data_sets(save_dir,dtype=tf.uint8,reshape=False,validation_size=1000)data_splits = ["train", "test", "validation"]
for d in range(len(data_splits)):print("saving" + data_splits[d])data_set = data_sets[d]# 实例化TFRecordWriter对象,并确定写入的目录filename = os.path.join(save_dir, data_splits[d] + '.tfrecords')writer = tf.python_io.TFRecordWriter(filename)# 遍历每个图片for index in range(data_set.images.shape[0]):# 把图片从Numpy数组转化成二进制字符串image = data_set.images[index].tostring()# 这是一个存储协议example = tf.train.Example(features=tf.train.Features(features={'height': tf.train.Feature(int64_list=tf.train.Int64List(value=[data_set.images.shape[1]])),'width': tf.train.Feature(int64_list=tf.train.Int64List(value=[data_set.images.shape[2]])),'depth': tf.train.Feature(int64_list=tf.train.Int64List(value=[data_set.images.shape[3]])),'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[int(data_set.labels[index])])),'image_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image]))}))writer.write(example.SerializeToString)writer.close()

Queues

这里的队列指的是计算图中的队列,与一般队列不同,这里的队列需要有计算图的介入才有效。

Enqueuing and Dequeuing 入队和出队

入队和出队的操作需要提前声明,不过声明的完后不会运行,只有当图模型执行该操作后,定义的操作才会有效!不能对一个空队列执行出队操作,否则会导致主线程一直处于挂起状态。

import tensorflow as tfsess = tf.InteractiveSession()
# 建立容量是10 的string存储string的队列
queue1 = tf.FIFOQueue(capacity=10, dtypes=[tf.string])
# 入队操作
enque_op = queue1.enqueue(["F"])
print(sess.run(queue1.size()))  # 输出0,因为此时没有计算图执行该操作
enque_op.run()  # 计算图介入操作
print(sess.run(queue1.size()))  # 输出1# 继续执行入队的操作
enque_op = queue1.enqueue(["I"])
enque_op.run()
enque_op = queue1.enqueue(["F"])
enque_op.run()
enque_op = queue1.enqueue(["O"])
enque_op.run()print(sess.run(queue1.size()))  # 输出4x = queue1.dequeue()  # 出队操作
# 下面的出队操作最多执行4次,对空队列执行出队操作,会让主线程永远挂起!!
print(x.eval())  # 输出 F
print(x.eval())  # 输出 I
print(x.eval())  # 输出 F
print(x.eval())  # 输出 O

多个元素同时出队

可以使用dequeue_many()一次使多个元素出队,不过出队的个数不能超过队列的元素的个数,否则主线程会挂起。

import tensorflow as tfsess = tf.InteractiveSession()
# 建立容量是10的string存储string的队列,需要提前说明队列的类型
queue1 = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
# 入队操作
enque_op = queue1.enqueue(["F"])
print(sess.run(queue1.size()))  # 输出0
enque_op.run()  # 计算图介入操作
print(sess.run(queue1.size()))  # 输出1# 继续执行入队的操作
enque_op = queue1.enqueue(["I"])
enque_op.run()
enque_op = queue1.enqueue(["F"])
enque_op.run()
enque_op = queue1.enqueue(["O"])
enque_op.run()print(sess.run(queue1.size()))  # 输出4# 执行多次出队操作
inputs = queue1.dequeue_many(4)
print(inputs.eval())  # 输出 [b'F' b'I' b'F' b'O']

Threading and Queues 线程和队列

一个Tensorflow的session可以是多线程的,一个session可以同时并行执行多个操作。但是,如果一次调用sess.run()不能充分利用资源,可以利用多个并行调用增加吞吐量。一个典型的应用场景:执行图像识别算法训练时,一方面要把图像转换成标准的输入流格式,另一方面要用输入流对神经网络进行训练,很明显输入和训练的过程是可以并行执行的。
关于python多线程的知识,可以参考一下这几篇博客
但是,多线程使用的时候,必须同时停止;如果有异常,必须在所有线程结束后,捕获所有的异常;队列必须要在线程结束的时候适当的停止。Tensorflow提供了tf.train.Coordinatortf.train.QueueRunner这两个类在协助使用多线程技术。前者用来同时停止多个线程,或者等待所有线程结束后报告异常;后者用来创建一定数量的线程,这些线程写作让tensor加入同一个队列。

Coordinator

  • tf.train.Coordinator.should_stop: 返回True,如果线程需要停止
  • tf.train.Coordinator.request_stop:要求线程停止
  • tf.train.Coordinator.join: 一直等到特定的线程终止

我们首先建立一个Coordinator对象,之后创建一定数量的可以使用线程协调器的线程。当should_stop()返回True时,所有的线程循环停止。任何一个线程都可以决定计算图模型的停止时间,只要该线程调用request_stop()函数,那么其他的线程就会停止,且它们的shoule_stop()函数会返回True
官方文档给出的一般格式:

# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):while not coord.should_stop():...do something...if ...some condition...:coord.request_stop()# Main thread: create a coordinator.
coord = tf.train.Coordinator()# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]# Start the threads and wait for all of them to stop.
for t in threads:t.start()
coord.join(threads)

实例代码:

import tensorflow as tf
import threading, timedef loop(coord, i):while not coord.should_stop():# 输出线程编号和当前系统时间print(i, time.strftime('%H:%M:%S', time.localtime(time.time())))# 睡眠一秒,防止运行过快time.sleep(1)# 5号线程申请终止if i == 5:coord.request_stop()# 主线程
coord = tf.train.Coordinator()
# 申请10个线程
threads = [threading.Thread(target=loop, args=(coord, i)) for i in range(10)]# 启动所有线程,并等待线程结束
for t in threads:t.start()
coord.join(threads)

上述代码中,5号线程仅出现一次,并且在休眠一秒后,终止了所有的线程。

QueueRunner

QueueRunner创建了一定数量的线程,来重复的执行入队操作。
给出一般结构:

import tensorflow as tfgen_random_normal = tf.random_normal(shape=())
queue = tf.RandomShuffleQueue(capacity=100, dtypes=[tf.float32], min_after_dequeue=1)
enqueue_op = queue.enqueue(gen_random_normal)
# Create a queue runner that will run 4 threads in parallel to enqueue examples.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)# Launch the graph.
sess = tf.Session()
# Create a coordinator, launch the queue runner threads.
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)# Run the training loop, controlling termination with the coordinator.
# for step in range(1000000):
#     if coord.should_stop():
#         break
#     sess.run(train_op)# When done, ask the threads to stop.
coord.request_stop()
# And wait for them to actually do it.
coord.join(enqueue_threads)

Tensorflow中的TFRecord、Queue和多线程相关推荐

  1. Tensorflow中使用tfrecord,队列方式读取数据

    博客地址:https://blog.csdn.net/liangjun_feng/article/details/79698809 标准TensorFlow格式 有一种保存记录的方法可以允许你讲任意的 ...

  2. 利用Tensorflow中的TFRecord生成与读取图片

    本人是深度学习的爱好者,最近在参加cv的表情识别训练,把一些步骤和过程记录下来 一.什么是TFRecord 对于计算机的内存来说,我们训练集样本的数目是巨大的,而且这些文件分散在不同的文件夹中,在存读 ...

  3. tensorflow中tfrecord数据操作

    前言: 为了更加展示tfrecord数据的相关操作,笔者后续又写了一个实践的简单例子进一步解释,具体可以看: TFrecords 制作数据集小例子(多标签)_爱吃火锅的博客-CSDN博客 正文: tf ...

  4. python读取图像数据流_浅谈TensorFlow中读取图像数据的三种方式

    本文面对三种常常遇到的情况,总结三种读取数据的方式,分别用于处理单张图片.大量图片,和TFRecorder读取方式.并且还补充了功能相近的tf函数. 1.处理单张图片 我们训练完模型之后,常常要用图片 ...

  5. 在TensorFlow中使用pipeline加载数据

    正文共2028个字,6张图,预计阅读时间6分钟. 前面对TensorFlow的多线程做了测试,接下来就利用多线程和Queue pipeline地加载数据.数据流如下图所示: 首先,A.B.C三个文件通 ...

  6. tensorflow中协调器 tf.train.Coordinator 和入队线程启动器 tf.train.start_queue_runners

    TensorFlow的Session对象是支持多线程的,可以在同一个会话(Session)中创建多个线程,并行执行.在Session中的所有线程都必须能被同步终止,异常必须能被正确捕获并报告,会话终止 ...

  7. TensorFlow 2.0 - TFRecord存储数据集、@tf.function图执行模式、tf.TensorArray、tf.config分配GPU

    文章目录 1. TFRecord 格式存储 2. tf.function 高性能 3. tf.TensorArray 支持计算图特性 4. tf.config 分配GPU 学习于:简单粗暴 Tenso ...

  8. python 多线程处理_Python中的多处理与多线程:您需要了解的内容。

    python 多线程处理 by Timber.io 由Timber.io Python中的多处理与多线程:您需要了解的内容. (Multiprocessing vs Multithreading in ...

  9. tensorflow 读取cifar_对tensorflow中cifar-10文档的Read操作详解

    前言 在tensorflow的官方文档中得卷积神经网络一章,有一个使用cifar-10图片数据集的实验,搭建卷积神经网络倒不难,但是那个cifar10_input文件着实让我费了一番心思.配合着官方文 ...

最新文章

  1. CB:南土所梁玉婷组-细菌群落的高稳定性和代谢能力促进了土壤中易分解碳的快速减少...
  2. 利用python安装opencv_Linux下安装OpenCV+Python支持
  3. iOS将产品进行多语言发布,开发
  4. 1.3 Quick Start中 Step 3: Create a topic官网剖析(博主推荐)
  5. 在IOS XR上配置BFD
  6. SWIG Python-C封装 char*相关问题(3)
  7. python专题-读取xml文件
  8. Kotlin基础知识
  9. mysql数据库查询减法计算_我对数据库关系代数中减法sql实现的思考:mysql脚本...
  10. [hiho 10]由前序中序遍历求后序遍历
  11. cefsharp.core.dll找不到指定模块_DeepFaceLab错误:DLL Load failed 找不到指定模块!
  12. 马哥python开发培训
  13. 推荐5款常用编程文本编辑器
  14. html5期末大作业课程设计仿苹果官网(源码+报告)
  15. java fx 教程_JavaFX快速入门
  16. arduino学习:本人编写的单个传感器控制电机运转的代码
  17. 2021-2027全球与中国工业锂电池市场现状及未来发展趋势
  18. 我的世界java版复仇双持_我的世界战备双持2mod整合包
  19. 不等式约束问题-KKT条件 (1)
  20. 物理学家眼中的世界:编程的未来

热门文章

  1. beetl调用java方法_08.自定义方法以及直接访问java类方法---《Beetl视频课程》
  2. Python之“可变”的tuple
  3. 编程实现 带符号加法溢出判断
  4. 进制转换 [2008年北京大学图形实验室计算机研究生机试真题]
  5. [贪心算法] 例6.2 今年暑假不AC
  6. [转]java 输出流转输入流
  7. TLE5012B ESP32驱动程序、硬件电路设计、4线SPI通信,驱动完美兼容4线SPI不用改MOSI开漏推挽输出
  8. Python中的list/tuple/dict/set数据类型详解
  9. 三元表达式三个条件判断
  10. mysql 准则 杂谈