【Spark Java API】Transformation(12)—zipPartitions、zip

2016-08-20



Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by applying a function to the zipped partitions. Assumes that all the RDDs have the same number of partitions, but does not require them to have the same number of elements in each partition.


/**
 * Java API signature: zips the partitions of this RDD with those of `other` and
 * builds a new RDD from whatever `f` produces for each pair of partitions.
 *
 * @param other the RDD whose partitions are paired with this RDD's partitions
 * @param f     function receiving one iterator per side (this RDD's partition,
 *              then `other`'s partition) and producing the output elements
 * @return a `JavaRDD[V]` holding the elements produced by `f`
 */
def zipPartitions[U, V](
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V]



/**
 * Zips this RDD's partitions with those of `rdd2` and returns a new RDD obtained by
 * applying `f` to each pair of partition iterators.
 *
 * @param rdd2                  the RDD whose partitions are paired with this RDD's
 * @param preservesPartitioning forwarded unchanged to the resulting `ZippedPartitionsRDD2`
 * @param f                     function combining one partition iterator from each RDD
 *                              into the iterator of output elements
 * @return an `RDD[V]` backed by a `ZippedPartitionsRDD2`
 */
def zipPartitions[B: ClassTag, V: ClassTag]
    (rdd2: RDD[B], preservesPartitioning: Boolean)
    (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope {
  // `f` is passed through `sc.clean` before being handed to the new RDD.
  new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
}

/**
 * RDD that pairs up the partitions of two parent RDDs and computes each output
 * partition by applying `f` to the two corresponding parent iterators.
 *
 * @param sc                    the Spark context, forwarded to the base class
 * @param f                     function combining one partition iterator from each parent
 * @param rdd1                  first parent RDD
 * @param rdd2                  second parent RDD
 * @param preservesPartitioning forwarded to the base class (defaults to `false`)
 */
private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
    sc: SparkContext,
    var f: (Iterator[A], Iterator[B]) => Iterator[V],
    var rdd1: RDD[A],
    var rdd2: RDD[B],
    preservesPartitioning: Boolean = false)
  extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {

  /**
   * Computes one output partition: index 0 of the zipped partition's `partitions`
   * is read from `rdd1`, index 1 from `rdd2`, and both iterators are fed to `f`.
   */
  override def compute(s: Partition, context: TaskContext): Iterator[V] = {
    val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
    f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
  }

  /**
   * Drops the references to both parents and to `f` (presumably so they can be
   * garbage-collected once this RDD no longer needs its lineage).
   */
  override def clearDependencies(): Unit = {
    // Let the base class release its own references before clearing ours.
    super.clearDependencies()
    rdd1 = null
    rdd2 = null
    f = null
  }
}



List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data,3);
List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1);
JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1,3);
JavaRDD<String> zipPartitionsRDD = javaRDD.zipPartitions(javaRDD1, new FlatMapFunction2<Iterator<Integer>, Iterator<Integer>, String>() {
public Iterable<String> call(Iterator<Integer> integerIterator, Iterator<Integer> integerIterator2) throws Exception {
LinkedList<String> linkedList = new LinkedList<String>();
while(integerIterator.hasNext() && integerIterator2.hasNext())
linkedList.add(integerIterator.next().toString() + "_" + integerIterator2.next().toString());
return linkedList;
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipPartitionsRDD.collect());



Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).


/**
 * Java API signature: pairs the elements of this RDD with the elements of `other`,
 * returning them as a `JavaPairRDD[T, U]`.
 *
 * @param other the RDD supplying the second component of each pair
 */
def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U]



/**
 * Zips this RDD with `other`, pairing elements position by position within each
 * pair of partitions.
 *
 * Built on `zipPartitions` with `preservesPartitioning = false`. The returned
 * iterator checks both sides on every `hasNext` call and fails if exactly one of
 * them is exhausted.
 *
 * @param other the RDD supplying the second component of each pair
 * @return an RDD of `(T, U)` tuples
 * @throws SparkException when iterating a partition pair whose sides have
 *                        different numbers of elements
 */
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
    new Iterator[(T, U)] {
      def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
        case (true, true) => true
        case (false, false) => false
        // One side ran out before the other: the partitions differ in length.
        case _ => throw new SparkException("Can only zip RDDs with " +
          "same number of elements in each partition")
      }
      def next(): (T, U) = (thisIter.next(), otherIter.next())
    }
  }
}



// zip requires both RDDs to have the same number of partitions and the same number of
// elements in each partition, so both lists have 7 elements and are split into 3 partitions.
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
List<Integer> data1 = Arrays.asList(3, 2, 12, 5, 6, 1, 7);
// Use an explicit partition count: relying on the default could give a count other than 3.
JavaRDD<Integer> javaRDD1 = javaSparkContext.parallelize(data1, 3);
JavaPairRDD<Integer, Integer> zipRDD = javaRDD.zip(javaRDD1);
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipRDD.collect());
