
[Spark Java API] Transformation (12): zipPartitions, zip

2016-08-20

zipPartitions

Official documentation:

Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by applying a function to the zipped partitions. Assumes that all the RDDs have the same number of partitions, but does not require them to have the same number of elements in each partition.


Function signature:

def zipPartitions[U, V](
    other: JavaRDDLike[U, _],
    f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V]


This function zips two RDDs partition by partition, applying a function to each pair of partition iterators to produce a new RDD.

Source code analysis:

def zipPartitions[B: ClassTag, V: ClassTag]
    (rdd2: RDD[B], preservesPartitioning: Boolean)
    (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
  new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
}

private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
    sc: SparkContext,
    var f: (Iterator[A], Iterator[B]) => Iterator[V],
    var rdd1: RDD[A],
    var rdd2: RDD[B],
    preservesPartitioning: Boolean = false)
  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {

  override def compute(s: Partition, context: TaskContext): Iterator[V] = {
    val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
    f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
  }

  override def clearDependencies() {
    super.clearDependencies()
    rdd1 = null
    rdd2 = null
    f = null
  }
}


As the source shows, zipPartitions creates a ZippedPartitionsRDD2, which extends ZippedPartitionsBaseRDD. The getPartitions method of ZippedPartitionsBaseRDD checks that the RDDs being zipped have the same number of partitions, but the implementation does not require each partition to contain the same number of elements.
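To make the partition-count check concrete, here is a minimal sketch (assuming an existing javaSparkContext, and the same Spark 1.x-style FlatMapFunction2 returning an Iterable that the example below uses): zipping a 3-partition RDD with a 2-partition RDD fails as soon as the zipped RDD's partitions are computed, i.e. when the job is submitted.

JavaRDD<Integer> threeParts = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);
JavaRDD<Integer> twoParts = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4), 2);
try {
    threeParts.zipPartitions(twoParts,
        new FlatMapFunction2<Iterator<Integer>, Iterator<Integer>, String>() {
          @Override
          public Iterable<String> call(Iterator<Integer> a, Iterator<Integer> b) throws Exception {
            return new LinkedList<String>(); // never reached: getPartitions fails first
          }
        }).collect();
} catch (IllegalArgumentException e) {
    // ZippedPartitionsBaseRDD.getPartitions rejects unequal partition counts:
    // "Can't zip RDDs with unequal numbers of partitions"
    System.out.println(e.getMessage());
}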


Example:

// Two RDDs with the same number of partitions (3) but different element counts (7 vs. 6).
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1);
JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1, 3);
JavaRDD<String> zipPartitionsRDD = javaRDD.zipPartitions(javaRDD1,
    new FlatMapFunction2<Iterator<Integer>, Iterator<Integer>, String>() {
      @Override
      public Iterable<String> call(Iterator<Integer> integerIterator, Iterator<Integer> integerIterator2) throws Exception {
        // Pair up elements until either partition iterator runs out.
        LinkedList<String> linkedList = new LinkedList<String>();
        while (integerIterator.hasNext() && integerIterator2.hasNext())
          linkedList.add(integerIterator.next().toString() + "_" + integerIterator2.next().toString());
        return linkedList;
      }
    });
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipPartitionsRDD.collect());
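With Spark's default slicing, javaRDD's three partitions should be [5, 1], [1, 4], [4, 2, 2] and javaRDD1's should be [3, 2], [12, 5], [6, 1], so the collected result should be [5_3, 1_2, 1_12, 4_5, 4_6, 2_1]. The trailing 2 in javaRDD's last partition is simply dropped, because the loop stops once either iterator is exhausted, which is exactly the "same partition count, possibly different element counts" behavior described above.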


zip

Official documentation:

Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).


Function signature:

def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]


This function pairs up two RDDs element by element, producing a key/value (pair) RDD.

Source code analysis:

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
    new Iterator[(T, U)] {
      def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
        case (true, true) => true
        case (false, false) => false
        case _ => throw new SparkException("Can only zip RDDs with " +
          "same number of elements in each partition")
      }
      def next(): (T, U) = (thisIter.next(), otherIter.next())
    }
  }
}


As the source shows, zip is implemented on top of zipPartitions with preservesPartitioning set to false (preservesPartitioning controls whether the parent RDD's partitioner is kept). Beyond equal partition counts, the two RDDs must also have the same number of elements in each partition; otherwise hasNext throws a SparkException at runtime.
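A minimal sketch of that failure mode (again assuming an existing javaSparkContext): the partition counts match, so the job starts, but the per-partition element counts differ (7 vs. 6 elements over 3 partitions), and the task fails while iterating.

JavaRDD<Integer> seven = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
JavaRDD<Integer> six = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);
try {
    seven.zip(six).collect();
} catch (Exception e) {
    // The failed task surfaces as a SparkException whose message/cause contains
    // "Can only zip RDDs with same number of elements in each partition".
    System.out.println(e.getMessage());
}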

Example:

List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1, 7);
// Both RDDs must have the same number of partitions and elements per partition.
JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1, 3);
JavaPairRDD<Integer, Integer> zipRDD = javaRDD.zip(javaRDD1);
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipRDD.collect());
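Both lists have seven elements split the same way across three partitions, so the collected result should be [(5,3), (1,2), (1,12), (4,5), (4,6), (2,1), (2,7)].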
Tags: spark