您的位置:首页 > 移动开发

Spark API 详解/大白话解释 之 map、mapPartitions、mapValues、mapWith、flatMap、flatMapWith、flatMapValues

2018-02-12 16:36 429 查看

Spark API 详解/大白话解释 之 map、mapPartitions、mapValues、mapWith、flatMap、flatMapWith、flatMapValues

标签: spark2016-01-21 14:44 24457人阅读 评论(0) 收藏 举报

 分类:Spark(34) 

版权声明:本文为博主原创文章,未经博主允许不得转载。 http://blog.csdn.net/guotong1988/article/details/50555185map(function) 
map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。举例:
val a = sc.parallelize(1 to 9, 3)
val b = a.map(x => x*2)//x => x*2是一个函数,x是传入参数即RDD的每个元素,x*2是返回值
a.collect
//结果Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
b.collect
//结果Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
1
2
3
4
5
6
当然map也可以把Key变成Key-Value对
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x => (x, 1))
b.collect.foreach(println(_))
/*
(dog,1)
(tiger,1)
(lion,1)
(cat,1)
(panther,1)
( eagle,1)
*/
1
2
3
4
5
6
7
8
9
10
11
12
mapPartitions(function) 
map()的输入函数是应用于RDD中每个元素,而mapPartitions()的输入函数是应用于每个分区
package test

import scala.Iterator

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TestRdd {
def sumOfEveryPartition(input: Iterator[Int]): Int = {
var total = 0
input.foreach { elem =>
total += elem
}
total
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark Rdd Test")
val spark = new SparkContext(conf)
val input = spark.parallelize(List(1, 2, 3, 4, 5, 6), 2)//RDD有6个元素,分成2个partition
val result = input.mapPartitions(
partition => Iterator(sumOfEveryPartition(partition)))//partition是传入的参数,是个list,要求返回也是list,即Iterator(sumOfEveryPartition(partition))
result.collect().foreach {
println(_)//6 15
}
spark.stop()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
mapValues(function) 
原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect
1
2
3
//
"x" + _ + "x"
等同于
everyInput =>"x" + everyInput + "x"
 
//结果 
Array( 
(3,xdogx), 
(5,xtigerx), 
(4,xlionx), 
(3,xcatx), 
(7,xpantherx), 
(5,xeaglex) 
)mapWith和flatMapWith 
感觉用得不多,参考http://blog.csdn.net/jewes/article/details/39896301flatMap(function) 
与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素
c267
经flatmap处理后可生成多个元素
val a = sc.parallelize(1 to 4, 2)
val b = a.flatMap(x => 1 to x)//每个元素扩展
b.collect
/*
结果    Array[Int] = Array( 1,
1, 2,
1, 2, 3,
1, 2, 3, 4)
*/
1
2
3
4
5
6
7
8
9
flatMapValues(function)
val a = sc.parallelize(List((1,2),(3,4),(5,6)))
val b = a.flatMapValues(x=>1 to x)
b.collect.foreach(println(_))
/*
(1,1)
(1,2)
(3,1)
(3,2)
(3,3)
(3,4)
(5,1)
(5,2)
(5,3)
(5,4)
(5,5)
(5,6)
*/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: