通过例子学习spark rdd--Transformations函数
2017-11-24 15:50
417 查看
通过例子学习spark rdd
Transformations函数
map
floatMap
flatMap 和 map
mapPartitions
filter
distinct
repartition
sample
union
sortBy
sortByKey
intersection
glom
zip
zipParititions
zipWithIndex
在讲解例子的时候测试的数据如下:
在RDD的每个item上使用transformation函数,结果返回一个新的RDD。
函数原型
使用例子
在spark-shell中执行以上程序,得到的结果如下:
查看一下得到的rdd的类型
可以看出该rdd的类型是MapPartitionsRDD,该RDD是通过在父RDD上通过map运算而得到的。
注意:collect()函数会把所有的数据汇聚到本地,然后打印出来,若数据量太大,最好避免使用该函数。
类似于map,与map相比flatMap有两个很大的区别:
(1) flatMap允许在map函数的基础上扩展成多个成员。
(2) flatMap会将一个长度为N的RDD转换成一个N个元素的集合,然后再把这N个元素合成到一个单个RDD的结果集。
函数原型
使用例子
通过例子来查看这个两个的区别把:
flatMap的例子
flatMap的输出如下:
map的输出
map的输出如下:
小结
可以看到flatMap把最后的结果都合并到一个RDD的集合中了,而map是在每个item上输出是什么就保留什么元素,不合合并到一个集合中。
这是一个特殊的map,它在每个分区中只调用一次。通过输入参数(Iterarator [T]),各个分区的全部内容都可以作为值的顺序流。
自定义函数必须返回另一个Iterator [U]。组合的结果迭代器会自动转换为新的RDD。
请思考,结果的元组中,为什么没有:(3,4)和(6,7)
函数原型
使用举例
程序说明
请注意最后的结果,为什么没有(3,4)和(6,7),我们需要进行以下探索和思考:
思考:由于mapPartitions是在每个分区中运行的,所以我们先看看每个分区的数据:
得到每个分区的数据后,在对每个分区的数据进行myfunc函数的处理。可以看出,3和4不在同一个分区,6和7也不在同一个分区。
所以,在每个分区的处理时,不可能组合在一起,这就是为什么不会有(3,4)和(6,7)的原因。
功能
在RDD对象对象上使用filter函数,并返回满足条件的新的RDD。
使用例子
例子2
注意:distinct也可以指定分区数,若没有指定,使用原来的分区数。
功能
对RDD的元素去重。
返回一个包含每个唯一值一次的新RDD。
使用例子
说明
返回一个有确定分区数:numPartitions的RDD。在该RDD中可能会增加或减少并行度的水平。
在实现内部,该函数会开启一个shuffle过程来重新分配数据,若你减少RDD的分区,可以通过coalesce函数来避免进行shuffle。
repartition(numPartitions)只是coalesce(numPartitions,shuffle = true)的缩写。
使用例子
功能
随机选择RDD项目的一部分数据,并将其返回到新的RDD中。
使用例子
功能
执行标准集合操作:A联合B。
若元素有重复,会保留重复的元素。
使用例子
该函数对输入RDD的数据进行排序并将其存储在新的RDD中。
第一个参数: 需要您指定一个将输入数据映射到要sortBy的键的函数。
第二个参数:(可选)指定是否要按升序或降序对数据进行排序。
返回通过给定key函数生成的key。
函数原型
使用例子
例子2
该函数对输入RDD的数据进行排序并将其存储在新的RDD中。
输出RDD是一个shuffled RDD,因为它的数据是被shuffled的reducer输出。
这个功能的实现其实很聪明。首先,它使用范围分区器对混洗RDD内的范围内的数据进行分区。然后使用标准的排序机制,使用mapPartition单独对这些范围进行排序。
函数原型
使用例子
返回两个RDD的交集,输出的元素将会去重。
注意:该函数在内部会进行shuffle的过程。
函数原型
例子
创建一个新的RDD,该RDD把会将各个分区的所有元素合并到同一个数组中,若有多个分区,就会得到一个有多个数组的集合。
函数原型
代码实现
该函数的代码实现很简单,就是创建了一个MapPartitionsRDD,把每个分区的元素分别放到同一个数组中。
使用例子
如何理解?可以通过例子来说明。
将两个分区中的第n个分区相互组合,从而连接两个RDD。 生成的RDD将由两部分元组组成,这些元组被解释为键-值(key-value)对。
注意:使用该函数时,两个RDD的分区和元素个数必须一样,否则将会报错。见例子2。
函数原型
使用例子
例子2
和zip的功能相似,但可以提供更多的控制。
函数原型
使用例子
使用元素索引来压缩RDD的元素。索引从0开始。如果RDD分布在多个分区上,则启动一个Spark作业来执行此操作。
函数原型
使用例子
Transformations函数
map
floatMap
flatMap 和 map
mapPartitions
filter
distinct
repartition
sample
union
sortBy
sortByKey
intersection
glom
zip
zipParititions
zipWithIndex
通过例子学习spark rdd
Transformations函数
所有的Transformations函数完成后会返回一个新的RDD。在讲解例子的时候测试的数据如下:
$ hadoop fs -cat /user/zxh/pdata/pdata 3350,province_name,上海,5.0 3349,province_name,四川,4.0 3348,province_name,湖南,11.0 3348,province_name,河北,11.0
map
功能在RDD的每个item上使用transformation函数,结果返回一个新的RDD。
函数原型
def map[U: ClassTag](f: T => U): RDD[U]
使用例子
// 构建一个rdd scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant")) // 通过map计算rdd每个成员的长度 scala> val b = a.map(_.length) // 打印rdd scala> b.collect().foreach(println)
在spark-shell中执行以上程序,得到的结果如下:
3 6 6 3 8
查看一下得到的rdd的类型
scala> b res5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:26
可以看出该rdd的类型是MapPartitionsRDD,该RDD是通过在父RDD上通过map运算而得到的。
注意:collect()函数会把所有的数据汇聚到本地,然后打印出来,若数据量太大,最好避免使用该函数。
floatMap
功能类似于map,与map相比flatMap有两个很大的区别:
(1) flatMap允许在map函数的基础上扩展成多个成员。
(2) flatMap会将一个长度为N的RDD转换成一个N个元素的集合,然后再把这N个元素合成到一个单个RDD的结果集。
函数原型
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
使用例子
scala> val a = sc.parallelize(1 to 10) // 扩展a的元素,把a的每个元素扩展成flatMap中的元素 scala> val b = a.flatMap(1 to _) // 得到最后的c的结果,可以看到把a的每个元素都扩展成1 到 该元素 的多个值。 scala> val c = b.collect() c: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
flatMap 和 map
flatMap会把N个元素的集合合成一个单一的RDD结果集通过例子来查看这个两个的区别把:
flatMap的例子
val lines = sc.textFile("/user/zxh/pdata/pdata") val rs1 = lines.flatMap(line=>line.split(",")) rs1.collect()
flatMap的输出如下:
res5: Array[String] = Array(3350, province_name, 上海, 5.0, 3349, province_name, 四川, 4.0, 3348, province_name, 湖南, 11.0, 3348, province_name, 河北, 11.0)
map的输出
val lines = sc.textFile("/user/zxh/pdata/pdata") val rs2 = lines.map(line => line.split(",")) rs2.collect()
map的输出如下:
res6: Array[Array[String]] = Array(Array(3350, province_name, 上海, 5.0), Array(3349, province_name, 四川, 4.0), Array(3348, province_name, 湖南, 11.0), Array(3348, province_name, 河北, 11.0))
小结
可以看到flatMap把最后的结果都合并到一个RDD的集合中了,而map是在每个item上输出是什么就保留什么元素,不合合并到一个集合中。
mapPartitions
功能这是一个特殊的map,它在每个分区中只调用一次。通过输入参数(Iterarator [T]),各个分区的全部内容都可以作为值的顺序流。
自定义函数必须返回另一个Iterator [U]。组合的结果迭代器会自动转换为新的RDD。
请思考,结果的元组中,为什么没有:(3,4)和(6,7)
函数原型
def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
使用举例
scala> val a = sc.parallelize(1 to 10, 3) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:24 scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { | var res = List[(T, T)]() | var pre = iter.next | while (iter.hasNext) | { | val cur = iter.next; | res .::= (pre, cur) | pre = cur; | } | res.iterator | } myfunc: [T](iter: Iterator[T])Iterator[(T, T)] scala> a.mapPartitions(myfunc).collect res19: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (9,10), (8,9), (7,8))
程序说明
请注意最后的结果,为什么没有(3,4)和(6,7),我们需要进行以下探索和思考:
思考:由于mapPartitions是在每个分区中运行的,所以我们先看看每个分区的数据:
// 通过glom函数把每个分区的数据聚合起来,方便进行查看 scala> a.glom().collect() res27: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
得到每个分区的数据后,在对每个分区的数据进行myfunc函数的处理。可以看出,3和4不在同一个分区,6和7也不在同一个分区。
所以,在每个分区的处理时,不可能组合在一起,这就是为什么不会有(3,4)和(6,7)的原因。
filter
函数原型def filter(f: T => Boolean): RDD[T]
功能
在RDD对象对象上使用filter函数,并返回满足条件的新的RDD。
使用例子
scala> val a = sc.parallelize(1 to 10) scala> val b = a.filter(_>2) scala> b.collect() res3: Array[Int] = Array(3, 4, 5, 6, 7, 8, 9, 10)
例子2
val lines = sc.textFile("/user/zxh/pdata/pdata") val r = lines.flatMap(line=>line.split(",")).filter(_.length>5) scala> r.collect() res9: Array[String] = Array(province_name, province_name, province_name, province_name) // 按元素长度进行过滤 scala> val r = lines.flatMap(line=>line.split(",")).filter(_.length>3) r: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[33] at filter at <console>:26 scala> r.collect() res11: Array[String] = Array(3350, province_name, 3349, province_name, 3348, province_name, 11.0, 3348, province_name, 11.0) // 在每个元素中使用函数contains进行过滤 scala> val r = lines.flatMap(line=>line.split(",")).filter(_.contains("33")) r: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[39] at filter at <console>:26 scala> r.collect() res14: Array[String] = Array(3350, 3349, 3348, 3348)
distinct
函数原型def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
注意:distinct也可以指定分区数,若没有指定,使用原来的分区数。
功能
对RDD的元素去重。
返回一个包含每个唯一值一次的新RDD。
使用例子
scala> val r = lines.flatMap(line=>line.split(",")).filter(_.length>3) r: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[59] at filter at <console>:26 // 这里输出的是有重复的元素 scala> r.collect() res20: Array[String] = Array(3350, province_name, 3349, province_name, 3348, province_name, 11.0, 3348, province_name, 11.0) // 调用distinct()后这输出的是去重后的元素 scala> r.distinct().collect() res21: Array[String] = Array(province_name, 3348, 11.0, 3350, 3349)
repartition
函数原型def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
说明
返回一个有确定分区数:numPartitions的RDD。在该RDD中可能会增加或减少并行度的水平。
在实现内部,该函数会开启一个shuffle过程来重新分配数据,若你减少RDD的分区,可以通过coalesce函数来避免进行shuffle。
repartition(numPartitions)只是coalesce(numPartitions,shuffle = true)的缩写。
使用例子
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.distinct.collect res6: Array[String] = Array(Dog, Gnu, Cat, Rat) val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)) a.distinct(2).partitions.length res16: Int = 2 // 注意distinct也可以指定分区 a.distinct(3).partitions.length res17: Int = 3 val c2 = a.repartition(3) c2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[73] at repartition at <console>:26 c2.partitions.length res27: Int = 3
sample
函数原型def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
* withReplacement:是否要替换 * fraction:原来的RDD元素大小的百分比 * seed:随机数产生器的seed
功能
随机选择RDD项目的一部分数据,并将其返回到新的RDD中。
使用例子
val a = sc.parallelize(1 to 10000, 3) scala> a.sample(false,0.1,0).count() res4: Long = 1032 scala> a.sample(false,0.3,0).count() res5: Long = 2997 scala> a.sample(false,0.2,0).count() res6: Long = 2018
union,++
函数原型def union(other: RDD[T]): RDD[T]
功能
执行标准集合操作:A联合B。
若元素有重复,会保留重复的元素。
使用例子
scala> val a = sc.parallelize(1 to 5) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24 scala> val b = sc.parallelize(6 to 9) b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24 scala> a.union(b).collect() res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) scala> (a++b).collect() res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) scala> (a ++ b).collect() res9: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
sortBy
功能说明该函数对输入RDD的数据进行排序并将其存储在新的RDD中。
第一个参数: 需要您指定一个将输入数据映射到要sortBy的键的函数。
第二个参数:(可选)指定是否要按升序或降序对数据进行排序。
返回通过给定key函数生成的key。
函数原型
def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
使用例子
scala> val a = sc.parallelize(Array(5, 7, 1, 3, 2, 1)) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24 // 按值倒序排序 scala> a.sortBy(c=>c, false).collect() res1: Array[Int] = Array(7, 5, 3, 2, 1, 1) // 按值递增排序(默认值) scala> a.sortBy(c=>c, true).collect() res5: Array[Int] = Array(1, 1, 2, 3, 5, 7) // 按每个值除以4,得到的结果排序 scala> a.sortBy(c=>c/4, true).collect() res4: Array[Int] = Array(2, 1, 1, 3, 5, 7)
例子2
// 按RDD中的集合元素的第一个元素排序 scala> val x = sc.parallelize(Array(("abc", 10), ("def", 25), ("acb", 31), ("dfe", 59))) x: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:24 scala> x.sortBy(c => c._1, true).collect res9: Array[(String, Int)] = Array((abc,10), (acb,31), (def,25), (dfe,59))
sortByKey
功能说明该函数对输入RDD的数据进行排序并将其存储在新的RDD中。
输出RDD是一个shuffled RDD,因为它的数据是被shuffled的reducer输出。
这个功能的实现其实很聪明。首先,它使用范围分区器对混洗RDD内的范围内的数据进行分区。然后使用标准的排序机制,使用mapPartition单独对这些范围进行排序。
函数原型
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]
使用例子
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = sc.parallelize(1 to a.count.toInt, 2) val c = a.zip(b) c.sortByKey(true).collect res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3)) c.sortByKey(false).collect res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5)) val a = sc.parallelize(1 to 100, 5) val b = a.cartesian(a) val c = sc.parallelize(b.takeSample(true, 5, 13), 2) val d = c.sortByKey(false) res56: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))
intersection
功能说明返回两个RDD的交集,输出的元素将会去重。
注意:该函数在内部会进行shuffle的过程。
函数原型
def intersection(other: RDD[T]): RDD[T] def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] def intersection(other: RDD[T], numPartitions: Int): RDD[T]
例子
scala> a.collect() res1: Array[Int] = Array(5, 7, 1, 3, 2, 1) scala> val b = sc.parallelize(Array(6,7,8,9,2,1)) b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24 scala> val c = a.intersection(b) c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at intersection at <console>:28 scala> c.collect() res4: Array[Int] = Array(1, 2, 7)
glom
功能创建一个新的RDD,该RDD把会将各个分区的所有元素合并到同一个数组中,若有多个分区,就会得到一个有多个数组的集合。
函数原型
def glom(): RDD[Array[T]]
代码实现
该函数的代码实现很简单,就是创建了一个MapPartitionsRDD,把每个分区的元素分别放到同一个数组中。
/** * Return an RDD created by coalescing all elements within each partition into an array. */ def glom(): RDD[Array[T]] = withScope { new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray)) }
使用例子
如何理解?可以通过例子来说明。
scala> val a = sc.parallelize(1 to 10, 3) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24 scala> a.collect() res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) // 查看一下分区的个数 scala> a.partitions.length res2: Int = 3 // 调用了glom合并分区的数据 scala> val b = a.glom() b: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[3] at glom at <console>:26 // 每个分区的数据组成了一个array scala> b.collect() res4: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
zip
功能说明将两个分区中的第n个分区相互组合,从而连接两个RDD。 生成的RDD将由两部分元组组成,这些元组被解释为键-值(key-value)对。
注意:使用该函数时,两个RDD的分区和元素个数必须一样,否则将会报错。见例子2。
函数原型
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
使用例子
scala> val a = sc.parallelize(1 to 10, 3) scala> val b = sc.parallelize(11 to 20, 3) scala> val c = a.zip(b) c: org.apache.spark.rdd.RDD[(Int, Int)] = ZippedPartitionsRDD2[9] at zip at <console>:28 scala> c.collect() res8: Array[(Int, Int)] = Array((1,11), (2,12), (3,13), (4,14), (5,15), (6,16), (7,17), (8,18), (9,19), (10,20))
例子2
scala> val a = sc.parallelize(1 to 10, 3) a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:24 scala> val b = sc.parallelize(1 to 12, 3) b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24 scala> val c = a.zip(b) c: org.apache.spark.rdd.RDD[(Int, Int)] = ZippedPartitionsRDD2[12] at zip at <console>:28 scala> c.collect() 17/11/26 10:36:54 WARN TaskSetManager: Lost task 1.0 in stage 5.0 (TID 16, sz-pg-entuo-dev-024.tendcloud.com, executor 2): org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition ...
zipParititions
功能和zip的功能相似,但可以提供更多的控制。
函数原型
def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B], preservesPartitioning: Boolean) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B]) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean) (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C]) (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean) (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]) (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]
使用例子
val a = sc.parallelize(0 to 9, 3) val b = sc.parallelize(10 to 19, 3) val c = sc.parallelize(100 to 109, 3) def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] = { var res = List[String]() while (aiter.hasNext && biter.hasNext && citer.hasNext) { val x = aiter.next + " " + biter.next + " " + citer.next res ::= x } res.iterator } a.zipPartitions(b, c)(myfunc).collect res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106)
zipWithIndex
功能:使用元素索引来压缩RDD的元素。索引从0开始。如果RDD分布在多个分区上,则启动一个Spark作业来执行此操作。
函数原型
def zipWithIndex(): RDD[(T, Long)]
使用例子
// 字符串的例子 scala> val r1 = sc.parallelize(Array("a", "b", "c", "d")) r1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[13] at parallelize at <console>:24 scala> val r2 = r1.zipWithIndex r2: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[14] at zipWithIndex at <console>:26 scala> r2.collect() res10: Array[(String, Long)] = Array((a,0), (b,1), (c,2), (d,3)) // 整数类型的例子 scala> val z = sc.parallelize(1 to 10, 5) z: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at <console>:24 scala> val r2 = z.zipWithIndex r2: org.apache.spark.rdd.RDD[(Int, Long)] = ZippedWithIndexRDD[18] at zipWithIndex at <console>:26 scala> r2.collect() res11: Array[(Int, Long)] = Array((1,0), (2,1), (3,2), (4,3), (5,4), (6,5), (7,6), (8,7), (9,8), (10,9)) scala> r2.partitions.length res12: Int = 5
相关文章推荐
- 通过例子学习spark rdd--Action函数
- 通过例子学习spark dataframe -- transformations函数(1)
- 通过例子学习spark dataframe -- transformations函数(2)
- 通过例子学习spark dataframe--基础函数和Action函数
- hadoop学习笔记3.通过电话通信清单例子简单使用Reduce和打包JAR
- 通过例子学习ABAP--ABAP编程语言 数据处理(一)
- Spark学习笔记(18)Spark Streaming中空RDD处理
- Spark商业案例与性能调优实战100课》第2课:商业案例之通过RDD实现分析大数据电影点评系统中电影流行度分析
- 通过例子学习Lua
- spark学习:2.streaming例子
- 【转】通过例子学习正则表达式(二)--检查输入的"钱"
- 通过lua自带例子学习lua 02
- scala学习-Description Resource Path Location Type value toDF is not a member of org.apache.spark.rdd.R
- spark学习13(spark RDD)
- Spark学习笔记——RDD编程
- 通过例子学习lua-7
- 通过例子学习WPF开发
- linux设备驱动开发范例,linux驱动例子,Linux设备驱动详解范例example 宋宝华版的光盘驱动源码,实测编译通过入门学习用
- Spark学习之路 (三)Spark之RDD
- Spark学习-RDD编程基础