
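A Hadoop-ecosystem big data system is made up of YARN, HDFS, and the MapReduce computing framework. Distributed TensorFlow plays the role of the MapReduce computing framework, and Kubernetes plays the role of the YARN scheduler. TensorFlowOnSpark brings deep learning and big data together: it relies on the Spark cluster for storage access and scheduling, and it can use Remote Direct Memory Access (RDMA) for fast communication between nodes. TensorFlowOnSpark (TFoS) is an open-source project from Yahoo: https://github.com/yahoo/TensorFlowOnSpark . It supports distributed TensorFlow training and inference on Apache Spark clusters. TensorFlowOnSpark provides a bridge program: each Spark executor launches a corresponding TensorFlow process, and these processes interact through remote procedure calls (RPC).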
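Each TensorFlow process started by the bridge has to know two things: whether it is a parameter server or a worker, and where its peers are. The sketch below is hypothetical. build_cluster_spec and role_of are not TensorFlowOnSpark functions. It assumes executors are numbered in order and that the first num_ps of them act as parameter servers. It produces the {"ps": [...], "worker": [...]} dictionary shape that tf.train.ClusterSpec accepts.

```python
# Hypothetical sketch (not TensorFlowOnSpark's actual code): group executor
# addresses into a TensorFlow-style cluster spec, assuming executors are
# ordered by id and the first num_ps of them become parameter servers.
from typing import Dict, List, Tuple


def build_cluster_spec(addresses: List[str], num_ps: int) -> Dict[str, List[str]]:
    """Split "host:port" addresses into "ps" and "worker" jobs."""
    if not 0 <= num_ps < len(addresses):
        raise ValueError("need at least one worker besides the parameter servers")
    return {"ps": addresses[:num_ps], "worker": addresses[num_ps:]}


def role_of(executor_id: int, num_ps: int) -> Tuple[str, int]:
    """Return (job_name, task_index) for one executor under the same rule."""
    if executor_id < num_ps:
        return "ps", executor_id
    return "worker", executor_id - num_ps


if __name__ == "__main__":
    addrs = ["node1:2222", "node2:2222", "node3:2222"]
    print(build_cluster_spec(addrs, num_ps=1))
    # {'ps': ['node1:2222'], 'worker': ['node2:2222', 'node3:2222']}
    print(role_of(2, num_ps=1))  # ('worker', 1)
```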

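TensorFlowOnSpark architecture. The TensorFlow training program runs on the Spark cluster, and TensorFlowOnSpark manages the cluster in these steps:

1. Reserve: on the executors, reserve a port for each TensorFlow process and start a listener for data messages.
2. Start: launch the TensorFlow main function on the executors.
3. Get data, in one of two ways:
   - TensorFlow Readers and QueueRunners read the data files directly from HDFS; Spark does not touch the data.
   - Feeding: Spark sends RDD data to the TensorFlow nodes, and the data enters the TensorFlow graph through the feed_dict mechanism.
4. Shut down: stop the TensorFlow worker and parameter server nodes on the executors.

Component stack: Spark Driver -> Spark Executor -> parameter server -> TensorFlow Core -> gRPC, RDMA -> HDFS dataset. See http://yahoohadoop.tumblr.com/post/157196317141/open-sourcing-tensorflowonspark-distributed-deep .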
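The Feeding path can be pictured with plain Python threads. This is a simulation under stated assumptions, not TensorFlowOnSpark code:

- a producer thread stands in for a Spark task pushing one RDD partition;
- a bounded queue.Queue stands in for the executor-side input queue;
- the consumer groups records into batches shaped like the arrays a TensorFlow worker would pass through feed_dict.

The helper names and the feed keys "x" and "y_" are made up for illustration.

```python
# Stdlib-only simulation of the Feeding data path (illustrative, hypothetical
# names): producer thread = Spark partition, bounded queue = executor input
# queue, consumer = TensorFlow worker assembling feed_dict batches.
import queue
import threading
from typing import Iterable, List, Tuple

_END = object()  # sentinel marking the end of a partition


def feed_partition(records: Iterable, q: "queue.Queue") -> None:
    """Producer: push every record of one partition, then the sentinel."""
    for rec in records:
        q.put(rec)
    q.put(_END)


def next_batches(q: "queue.Queue", batch_size: int) -> List[list]:
    """Consumer: drain the queue into batches of at most batch_size records."""
    batches, batch = [], []
    while True:
        item = q.get()
        if item is _END:
            break
        batch.append(item)
        if len(batch) == batch_size:
            batches.append(batch)
            batch = []
    if batch:  # final, possibly short, batch
        batches.append(batch)
    return batches


def to_feed_dict(batch: List[Tuple[list, list]]) -> dict:
    """Split (image, label) pairs into two lists; "x"/"y_" are illustrative keys."""
    images, labels = zip(*batch)
    return {"x": list(images), "y_": list(labels)}


if __name__ == "__main__":
    partition = [([i], [i % 10]) for i in range(7)]  # 7 fake (image, label) records
    q = queue.Queue(maxsize=3)  # bounded: put() blocks while the consumer lags
    producer = threading.Thread(target=feed_partition, args=(partition, q))
    producer.start()
    batches = next_batches(q, batch_size=3)
    producer.join()
    print([len(b) for b in batches])  # [3, 3, 1]
    print(to_feed_dict(batches[-1]))  # {'x': [[6]], 'y_': [[6]]}
```

In this sketch the bounded queue provides back-pressure. When the consumer falls behind, put() blocks the producer instead of letting records pile up in memory.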

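TensorFlowOnSpark MNIST example: https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_standalone . It uses a Spark cluster in Standalone mode on a single computer. Install Spark and Hadoop, and deploy the Java 1.8.0 JDK. Download Spark 2.1.0 from http://spark.apache.org/downloads.html and Hadoop 2.7.3 from http://hadoop.apache.org/#Download+Hadoop . Version 0.12.1 is well supported. Edit the configuration files, set the environment variables, and start Hadoop with $HADOOP_HOME/sbin/start-all.sh. Check out the TensorFlowOnSpark source code: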

git clone --recurse-submodules https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
git submodule init
git submodule update --force
git submodule foreach --recursive git clean -dfx

Package the source code, which is used when submitting jobs:

cd TensorFlowOnSpark/src
zip -r ../tfspark.zip *
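If the zip tool is not available, the same archive can be built from Python. The sketch below uses shutil.make_archive; package_sources is a hypothetical helper. It shows that entries are stored relative to src/. A module such as TFNode.py therefore sits at the root of tfspark.zip rather than under a src/ prefix.

```python
# Python counterpart of `cd src && zip -r ../tfspark.zip *` (a sketch):
# archive entries are relative to src_dir, so its modules land at the zip root.
import os
import shutil
import zipfile


def package_sources(src_dir: str, archive_base: str) -> str:
    """Zip the contents of src_dir (not the directory itself); return the .zip path."""
    return shutil.make_archive(archive_base, "zip", root_dir=src_dir)


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "src")
        os.makedirs(os.path.join(src, "pkg"))
        open(os.path.join(src, "TFNode.py"), "w").close()
        open(os.path.join(src, "pkg", "__init__.py"), "w").close()
        path = package_sources(src, os.path.join(tmp, "tfspark"))
        with zipfile.ZipFile(path) as zf:
            print(sorted(zf.namelist()))  # TFNode.py and pkg/... at the root
```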

Set an environment variable pointing to the TensorFlowOnSpark root directory:

cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)

Start the Spark master node:

${SPARK_HOME}/sbin/start-master.sh

Configure two worker instances and connect them to the master through its master-spark-URL:

export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}

Submit a job that converts the MNIST zip files into an RDD dataset on HDFS:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} --conf spark.ui.port=4048 --verbose \
${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
--output examples/mnist/csv \
--format csv
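The mnist_data_setup.py script decodes the gzip-compressed IDX files with mnist.extract_images and mnist.extract_labels before parallelizing them. The stdlib-only sketch below covers that decoding step. read_idx and images_as_rows are hypothetical helpers, and only unsigned-byte IDX files are handled.

An IDX file has three parts:

- a big-endian magic number, whose last byte is the number of dimensions;
- one big-endian 32-bit size per dimension;
- the raw bytes.

The MNIST training images have dimensions (60000, 28, 28), and the training labels have dimensions (60000,).

```python
# Stdlib sketch of gzip'd IDX decoding (unsigned-byte data only).
import gzip
import io
import struct
from typing import List, Tuple


def read_idx(fileobj) -> Tuple[Tuple[int, ...], bytes]:
    """Return (dims, payload) from a gzip-compressed IDX stream of unsigned bytes."""
    with gzip.GzipFile(fileobj=fileobj) as gz:
        magic = struct.unpack(">I", gz.read(4))[0]
        if magic >> 8 != 0x08:  # leading bytes must be 0x00 0x00 0x08 (unsigned byte)
            raise ValueError("unsupported IDX magic number %#x" % magic)
        ndim = magic & 0xFF  # last byte of the magic number = number of dimensions
        dims = struct.unpack(">" + "I" * ndim, gz.read(4 * ndim))
        payload = gz.read()
    expected = 1
    for d in dims:
        expected *= d
    if len(payload) != expected:
        raise ValueError("payload size does not match the header")
    return dims, payload


def images_as_rows(dims: Tuple[int, ...], payload: bytes) -> List[List[int]]:
    """Flatten each N x rows x cols image into one row of rows*cols pixels."""
    n, rows, cols = dims
    size = rows * cols
    return [list(payload[i * size:(i + 1) * size]) for i in range(n)]


if __name__ == "__main__":
    header = struct.pack(">IIII", 0x00000803, 2, 2, 2)  # 2 images of 2 x 2
    raw = gzip.compress(header + bytes([0, 255, 1, 2, 3, 4, 5, 6]))
    dims, payload = read_idx(io.BytesIO(raw))
    print(dims)                           # (2, 2, 2)
    print(images_as_rows(dims, payload))  # [[0, 255, 1, 2], [3, 4, 5, 6]]
```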

Inspect the processed dataset:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv

Inspect the saved image and label vectors:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels
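To check a record by eye, print a line from one of the part files with hadoop fs -cat. The sketch below decodes such lines. It assumes the csv format written by mnist_data_setup.py: an image line holds 784 comma-separated pixel values, and a label line holds a 10-element one-hot vector. parse_image_line and parse_label_line are hypothetical helpers.

```python
# Sketch: decode lines from the csv-format images/ and labels/ part files.
from typing import List


def parse_image_line(line: str) -> List[int]:
    """Pixels as ints, the same parsing mnist_spark.py applies to image lines."""
    return [int(x) for x in line.split(",")]


def parse_label_line(line: str) -> int:
    """Recover the digit from a one-hot label line by taking the arg-max."""
    values = [float(x) for x in line.split(",")]
    return max(range(len(values)), key=values.__getitem__)


if __name__ == "__main__":
    label_line = ",".join(["0.0"] * 7 + ["1.0"] + ["0.0"] * 2)
    print(parse_label_line(label_line))                   # 7
    print(len(parse_image_line(",".join(["0"] * 784))))  # 784
```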

mnist_data_setup.py saves the training set and the test set as separate RDD datasets: https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/mnist_data_setup.py .

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist


def toTFExample(image, label):
  """Serializes an image/label as a TFExample byte string"""
  example = tf.train.Example(
    features=tf.train.Features(
      feature={
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64"))),
        'image': tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))
      }
    )
  )
  return example.SerializeToString()


def fromTFExample(bytestr):
  """Deserializes a TFExample from a byte string"""
  example = tf.train.Example()
  example.ParseFromString(bytestr)
  return example


def toCSV(vec):
  """Converts a vector/array into a CSV string"""
  return ','.join([str(i) for i in vec])


def fromCSV(s):
  """Converts a CSV string to a vector/array"""
  return [float(x) for x in s.split(',') if len(s) > 0]


def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
  """Writes MNIST image/label vectors into parallelized files on HDFS"""
  # load the MNIST gzip files into memory, then write the image/label vectors to HDFS
  with open(input_images, 'rb') as f:
    images = numpy.array(mnist.extract_images(f))

  with open(input_labels, 'rb') as f:
    if format == "csv2":
      labels = numpy.array(mnist.extract_labels(f, one_hot=False))
    else:
      labels = numpy.array(mnist.extract_labels(f, one_hot=True))

  shape = images.shape
  print("images.shape: {0}".format(shape))          # 60000 x 28 x 28 x 1 for the training set
  print("labels.shape: {0}".format(labels.shape))   # 60000 x 10

  # create RDDs of vectors
  imageRDD = sc.parallelize(images.reshape(shape[0], shape[1] * shape[2]), num_partitions)
  labelRDD = sc.parallelize(labels, num_partitions)

  output_images = output + "/images"
  output_labels = output + "/labels"

  # save the RDDs in the requested format
  if format == "pickle":
    imageRDD.saveAsPickleFile(output_images)
    labelRDD.saveAsPickleFile(output_labels)
  elif format == "csv":
    imageRDD.map(toCSV).saveAsTextFile(output_images)
    labelRDD.map(toCSV).saveAsTextFile(output_labels)
  elif format == "csv2":
    imageRDD.map(toCSV).zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0]).saveAsTextFile(output)
  else:  # format == "tfr":
    tfRDD = imageRDD.zip(labelRDD).map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                                 keyClass="org.apache.hadoop.io.BytesWritable",
                                 valueClass="org.apache.hadoop.io.NullWritable")
  #  Note: this creates TFRecord files w/o requiring a custom Input/Output format
  #  else: # format == "tfr":
  #    def writeTFRecords(index, iter):
  #      output_path = "{0}/part-{1:05d}".format(output, index)
  #      writer = tf.python_io.TFRecordWriter(output_path)
  #      for example in iter:
  #        writer.write(example)
  #      return [output_path]
  #    tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
  #    tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()


def readMNIST(sc, output, format):
  """Reads/verifies previously created output"""
  output_images = output + "/images"
  output_labels = output + "/labels"
  imageRDD = None
  labelRDD = None

  if format == "pickle":
    imageRDD = sc.pickleFile(output_images)
    labelRDD = sc.pickleFile(output_labels)
  elif format == "csv":
    imageRDD = sc.textFile(output_images).map(fromCSV)
    labelRDD = sc.textFile(output_labels).map(fromCSV)
  else:  # format.startswith("tf"):
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                                keyClass="org.apache.hadoop.io.BytesWritable",
                                valueClass="org.apache.hadoop.io.NullWritable")
    # bytes() (rather than str()) keeps the raw record bytes on both Python 2 and 3
    imageRDD = tfRDD.map(lambda x: fromTFExample(bytes(x[0])))

  num_images = imageRDD.count()
  num_labels = labelRDD.count() if labelRDD is not None else num_images
  samples = imageRDD.take(10)
  print("num_images: ", num_images)
  print("num_labels: ", num_labels)
  print("samples: ", samples)


if __name__ == "__main__":
  import argparse

  from pyspark.context import SparkContext
  from pyspark.conf import SparkConf

  parser = argparse.ArgumentParser()
  parser.add_argument("-f", "--format", help="output format", choices=["csv", "csv2", "pickle", "tf", "tfr"], default="csv")
  parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)
  parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
  parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")
  parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")

  args = parser.parse_args()
  print("args:", args)

  sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize"))

  if not args.read:
    # Note: these files are inside the mnist.zip file
    writeMNIST(sc, "mnist/train-images-idx3-ubyte.gz", "mnist/train-labels-idx1-ubyte.gz", args.output + "/train", args.format, args.num_partitions)
    writeMNIST(sc, "mnist/t10k-images-idx3-ubyte.gz", "mnist/t10k-labels-idx1-ubyte.gz", args.output + "/test", args.format, args.num_partitions)

  if args.read or args.verify:
    readMNIST(sc, args.output + "/train", args.format)
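For reference, the csv and csv2 encodings above can be restated in plain Python. This makes it easy to check a line by hand without Spark or NumPy. to_csv and from_csv mirror toCSV and fromCSV. to_csv2 and from_csv2 are hypothetical helpers mirroring the csv2 branch, which stores an integer label (one_hot=False) and the pixels on one line as "<label>|<pixels>".

```python
# Stdlib restatement of the csv / csv2 line encodings used by mnist_data_setup.py.
from typing import List, Sequence, Tuple


def to_csv(vec: Sequence) -> str:
    """Mirror of toCSV: join values with commas."""
    return ",".join(str(i) for i in vec)


def from_csv(s: str) -> List[float]:
    """Mirror of fromCSV: the len(s) > 0 guard turns an empty line into []."""
    return [float(x) for x in s.split(",") if len(s) > 0]


def to_csv2(pixels: Sequence[int], label: int) -> str:
    """csv2 line: integer label, a '|' separator, then the comma-separated pixels."""
    return str(label) + "|" + to_csv(pixels)


def from_csv2(line: str) -> Tuple[int, List[int]]:
    """Split a csv2 line back into (label, pixels)."""
    label, pixels = line.split("|", 1)
    return int(label), [int(x) for x in pixels.split(",")]


if __name__ == "__main__":
    line = to_csv2([0, 128, 255], 5)
    print(line)             # 5|0,128,255
    print(from_csv2(line))  # (5, [0, 128, 255])
    print(from_csv(""))     # []
```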

Submit the training job. Training runs and generates mnist_model on HDFS. The command is:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model
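The same training submission can be assembled in code, for example to launch it with subprocess.run. In this hypothetical sketch, build_train_command and all paths in the usage example are assumptions. spark.cores.max is derived exactly like TOTAL_CORES. spark.task.cpus is set to the number of cores per worker, so a worker can run only one task at a time. This is presumably intended to keep a single TensorFlow process per executor.

```python
# Hypothetical helper: build the training spark-submit command as an argv list.
import shlex
from typing import List


def build_train_command(spark_home: str, tfos_home: str, master: str,
                        worker_instances: int, cores_per_worker: int,
                        java_home: str) -> List[str]:
    total_cores = cores_per_worker * worker_instances  # same as TOTAL_CORES
    return [
        spark_home + "/bin/spark-submit",
        "--master", master,
        "--py-files", tfos_home + "/examples/mnist/spark/mnist_dist.py",
        "--conf", "spark.cores.max=%d" % total_cores,
        "--conf", "spark.task.cpus=%d" % cores_per_worker,
        "--conf", "spark.executorEnv.JAVA_HOME=" + java_home,
        tfos_home + "/examples/mnist/spark/mnist_spark.py",
        "--cluster_size", str(worker_instances),
        "--images", "examples/mnist/csv/train/images",
        "--labels", "examples/mnist/csv/train/labels",
        "--format", "csv",
        "--mode", "train",
        "--model", "mnist_model",
    ]


if __name__ == "__main__":
    cmd = build_train_command("/opt/spark", "/opt/TensorFlowOnSpark",
                              "spark://myhost:7077", worker_instances=2,
                              cores_per_worker=1, java_home="/usr/lib/jvm/java-8")
    print(shlex.join(cmd))  # shell-quoted form of the same command
```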

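mnist_dist.py builds the distributed TensorFlow job. It defines map_fun, the main function that TensorFlowOnSpark launches for each distributed TensorFlow task, and it obtains its data through Feeding. Inside map_fun, the TensorFlow cluster and server instances are obtained with: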

cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

TFNode refers to the TFNode.py module packaged in tfspark.zip.
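The map_fun/ctx contract can be sketched without a cluster. The real ctx object passed to map_fun carries fields such as job_name and task_index, and mnist_dist.py hands it to TFNode.start_cluster_server. FakeCtx below is a stand-in. The returned strings only describe what the real branches do: a parameter server blocks in server.join(), and a worker builds the graph and trains. Treat this as an illustration of the role dispatch, not as mnist_dist.py itself.

```python
# Illustrative dispatch on the task role; FakeCtx is a hypothetical stand-in
# for the context object TensorFlowOnSpark passes to map_fun(args, ctx).
from collections import namedtuple

FakeCtx = namedtuple("FakeCtx", ["job_name", "task_index"])


def map_fun(args, ctx):
    if ctx.job_name == "ps":
        # A parameter server only serves variables (server.join() in real code).
        return "ps %d: join" % ctx.task_index
    # A worker builds the graph and trains; task 0 is conventionally the chief.
    role = "chief" if ctx.task_index == 0 else "worker"
    return "%s %d: train on fed batches" % (role, ctx.task_index)


if __name__ == "__main__":
    for ctx in (FakeCtx("ps", 0), FakeCtx("worker", 0), FakeCtx("worker", 1)):
        print(map_fun(None, ctx))
```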

mnist_spark.py is the main training program. It carries out the TensorFlowOnSpark deployment steps:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime
from tensorflowonspark import TFCluster
import mnist_dist
sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
num_ps = 1
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:",args)
print("{0} ===== Start".format(datetime.now().isoformat()))
if args.format == "tfr":
  images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                               keyClass="org.apache.hadoop.io.BytesWritable",
                               valueClass="org.apache.hadoop.io.NullWritable")

  def toNumpy(bytestr):
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)

  # bytes() (rather than str()) keeps the raw record bytes on both Python 2 and 3
  dataRDD = images.map(lambda x: toNumpy(bytes(x[0])))
else:
  if args.format == "csv":
    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  else:  # args.format == "pickle":
    images = sc.pickleFile(args.images)
    labels = sc.pickleFile(args.labels)
  print("zipping images and labels")
  dataRDD = images.zip(labels)
#1-2. TFCluster.run reserves a port on each executor for its TensorFlow process and then launches the TensorFlow main function (mnist_dist.map_fun)
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
if args.mode == "train":
  #3. Train
  cluster.train(dataRDD, args.epochs)
else:
  #3. Inference
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)
#4. Shut down the TensorFlow worker and parameter server nodes on the executors
cluster.shutdown()
print("{0} ===== Stop".format(datetime.now().isoformat()))
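The sketch below estimates what cluster.train(dataRDD, args.epochs) feeds. It makes two simplifying assumptions. First, the training input behaves as if the RDD's partitions were replayed once per epoch. Second, workers consume records in batches of --batch_size. The helpers are hypothetical, and the --steps limit (default 1000) is ignored.

```python
# Rough model of the fed training input; hypothetical helpers, simplified semantics.
from typing import List


def replay_epochs(partitions: List[list], epochs: int) -> List[list]:
    """Model the training input as the partition list repeated once per epoch."""
    return [p for _ in range(epochs) for p in partitions]


def total_batches(num_records: int, epochs: int, batch_size: int) -> int:
    """Batches needed to consume num_records records `epochs` times (ceiling division)."""
    return -(-(num_records * epochs) // batch_size)


if __name__ == "__main__":
    print(replay_epochs([[1, 2], [3]], 2))  # [[1, 2], [3], [1, 2], [3]]
    # 60000 MNIST training images with the defaults --epochs 1 and --batch_size 100
    print(total_batches(60000, 1, 100))     # 600
```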

Inference command:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/test/images \
--labels examples/mnist/csv/test/labels \
--mode inference \
--format csv \
--model mnist_model \
--output predictions
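Spark's saveAsTextFile writes one part-NNNNN file per partition into the output directory. First copy the predictions locally, for example with hadoop fs -get predictions ./predictions. The sketch below then collects the lines in partition order. read_part_files is a hypothetical helper, and the line format itself is whatever mnist_dist.py emits.

```python
# Sketch: gather the lines of a locally copied saveAsTextFile output directory.
import glob
import os
from typing import List


def read_part_files(directory: str) -> List[str]:
    """Concatenate part-* files in name (= partition) order; skips _SUCCESS."""
    lines = []
    for path in sorted(glob.glob(os.path.join(directory, "part-*"))):
        with open(path) as f:
            lines.extend(line.rstrip("\n") for line in f)
    return lines
```

Usage: read_part_files("predictions") returns one string per prediction line.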

TensorFlowOnSpark can also run on Amazon EC2, or in YARN mode on a Hadoop cluster.

Reference: 《TensorFlow技术解析与实战》 (TensorFlow: Technical Analysis and Practice)

Referrals for machine learning jobs in Shanghai are welcome. My WeChat: qingxingfengzi

Reposted from: https://my.oschina.net/u/3482787/blog/1572538
