[Spark Java API] Transformation (13): zipWithIndex, zipWithUniqueId
2016-08-20 11:37
zipWithIndex
Official documentation:
Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index. This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type. This method needs to trigger a Spark job when this RDD contains more than one partition.
Function signature:
def zipWithIndex(): JavaPairRDD[T, JLong]
This function pairs each element of the RDD with that element's index within the RDD, producing an RDD of key/value pairs.
Source code analysis:
```scala
def zipWithIndex(): RDD[(T, Long)] = withScope {
  new ZippedWithIndexRDD(this)
}

/** The start index of each partition. */
@transient private val startIndices: Array[Long] = {
  val n = prev.partitions.length
  if (n == 0) {
    Array[Long]()
  } else if (n == 1) {
    Array(0L)
  } else {
    prev.context.runJob(
      prev,
      Utils.getIteratorSize _,
      0 until n - 1, // do not need to count the last partition
      allowLocal = false
    ).scanLeft(0L)(_ + _)
  }
}

override def compute(splitIn: Partition, context: TaskContext): Iterator[(T, Long)] = {
  val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
  firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
    (x._1, split.startIndex + x._2)
  }
}
```
As the source shows, zipWithIndex() returns a ZippedWithIndexRDD. That RDD first computes startIndices, the starting global index of each partition: it runs a job that counts the elements of every partition except the last and takes a prefix sum (scanLeft) over those counts; this is the Spark job mentioned in the documentation. compute() then numbers the elements within each partition with Scala's iterator zipWithIndex and adds the partition's startIndex to produce the global index.
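As an illustration of the startIndices logic, here is a minimal standalone sketch of the same prefix-sum computation. The per-partition counts of 2, 2, and 3 are hypothetical, chosen to match the 7-element, 3-partition example below:

```java
// Hypothetical per-partition element counts (partitions 0..2).
long[] counts = {2, 2, 3};
long[] startIndices = new long[counts.length];
long acc = 0L;
for (int p = 0; p < counts.length; p++) {
    startIndices[p] = acc; // partition p's first element gets this global index
    acc += counts[p];
}
// startIndices == [0, 2, 4], mirroring scanLeft(0L)(_ + _) over the counts
```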
Example:
```java
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
JavaPairRDD<Integer, Long> zipWithIndexRDD = javaRDD.zipWithIndex();
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipWithIndexRDD.collect());
// [(5,0), (1,1), (1,2), (4,3), (4,4), (2,5), (2,6)]
```
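To see how the elements are spread over the 3 partitions (which is exactly what determines the index order), glom() collects each partition's contents as a list. The layout in the comment is what Spark's default slicing should produce for 7 elements over 3 partitions:

```java
// Each inner list is one partition's contents, in partition-index order.
System.out.println(javaRDD.glom().collect());
// expected: [[5, 1], [1, 4], [4, 2, 2]]
```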
zipWithUniqueId
Official documentation:
Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k, 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a Spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
Function signature:
def zipWithUniqueId(): JavaPairRDD[T, JLong]
This function pairs each element of the RDD with a unique Long ID. The IDs are generated as follows: the first element of each partition gets the partition's index as its ID, and in general the Nth element (counting from 0) of a partition gets the ID N * totalNumberOfPartitions + partitionIndex. For example, with 3 partitions, the element at position 1 of partition 2 gets ID 1 * 3 + 2 = 5.
Source code analysis:
```scala
def zipWithUniqueId(): RDD[(T, Long)] = withScope {
  val n = this.partitions.length.toLong
  this.mapPartitionsWithIndex { case (k, iter) =>
    iter.zipWithIndex.map { case (item, i) =>
      (item, i * n + k)
    }
  }
}
```
As the source shows, zipWithUniqueId() uses mapPartitionsWithIndex() to obtain each partition's index k, numbers the elements inside the partition with zipWithIndex, and computes each element's ID as i * n + k, where i is the element's position within its partition and n is the total number of partitions. Because no per-partition element counts are needed, no extra Spark job is triggered.
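The same ID scheme can be reproduced by hand through the Java API's mapPartitionsWithIndex. Below is a minimal sketch, assuming javaRDD is the 3-partition RDD from the examples; the names assignIds and manualIds are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

// ...

final long n = javaRDD.partitions().size(); // total number of partitions
Function2<Integer, Iterator<Integer>, Iterator<Tuple2<Integer, Long>>> assignIds =
    (k, iter) -> {
        List<Tuple2<Integer, Long>> out = new ArrayList<>();
        long i = 0; // position of the element within partition k
        while (iter.hasNext()) {
            out.add(new Tuple2<>(iter.next(), i * n + k)); // same formula as the source
            i++;
        }
        return out.iterator();
    };
JavaRDD<Tuple2<Integer, Long>> manualIds =
    javaRDD.mapPartitionsWithIndex(assignIds, false);
```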
Example:
```java
List<Integer> data = Arrays.asList(5, 1, 1, 4, 4, 2, 2);
JavaRDD<Integer> javaRDD = javaSparkContext.parallelize(data, 3);
JavaPairRDD<Integer, Long> zipWithUniqueIdRDD = javaRDD.zipWithUniqueId();
System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + zipWithUniqueIdRDD.collect());
// [(5,0), (1,3), (1,1), (4,4), (4,2), (2,5), (2,8)]
```
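Note the gaps in the generated IDs: with the partition layout [[5, 1], [1, 4], [4, 2, 2]], the IDs 0 through 5 and 8 are used, while 6 and 7 never appear (they would belong to third elements of partitions 0 and 1, which do not exist). This is the trade-off described in the official documentation: zipWithUniqueId() may leave gaps, but unlike zipWithIndex() it does not trigger a job to count partition sizes.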