Learning Notes TF065: TensorFlowOnSpark
2017-11-13 08:58
The Hadoop big-data ecosystem is built from YARN, HDFS, and the MapReduce compute framework. Distributed TensorFlow plays the role of the MapReduce compute framework, and Kubernetes plays the role of the YARN scheduler. TensorFlowOnSpark uses Remote Direct Memory Access (RDMA) to address storage and scheduling, fusing deep learning with big data. TensorFlowOnSpark (TFoS) is a Yahoo open-source project: https://github.com/yahoo/TensorFlowOnSpark . It supports distributed TensorFlow training and inference on Apache Spark clusters. TensorFlowOnSpark provides a bridge: each Spark Executor launches a corresponding TensorFlow process, and the two sides interact via remote procedure calls (RPC).
TensorFlowOnSpark architecture. The TensorFlow training program runs on a Spark cluster, which is managed in the following steps. Reservation: reserve a port for each TensorFlow process that will run on an executor, and start a listener for data and control messages. Startup: launch the TensorFlow main function on the executors. Data ingestion: either TensorFlow Readers and QueueRunners read data files directly from HDFS, with Spark never touching the data, or Feeding, in which Spark RDD data is sent to the TensorFlow nodes and enters the TensorFlow graph through the feed_dict mechanism. Shutdown: stop the TensorFlow worker and parameter-server nodes on the executors. The data path is Spark Driver -> Spark Executor -> parameter server -> TensorFlow Core -> gRPC/RDMA -> HDFS dataset. http://yahoohadoop.tumblr.com/post/157196317141/open-sourcing-tensorflowonspark-distributed-deep . A condensed driver-side sketch of these steps follows.
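The four steps above map directly onto the TFCluster driver API used later in mnist_spark.py. A minimal, hedged sketch (the csv paths and the argparse fields are assumptions that mirror the example's flags; this is not the full program):

import argparse
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from tensorflowonspark import TFCluster
import mnist_dist  # defines map_fun, the per-executor TensorFlow main function

sc = SparkContext(conf=SparkConf().setAppName("tfos_lifecycle_sketch"))

# Flags mirror those defined by mnist_spark.py (an assumption for this sketch).
args = argparse.Namespace(batch_size=100, epochs=1, mode="train", rdma=False,
                          model="mnist_model", steps=1000)

# An RDD of (image, label) vectors, as produced by mnist_data_setup.py.
images = sc.textFile("examples/mnist/csv/train/images").map(lambda ln: [int(x) for x in ln.split(',')])
labels = sc.textFile("examples/mnist/csv/train/labels").map(lambda ln: [float(x) for x in ln.split(',')])
dataRDD = images.zip(labels)

# Steps 1-2: reserve a port per executor, then launch map_fun on each one;
# InputMode.SPARK selects the Feeding (feed_dict) data path.
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, num_executors=2, num_ps=1,
                        tensorboard=False, input_mode=TFCluster.InputMode.SPARK)

# Step 3: push RDD partitions to the TensorFlow worker nodes.
cluster.train(dataRDD, num_epochs=1)

# Step 4: shut down the TensorFlow workers and parameter servers.
cluster.shutdown()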
TensorFlowOnSpark MNIST. https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_standalone . A standalone-mode Spark cluster on a single machine. Install Spark and Hadoop, and deploy a Java 1.8.0 JDK. Download Spark 2.1.0 from http://spark.apache.org/downloads.html and Hadoop 2.7.3 from http://hadoop.apache.org/#Download+Hadoop . TensorFlow 0.12.1 is the best-supported version.
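The commands that follow assume JAVA_HOME, HADOOP_HOME, and SPARK_HOME are exported; a hedged sketch, with install paths that are assumptions:

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64   # assumed JDK location
export HADOOP_HOME=$HOME/hadoop-2.7.3                # assumed unpack location
export SPARK_HOME=$HOME/spark-2.1.0-bin-hadoop2.7    # assumed unpack location
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SPARK_HOME/bin:$SPARK_HOME/sbin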
Edit the configuration files and set the environment variables, then start Hadoop: $HADOOP_HOME/sbin/start-all.sh. Check out the TensorFlowOnSpark source code:
git clone --recurse-submodules https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
git submodule init
git submodule update --force
git submodule foreach --recursive git clean -dfx
Package the source code for job submission:
cd TensorFlowOnSpark/src
zip -r ../tfspark.zip *
Set an environment variable for the TensorFlowOnSpark root directory:
cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)
Start the Spark master:
${SPARK_HOME}/sbin/start-master.sh
Configure two worker instances, connecting them to the master via its master-spark-URL:
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$((CORES_PER_WORKER * SPARK_WORKER_INSTANCES))
${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}
Submit a job to convert the MNIST zip files into HDFS RDD datasets:
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} --conf spark.ui.port=4048 --verbose \
${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
--output examples/mnist/csv \
--format csv
Inspect the processed datasets:
hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv
Inspect the saved image and label vectors:
hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels
The training and test sets are saved as separate RDDs. Source: https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/mnist_data_setup.py .
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist

def toTFExample(image, label):
  """Serializes an image/label as a TFExample byte string"""
  example = tf.train.Example(
    features = tf.train.Features(
      feature = {
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64"))),
        'image': tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))
      }
    )
  )
  return example.SerializeToString()

def fromTFExample(bytestr):
  """Deserializes a TFExample from a byte string"""
  example = tf.train.Example()
  example.ParseFromString(bytestr)
  return example

def toCSV(vec):
  """Converts a vector/array into a CSV string"""
  return ','.join([str(i) for i in vec])

def fromCSV(s):
  """Converts a CSV string to a vector/array"""
  return [float(x) for x in s.split(',') if len(s) > 0]

def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
  """Writes MNIST image/label vectors into parallelized files on HDFS"""
  # load the MNIST gzip files into memory
  with open(input_images, 'rb') as f:
    images = numpy.array(mnist.extract_images(f))

  with open(input_labels, 'rb') as f:
    if format == "csv2":
      labels = numpy.array(mnist.extract_labels(f, one_hot=False))
    else:
      labels = numpy.array(mnist.extract_labels(f, one_hot=True))

  shape = images.shape
  print("images.shape: {0}".format(shape))          # 60000 x 28 x 28
  print("labels.shape: {0}".format(labels.shape))   # 60000 x 10

  # create RDDs of vectors
  imageRDD = sc.parallelize(images.reshape(shape[0], shape[1] * shape[2]), num_partitions)
  labelRDD = sc.parallelize(labels, num_partitions)

  output_images = output + "/images"
  output_labels = output + "/labels"

  # save the RDDs in the requested format
  if format == "pickle":
    imageRDD.saveAsPickleFile(output_images)
    labelRDD.saveAsPickleFile(output_labels)
  elif format == "csv":
    imageRDD.map(toCSV).saveAsTextFile(output_images)
    labelRDD.map(toCSV).saveAsTextFile(output_labels)
  elif format == "csv2":
    imageRDD.map(toCSV).zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0]).saveAsTextFile(output)
  else: # format == "tfr":
    tfRDD = imageRDD.zip(labelRDD).map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                                 keyClass="org.apache.hadoop.io.BytesWritable",
                                 valueClass="org.apache.hadoop.io.NullWritable")
# Note: this creates TFRecord files w/o requiring a custom Input/Output format
#  else: # format == "tfr":
#    def writeTFRecords(index, iter):
#      output_path = "{0}/part-{1:05d}".format(output, index)
#      writer = tf.python_io.TFRecordWriter(output_path)
#      for example in iter:
#        writer.write(example)
#      return [output_path]
#    tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
#    tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()

def readMNIST(sc, output, format):
  """Reads/verifies previously created output"""
  output_images = output + "/images"
  output_labels = output + "/labels"
  imageRDD = None
  labelRDD = None

  if format == "pickle":
    imageRDD = sc.pickleFile(output_images)
    labelRDD = sc.pickleFile(output_labels)
  elif format == "csv":
    imageRDD = sc.textFile(output_images).map(fromCSV)
    labelRDD = sc.textFile(output_labels).map(fromCSV)
  else: # format.startswith("tf"):
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                                keyClass="org.apache.hadoop.io.BytesWritable",
                                valueClass="org.apache.hadoop.io.NullWritable")
    imageRDD = tfRDD.map(lambda x: fromTFExample(str(x[0])))

  num_images = imageRDD.count()
  num_labels = labelRDD.count() if labelRDD is not None else num_images
  samples = imageRDD.take(10)
  print("num_images: ", num_images)
  print("num_labels: ", num_labels)
  print("samples: ", samples)

if __name__ == "__main__":
  import argparse
  from pyspark.context import SparkContext
  from pyspark.conf import SparkConf

  parser = argparse.ArgumentParser()
  parser.add_argument("-f", "--format", help="output format", choices=["csv","csv2","pickle","tf","tfr"], default="csv")
  parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)
  parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
  parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")
  parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")
  args = parser.parse_args()
  print("args:", args)

  sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize"))

  if not args.read:
    # Note: these files are inside the mnist.zip file
    writeMNIST(sc, "mnist/train-images-idx3-ubyte.gz", "mnist/train-labels-idx1-ubyte.gz",
               args.output + "/train", args.format, args.num_partitions)
    writeMNIST(sc, "mnist/t10k-images-idx3-ubyte.gz", "mnist/t10k-labels-idx1-ubyte.gz",
               args.output + "/test", args.format, args.num_partitions)

  if args.read or args.verify:
    readMNIST(sc, args.output + "/train", args.format)
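As the __main__ block shows, passing --read re-loads a previously saved dataset and prints its counts and sample records. A hedged variant of the earlier submit command to verify the csv output:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
--output examples/mnist/csv \
--format csv \
--read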
Submit the training job to start training; it produces mnist_model on HDFS. Command:
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model
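Once training finishes, the checkpoint files should sit under the --model path inside your HDFS home directory; a hedged check (the user directory is an assumption mirroring the /user/libinggen listings above):

hadoop fs -ls hdfs://localhost:9000/user/${USER}/mnist_model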
mnist_dist.py builds the distributed TensorFlow job. It defines the main function for the distributed task (map_fun, launched on each executor as the TensorFlow main function) and uses Feeding to ingest data. It obtains the TensorFlow cluster and server instances:
cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)
TFNode here comes from the TFNode.py module inside tfspark.zip.
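mnist_dist.py itself is not reproduced in this post. Below is a condensed, hedged sketch of the shape of its map_fun: a plain softmax model stands in for the real network, checkpointing to args.model is omitted, and the TFNode.DataFeed usage is an assumption about the Feeding API of that era.

from tensorflowonspark import TFNode
import numpy
import tensorflow as tf

def map_fun(args, ctx):
    # Role assigned to this executor by TensorFlowOnSpark: "ps" or "worker".
    job_name = ctx.job_name
    task_index = ctx.task_index

    # TensorFlow ClusterSpec and Server for this node (RDMA optional).
    cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

    if job_name == "ps":
        server.join()  # parameter servers only host variables
    elif job_name == "worker":
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
            x = tf.placeholder(tf.float32, [None, 784])
            y_ = tf.placeholder(tf.float32, [None, 10])
            W = tf.Variable(tf.zeros([784, 10]))
            b = tf.Variable(tf.zeros([10]))
            y = tf.nn.softmax(tf.matmul(x, W) + b)
            loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
            global_step = tf.train.get_or_create_global_step()
            train_op = tf.train.GradientDescentOptimizer(0.5).minimize(
                loss, global_step=global_step)

        # Feeding: batches arrive from the Spark RDD through a DataFeed queue.
        tf_feed = TFNode.DataFeed(ctx.mgr, args.mode == "train")
        with tf.train.MonitoredTrainingSession(master=server.target,
                                               is_chief=(task_index == 0)) as sess:
            while not sess.should_stop() and not tf_feed.should_stop():
                batch = tf_feed.next_batch(args.batch_size)
                if len(batch) > 0:
                    imgs = numpy.array([item[0] for item in batch])
                    lbls = numpy.array([item[1] for item in batch])
                    sess.run(train_op, feed_dict={x: imgs, y_: lbls})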
mnist_spark.py is the main training program; the TensorFlowOnSpark deployment steps are numbered in the code:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from pyspark.context import SparkContext
from pyspark.conf import SparkConf

import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime

from tensorflowonspark import TFCluster
import mnist_dist

sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
num_ps = 1

parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:", args)

print("{0} ===== Start".format(datetime.now().isoformat()))

if args.format == "tfr":
  images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                               keyClass="org.apache.hadoop.io.BytesWritable",
                               valueClass="org.apache.hadoop.io.NullWritable")
  def toNumpy(bytestr):
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)
  dataRDD = images.map(lambda x: toNumpy(str(x[0])))
else:
  if args.format == "csv":
    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  else: # args.format == "pickle":
    images = sc.pickleFile(args.images)
    labels = sc.pickleFile(args.labels)
  print("zipping images and labels")
  dataRDD = images.zip(labels)

# 1. Reserve a port for each TensorFlow process on the executors, and
# 2. launch the TensorFlow main function (mnist_dist.map_fun) on each one.
#    TFCluster.run performs both steps; no separate start call is needed.
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps,
                        args.tensorboard, TFCluster.InputMode.SPARK)
if args.mode == "train":
  # 3. Train: feed RDD partitions into the TensorFlow graph via feed_dict
  cluster.train(dataRDD, args.epochs)
else:
  # 3. Inference
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)
# 4. Shut down the TensorFlow worker and parameter-server nodes on the executors
cluster.shutdown()

print("{0} ===== Stop".format(datetime.now().isoformat()))
Inference command:
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/test/images \
--labels examples/mnist/csv/test/labels \
--mode inference \
--format csv \
--model mnist_model \
--output predictions
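Inference writes its results as Spark text part files under the --output path; a hedged peek at the first partition (the user directory is again an assumption):

hadoop fs -cat hdfs://localhost:9000/user/${USER}/predictions/part-00000 | head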
The example can also be run on Amazon EC2, or on a Hadoop cluster in YARN mode.
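For YARN mode the change is mostly in the spark-submit flags; a hedged sketch that omits cluster-specific settings (queues, Python distribution, executor environment), for which the project's GetStarted_YARN wiki page is the reference:

${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 4 \
--executor-memory 4G \
--py-files ${TFoS_HOME}/tfspark.zip,${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size 4 \
--images mnist/csv/train/images \
--labels mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model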
References:
《TensorFlow技术解析与实战》
Referrals for machine learning positions in Shanghai are welcome; my WeChat: qingxingfengzi