[Spark Java API] Transformation (12): zipPartitions, zip
2016-08-20 11:34
zipPartitions
Official documentation:
Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by applying a function to the zipped partitions. Assumes that all the RDDs have the same number of partitions, but does not require them to have the same number of elements in each partition.
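Function signature:
    def zipPartitions[U, V](other: JavaRDDLike[U, _], f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V]
This function combines two RDDs partition by partition: the function f receives the iterators of the two partitions at the same index, and its results form a new RDD.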
Source code analysis:
    def zipPartitions[B: ClassTag, V: ClassTag]
        (rdd2: RDD[B], preservesPartitioning: Boolean)
        (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
      new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
    }

    private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
        sc: SparkContext,
        var f: (Iterator[A], Iterator[B]) => Iterator[V],
        var rdd1: RDD[A],
        var rdd2: RDD[B],
        preservesPartitioning: Boolean = false)
      extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {

      override def compute(s: Partition, context: TaskContext): Iterator[V] = {
        val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
        f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
      }

      override def clearDependencies() {
        super.clearDependencies()
        rdd1 = null
        rdd2 = null
        f = null
      }
    }
As the source shows, zipPartitions creates a ZippedPartitionsRDD2, which extends ZippedPartitionsBaseRDD. The getPartitions method of ZippedPartitionsBaseRDD checks that the RDDs being combined have the same number of partitions, but the implementation does not require each partition to hold the same number of elements.
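The behavior described above can be modeled without Spark. The following is only a minimal sketch in plain Java, not Spark's implementation: the class ZipPartitionsModel and its methods are hypothetical names, each inner list stands in for one partition, and the exception message only imitates the partition-count check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

/** Plain-Java model of zipPartitions; no Spark classes are involved. */
final class ZipPartitionsModel {

    /**
     * Applies f to each pair of same-index partitions and concatenates the results.
     * Assumption of this sketch: each inner list represents one partition.
     */
    static <A, B, V> List<V> zipPartitions(
            List<List<A>> left,
            List<List<B>> right,
            BiFunction<Iterator<A>, Iterator<B>, Iterator<V>> f) {
        // Models the partition-count check; the element counts are never compared.
        if (left.size() != right.size()) {
            throw new IllegalArgumentException(
                    "Can't zip RDDs with unequal numbers of partitions: "
                            + left.size() + " vs " + right.size());
        }
        List<V> out = new ArrayList<>();
        for (int i = 0; i < left.size(); i++) {
            Iterator<V> result = f.apply(left.get(i).iterator(), right.get(i).iterator());
            result.forEachRemaining(out::add);
        }
        return out;
    }

    /** Example f: emits one value per partition pair, the total of both partitions. */
    static Iterator<Integer> sumBoth(Iterator<Integer> a, Iterator<Integer> b) {
        int sum = 0;
        while (a.hasNext()) {
            sum += a.next();
        }
        while (b.hasNext()) {
            sum += b.next();
        }
        return Arrays.asList(sum).iterator();
    }

    public static void main(String[] args) {
        List<List<Integer>> left = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4));
        List<List<Integer>> right = Arrays.asList(Arrays.asList(10), Arrays.asList(20, 30));
        // Partition sizes differ (3 vs 1, 1 vs 2) but the partition counts match.
        List<Integer> sums = zipPartitions(left, right, ZipPartitionsModel::sumBoth);
        System.out.println(sums); // prints [16, 54]
    }
}
```

Because f sees whole iterators rather than single elements, it decides for itself how to handle partitions of different lengths; here it simply consumes both.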
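Example:
    // Imports needed by this fragment (Spark 1.x Java API):
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction2;

    // javaSparkContext is an existing JavaSparkContext.
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1);
    JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1, 3);

    // Both RDDs have 3 partitions; their element counts (7 and 6) may differ.
    JavaRDD<String> zipPartitionsRDD = javaRDD.zipPartitions(javaRDD1,
        new FlatMapFunction2<Iterator<Integer>, Iterator<Integer>, String>() {
            // Spark 1.x signature. In Spark 2.x and later, call returns
            // Iterator<String>, so return linkedList.iterator() there.
            @Override
            public Iterable<String> call(Iterator<Integer> integerIterator,
                                         Iterator<Integer> integerIterator2) throws Exception {
                LinkedList<String> linkedList = new LinkedList<String>();
                // Pair elements until either partition is exhausted.
                while (integerIterator.hasNext() && integerIterator2.hasNext()) {
                    linkedList.add(integerIterator.next().toString() + "_"
                        + integerIterator2.next().toString());
                }
                return linkedList;
            }
        });
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipPartitionsRDD.collect());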
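To see what this example is likely to produce, the partitioning can be traced by hand. The sketch below is plain Java with hypothetical names (ZipPartitionsExampleTrace, slice, joinPairs, trace). It assumes that parallelize cuts a list into contiguous slices where slice i covers the index range [i*n/k, (i+1)*n/k); that rule is an assumption of this sketch, not something the text above states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Traces the zipPartitions example by hand; no Spark classes are involved. */
final class ZipPartitionsExampleTrace {

    /** Assumed slicing rule: slice i covers indices [i*n/k, (i+1)*n/k). */
    static <T> List<List<T>> slice(List<T> data, int numSlices) {
        List<List<T>> slices = new ArrayList<>();
        int n = data.size();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) ((long) i * n / numSlices);
            int end = (int) ((long) (i + 1) * n / numSlices);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }

    /** Same logic as call() in the example: pair elements until either side ends. */
    static List<String> joinPairs(Iterator<Integer> a, Iterator<Integer> b) {
        List<String> out = new ArrayList<>();
        while (a.hasNext() && b.hasNext()) {
            out.add(a.next() + "_" + b.next());
        }
        return out;
    }

    /** Slices both lists, then applies joinPairs to same-index slices in order. */
    static List<String> trace(List<Integer> data, List<Integer> data1, int numSlices) {
        List<List<Integer>> left = slice(data, numSlices);
        List<List<Integer>> right = slice(data1, numSlices);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < numSlices; i++) {
            out.addAll(joinPairs(left.get(i).iterator(), right.get(i).iterator()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
        List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1);
        System.out.println(slice(data, 3));  // prints [[5, 1], [1, 4], [4, 2, 2]]
        System.out.println(slice(data1, 3)); // prints [[3, 2], [12, 5], [6, 1]]
        System.out.println(trace(data, data1, 3));
        // prints [5_3, 1_2, 1_12, 4_5, 4_6, 2_1]
    }
}
```

Under that slicing assumption, the last partition of the first list holds three elements against two in the second, so the trailing 2 is dropped by the while loop rather than causing an error.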
zip
Official documentation:
Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).
Function signature:
    def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]
This function combines two RDDs into a single RDD of key/value pairs, pairing elements by position.
Source code analysis:
    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
      zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
        new Iterator[(T, U)] {
          def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
            case (true, true) => true
            case (false, false) => false
            case _ => throw new SparkException("Can only zip RDDs with " +
              "same number of elements in each partition")
          }
          def next(): (T, U) = (thisIter.next(), otherIter.next())
        }
      }
    }
As the source shows, zip is implemented on top of zipPartitions with preservesPartitioning set to false; preservesPartitioning indicates whether the parent RDD's partitioner is kept. In addition, the two RDDs must have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown.
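The hasNext logic in the Scala source can be mirrored in plain Java to make the failure mode concrete. This is a rough sketch under stated assumptions, not Spark code: StrictZip is a hypothetical class, Map.Entry stands in for the Scala tuple, and IllegalStateException stands in for SparkException.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the iterator that zip builds for each partition pair. */
final class StrictZip {

    /** Pairs elements by position and fails as soon as one side runs out first. */
    static <T, U> Iterator<Map.Entry<T, U>> zip(Iterator<T> left, Iterator<U> right) {
        return new Iterator<Map.Entry<T, U>>() {
            @Override
            public boolean hasNext() {
                boolean l = left.hasNext();
                boolean r = right.hasNext();
                if (l != r) {
                    // Stand-in for the SparkException thrown in the Scala source.
                    throw new IllegalStateException(
                            "Can only zip RDDs with same number of elements in each partition");
                }
                return l;
            }

            @Override
            public Map.Entry<T, U> next() {
                return new SimpleEntry<>(left.next(), right.next());
            }
        };
    }

    /** Drains an iterator into a list so the result can be printed. */
    static <E> List<E> toList(Iterator<E> it) {
        List<E> out = new ArrayList<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        // Equal lengths: every element finds a partner.
        System.out.println(toList(zip(
                Arrays.asList(5, 1).iterator(), Arrays.asList(3, 2).iterator())));
        // prints [5=3, 1=2]

        // Unequal lengths: the mismatch is detected after the shorter side ends.
        try {
            toList(zip(Arrays.asList(4, 2, 2).iterator(), Arrays.asList(6, 1).iterator()));
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

Note that the mismatch is only discovered while iterating, so in this model the first pairs of an uneven partition are produced before the exception is raised.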
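Example:
    // Additional import for this fragment: org.apache.spark.api.java.JavaPairRDD
    List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
    JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
    List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1, 7);
    // Use the same partition count as javaRDD; zip requires matching partition
    // counts, and the default parallelism is not guaranteed to be 3.
    JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1, 3);
    JavaPairRDD<Integer, Integer> zipRDD = javaRDD.zip(javaRDD1);
    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipRDD.collect());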