
Spark Operators: Categories and Descriptions

2017-05-22 13:38

Contents:

I. Introduction

II. Value-type Transformation operators

1)map
2)flatMap
3)mapPartitions
4)glom
5)union
6)cartesian
7)groupBy
8)filter
9)distinct
10)subtract
11)sample
12)takeSample
13)cache, persist

III. Key-Value-type Transformation operators

1)mapValues
2)combineByKey
3)reduceByKey
4)partitionBy
5)cogroup
6)join
7)leftOuterJoin
8)rightOuterJoin

IV. Action operators

1)foreach
2)saveAsTextFile
3)saveAsObjectFile
4)collect
5)collectAsMap
6)reduceByKeyLocally
7)lookup
8)count
9)top
10)reduce
11)fold
12)aggregate

----------------------------------------------------------------

I. Introduction

Spark operators fall roughly into three categories:

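1. Transformation operators on Value-type data. These transformations do not trigger job submission; they operate on individual (Value) data items. Transformations are lazy: they only record how the new RDD is derived from its parent.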

2. Transformation operators on Key-Value-type data. These transformations likewise do not trigger job submission; they operate on (key, value) pairs.

3. Action operators, which make SparkContext submit a job. Only at this point are the pending transformations actually executed.

II. Value-type Transformation operators

1)map

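Description: applies a user-defined function to every element of the dataset and returns the results as a new RDD (a MappedRDD in early Spark versions, a MapPartitionsRDD in later ones).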

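Note: foreachPartition() can be used to look at the data partition by partition (the number of partitions itself is available as rdd.partitions.length).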

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect


Result: res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

2)flatMap

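Description: similar to map, but each input element can be mapped to zero or more output elements, and the results are "flattened" into a single RDD.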

val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect


Result: res1: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect


Result: res2: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

3)mapPartitions

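Description: similar to map, but whereas map's function is applied to each element of each partition, mapPartitions' function is applied to each partition as a whole; func has the type Iterator[T] => Iterator[U]. With N elements in M partitions, map's function is called N times while mapPartitions' function is called M times. This makes mapPartitions much more efficient than map when the mapping keeps creating expensive objects: for example, when writing to a database, map would need a connection object for every element, whereas mapPartitions only needs one connection per partition.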

val l = List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female"))
val rdd = sc.parallelize(l,2)
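rdd.mapPartitions(x => x.filter(_._2 == "female")).foreachPartition(p => {
  println(p.toList)
  println("==== partition separator ====")
})


Result (each partition prints its filtered list followed by a separator; because partitions run in parallel, the relative order of the two partitions' lines can vary):

List((kpop,female))

==== partition separator ====

List((lucy,female))

==== partition separator ====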
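To make the N-versus-M call-count argument concrete, here is a minimal plain-Scala model with no Spark dependency, in which an RDD is represented as a sequence of partitions. `FakeConnection` is a hypothetical stand-in for an expensive resource such as a database connection; the counter only records how often one is created.

```scala
object MapPartitionsModel {
  // Hypothetical expensive resource: every construction bumps the counter.
  var connectionsOpened = 0
  final class FakeConnection {
    connectionsOpened += 1
    def write(x: Int): Int = x * 10
  }

  // map-style: the function runs once per element, so one connection per element.
  def viaMap(partitions: Seq[Seq[Int]]): Seq[Seq[Int]] =
    partitions.map(_.map(x => new FakeConnection().write(x)))

  // mapPartitions-style: the function runs once per partition and reuses
  // a single connection for every element of that partition's iterator.
  def viaMapPartitions(partitions: Seq[Seq[Int]]): Seq[Seq[Int]] =
    partitions.map { part =>
      val conn = new FakeConnection()
      part.iterator.map(conn.write).toList
    }
}
```

With partitions of 3 and 4 elements, `viaMap` opens 7 connections and `viaMapPartitions` opens 2, while both produce the same output. The `.toList` forces the partition's results while the connection is still in scope; with a real connection you would also close it once the iterator has been consumed.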

4)glom

Description: converts the elements of type T in each partition of the RDD into an array Array[T], producing one array per partition.

val a = sc.parallelize(1 to 100, 3)
a.glom.collect


Result: res3: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100))
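The three arrays above hold 33, 33 and 34 elements. The sketch below reproduces that split with plain Scala collections; the index arithmetic (partition i covers indices i*n/k until (i+1)*n/k) follows, to my understanding, the slicing that Spark's ParallelCollectionRDD applies in `parallelize`. `slice` and `glom` here are illustrative local functions, not Spark APIs.

```scala
import scala.reflect.ClassTag

object GlomModel {
  // Split a local collection into numSlices contiguous chunks.
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    val n = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * n) / numSlices).toInt
      val end = (((i + 1) * n) / numSlices).toInt
      data.slice(start, end)
    }
  }

  // glom: turn every partition into a single Array.
  def glom[T: ClassTag](partitions: Seq[Seq[T]]): Seq[Array[T]] =
    partitions.map(_.toArray)
}
```

The same arithmetic explains the partition contents printed in the aggregate example: List(1, 2, 3, 4, 5, 6) in 2 slices gives [1, 2, 3] and [4, 5, 6].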

5)union

Description: merges the datasets of two RDDs and returns their union; duplicate elements are not removed.

val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
a.union(b).collect()


Result: res4: Array[Int] = Array(1, 2, 3, 5, 6, 7)

6)cartesian

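Description: returns the Cartesian product of the elements of two RDDs, i.e. every pair (x, y) with x from the first RDD and y from the second.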

val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect


Result: res5: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))

7)groupBy

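Description: applies the function to each element to produce a key, then groups together the elements that share the same key.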

val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect


Result: res6: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))
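groupBy(f) can be read as two steps: key every element with f, then group by key. As far as I know, Spark's RDD.groupBy is implemented essentially as `map(t => (f(t), t)).groupByKey()`. A local sketch of that two-step view, with plain Scala collections standing in for the RDD:

```scala
object GroupByModel {
  // Step 1: attach a key to every element, as map(t => (f(t), t)) does.
  // Step 2: gather the values that share a key, as groupByKey does.
  def groupBy[T, K](data: Seq[T])(f: T => K): Map[K, Seq[T]] =
    data
      .map(t => (f(t), t))
      .groupBy(_._1)
      .map { case (k, pairs) => k -> pairs.map(_._2) }
}
```

Because every element travels to the partition that owns its key, groupBy shuffles the whole dataset; when only an aggregate per key is needed, reduceByKey or combineByKey is usually cheaper.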

8)filter

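Description: filters the elements: the function f is applied to each element; elements for which it returns true are kept in the new RDD, and those for which it returns false are dropped.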

val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect


Result: res7: Array[Int] = Array(2, 4, 6, 8, 10)

9)distinct

Description: removes duplicates, returning each distinct element once.

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect


Result: res8: Array[String] = Array(Dog, Gnu, Cat, Rat)

10)subtract

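Description: returns the elements of the first RDD that do not appear in the second RDD (here, 1 to 9 minus 1 to 3).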

val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.collect


Result: res9: Array[Int] = Array(6, 9, 4, 7, 5, 8)
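subtract is a set-style difference, not deduplication. As far as I know from its implementation (via subtractByKey), repeated elements of the first RDD that are absent from the second are all kept. A minimal local model of that rule:

```scala
object SubtractModel {
  // Keep each element of `a` that never appears in `b`; duplicates in `a` survive.
  def subtract[T](a: Seq[T], b: Seq[T]): Seq[T] = {
    val exclude = b.toSet
    a.filterNot(exclude)
  }
}
```

The model keeps the input order; Spark's result is hash-partitioned, which is why the output above comes back as 6, 9, 4, 7, 5, 8.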

11)sample

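Description: draws a random sample using the given seed. fraction is the expected fraction of the data to keep (not an exact count), and withReplacement chooses sampling with replacement (true) or without replacement (false).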

val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1, 0).count


Result: res10: Long = 960
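The count is 960 rather than exactly 1000 because, when sampling without replacement, Spark uses a Bernoulli sampler: each element is kept independently with probability fraction, so only the expected size is fraction × count. A minimal local model of that behaviour; `scala.util.Random` is a stand-in here, so it will not reproduce Spark's exact numbers for the same seed.

```scala
import scala.util.Random

object SampleModel {
  // Without replacement: keep each element independently with probability `fraction`.
  def bernoulliSample[T](data: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    val rng = new Random(seed)
    data.filter(_ => rng.nextDouble() < fraction)
  }
}
```

Sampling 1 to 10000 with fraction 0.1 gives a count close to, but usually not equal to, 1000, and no element appears twice.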

12)takeSample

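Description: takeSample() works on the same principle as sample(), but instead of a relative fraction it takes the number of elements to sample. Its result is not an RDD: it is as if collect() were called on the sampled data, so the result is a local array on the driver.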

val x = sc.parallelize(1 to 1000, 3)
x.takeSample(true, 100, 1)


Result: res11: Array[Int] = Array(339, 718, 810, 105, 71, 268, 333, 360, 341, 300, 68, 848, 431, 449, 773, 172, 802, 339, 431, 285, 937, 301, 167, 69, 330, 864, 40, 645, 65, 349, 613, 468, 982, 314, 160, 675, 232, 794, 577, 571, 805, 317, 136, 860, 522, 45, 628, 178, 321, 482, 657, 114, 332, 728, 901, 290, 175, 876, 227, 130, 863, 773, 559, 301, 694, 460, 839, 952, 664, 851, 260, 729, 823, 880, 792, 964, 614, 821, 683, 364, 80, 875, 813, 951, 663, 344, 546, 918, 436, 451, 397, 670, 756, 512, 391, 70, 213, 896, 123, 858)

13)cache, persist

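Description: cache and persist both keep an RDD after it has been computed, so later uses do not have to recompute it, which can save a great deal of running time. cache() is shorthand for persist(StorageLevel.MEMORY_ONLY); persist() also accepts other storage levels. Both are lazy: the data is actually stored the first time an action computes the RDD.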

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.getStorageLevel


Result: res12: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1)

c.cache
c.getStorageLevel


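Result: res13: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1)

The five fields are useDisk, useMemory, useOffHeap, deserialized and replication, so after cache the RDD is marked MEMORY_ONLY (deserialized objects in memory, one replica).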

III. Key-Value-type Transformation operators

1)mapValues

Description: mapValues applies a map operation to the V of each [K, V] pair; the keys are left unchanged.

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect


Result: res14: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

2)combineByKey

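Description: combines the values of each key using user-supplied aggregation functions, turning an RDD[(K, V)] into an RDD[(K, C)]; the combined type C can differ from V (here V is String and C is List[String]).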

val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
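val d = c.combineByKey(
  List(_),                                         // createCombiner: first value of a key becomes a List
  (x: List[String], y: String) => y :: x,          // mergeValue: prepend later values within a partition
  (x: List[String], y: List[String]) => x ::: y)   // mergeCombiners: concatenate lists from different partitions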
d.collect


Result: res15: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
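The three function arguments are, in order, createCombiner (turn the first value seen for a key in a partition into a combiner), mergeValue (fold another value of that key into the combiner within the same partition) and mergeCombiners (merge combiners of the same key coming from different partitions). The local model below walks the example's data through those steps, assuming the layout parallelize gives 9 elements in 3 slices. Merging the partitions in index order reproduces the lists shown above; in Spark the order in which partition results are merged is not guaranteed.

```scala
object CombineByKeyModel {
  // A "partitioned RDD" is modelled as a sequence of partitions of (key, value) pairs.
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within each partition: the first value of a key starts a combiner,
    // later values of that key are merged into it.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }
    }
    // Across partitions (in index order here): merge the combiners of each key.
    perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
      m.foldLeft(acc) { case (merged, (k, c)) =>
        merged.updated(k, merged.get(k).map(mergeCombiners(_, c)).getOrElse(c))
      }
    }
  }

  // The zipped (key, animal) pairs from the example, split 3 / 3 / 3.
  val partitions: Seq[Seq[(Int, String)]] = Seq(
    Seq(1 -> "dog", 1 -> "cat", 2 -> "gnu"),
    Seq(2 -> "salmon", 2 -> "rabbit", 1 -> "turkey"),
    Seq(2 -> "wolf", 2 -> "bear", 2 -> "bee"))

  val example: Map[Int, List[String]] =
    combineByKey[Int, String, List[String]](
      partitions,
      v => List(v),
      (x, y) => y :: x,
      (x, y) => x ::: y)
}
```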

3)reduceByKey

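Description: for an RDD of key-value pairs, reduces the values that share a key with the given binary function, so the values of each key collapse into a single value, which is paired with the key in the new RDD. The function should be associative and commutative, because values are reduced within each partition first and the partial results are merged afterwards.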

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect


Result: res16: Array[(Int, String)] = Array((3,dogcatowlgnuant))

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect


Result: res17: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
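reduceByKey first reduces within each partition (a map-side combine) and then merges the partial results per key; as far as I know Spark builds it on the same machinery as combineByKey, with an identity createCombiner. A local sketch, assuming the layout parallelize gives the six animals in 2 slices ([dog, tiger, lion] and [cat, panther, eagle]):

```scala
object ReduceByKeyModel {
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(f: (V, V) => V): Map[K, V] = {
    // Map side: reduce the values of each key inside every partition.
    val partials: Seq[Map[K, V]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }
    }
    // Reduce side: merge the partial results of each key (in partition order here).
    partials.foldLeft(Map.empty[K, V]) { (acc, m) =>
      m.foldLeft(acc) { case (merged, (k, v)) =>
        merged.updated(k, merged.get(k).map(f(_, v)).getOrElse(v))
      }
    }
  }
}
```

With `_ + _` as the function, key 3 gets "dog" from the first partition and "cat" from the second, giving "dogcat" as in the output above.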

4)partitionBy

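Description: repartitions a key-value RDD according to the given Partitioner, so that all pairs with the same key end up in the same partition.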
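In Spark, partitionBy takes a Partitioner, most commonly `org.apache.spark.HashPartitioner`, e.g. `pairs.partitionBy(new HashPartitioner(2))` for some pair RDD `pairs`. To my understanding, HashPartitioner assigns a key to partition `key.hashCode` modulo n, shifted so the result is never negative, and sends null keys to partition 0. The plain-Scala sketch below models only that assignment rule; `nonNegativeMod` and `partitionOf` are illustrative names.

```scala
object HashPartitionModel {
  // Like x % mod, but never negative (% keeps the sign of x on the JVM).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Partition index of a key under hash partitioning into numPartitions partitions.
  def partitionOf(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

Because the index depends only on the key, two RDDs partitioned with the same HashPartitioner place matching keys in matching partitions, which is what lets later joins or cogroups on them avoid a second shuffle.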

5)cogroup

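Description: for the key-value elements of two RDDs, gathers the values that share a key in each RDD into a separate collection, producing (key, (values from the first RDD, values from the second RDD)).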

val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect


Result:

res18: Array[(Int, (Iterable[String], Iterable[String]))] = Array(

(2,(ArrayBuffer(b),ArrayBuffer(c))),

(3,(ArrayBuffer(b),ArrayBuffer(c))),

(1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))

)

6)join

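Description: inner-joins two key-value RDDs on their keys. Internally the two RDDs are cogrouped, and for each key every value from the first RDD is paired with every value from the second.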

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.join(d).collect


Result: res19: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
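The cogroup-then-cross-product definition of join can be sketched locally as follows; `cogroup` and `join` here are illustrative functions over plain sequences, not the Spark methods, and they return keys in order of first appearance rather than in partition order.

```scala
object JoinModel {
  // cogroup: for every key seen on either side, gather that key's values from each side.
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (Seq[V], Seq[W]))] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      (k, (left.filter(_._1 == k).map(_._2), right.filter(_._1 == k).map(_._2)))
    }
  }

  // join: for each key, pair every left value with every right value;
  // a key missing on either side produces no output.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    cogroup(left, right).flatMap { case (k, (vs, ws)) =>
      for (v <- vs; w <- ws) yield (k, (v, w))
    }
}
```

For the example, key 6 has 2 left values and 3 right values (6 pairs), key 3 has 2 and 4 (8 pairs), and key 8 (elephant) has no match on the right, giving the 14 pairs above.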

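7)leftOuterJoin

Description: like join, but every key of the first (left) RDD is kept; where the second RDD has no value for a key, the right-hand value is None.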

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.leftOuterJoin(d).collect


Result: res20: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))

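8)rightOuterJoin

Description: the mirror image of leftOuterJoin: every key of the second (right) RDD is kept; where the first RDD has no value for a key, the left-hand value is None.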

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.rightOuterJoin(d).collect


Result: res21: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))
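Both outer joins follow the same pattern as join; the only change is that a key whose values are missing on the optional side still produces output, with None standing in for the missing value. A local sketch with illustrative names over plain Scala collections (keys come out in order of first appearance, not Spark's partition order):

```scala
object OuterJoinModel {
  // All values for key k on one side, in their original order.
  private def valuesFor[K, X](side: Seq[(K, X)], k: K): Seq[X] =
    side.filter(_._1 == k).map(_._2)

  // Every key of `left` is kept; an empty right side becomes a single None.
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
    left.map(_._1).distinct.flatMap { k =>
      val ws = valuesFor(right, k)
      val ows: Seq[Option[W]] = if (ws.isEmpty) Seq(None) else ws.map(Some(_))
      valuesFor(left, k).flatMap(v => ows.map(ow => (k, (v, ow))))
    }

  // Mirror image: swap the sides, then swap each result pair back.
  def rightOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (Option[V], W))] =
    leftOuterJoin(right, left).map { case (k, (w, ov)) => (k, (ov, w)) }
}
```

On the example data this gives 15 left-outer pairs (the 14 inner pairs plus elephant with None) and 16 right-outer pairs (plus wolf and bear with None).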

IV. Action operators

1)foreach

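Description: applies a function to every element of the RDD, here printing each one. The function runs on the executors, so on a cluster the output goes to the executors' stdout rather than to the driver console.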

val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
c.foreach(x => println(x + "s are yummy"))


Result (excerpt; lines from different partitions appear in no fixed order):

lions are yummy

gnus are yummy

crocodiles are yummy

ants are yummy

whales are yummy

dolphins are yummy

spiders are yummy

2)saveAsTextFile

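Description: saves the RDD as text files to HDFS (or another Hadoop-supported file system): one part-NNNNN file per partition, plus a _SUCCESS marker.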

val a = sc.parallelize(1 to 10000, 3)
a.saveAsTextFile("/user/yuhui/mydata_a")


Result:

[root@tagtic-slave03 ~]# hadoop fs -ls /user/yuhui/mydata_a

Found 4 items

-rw-r--r-- 2 root supergroup 0 2017-05-22 14:28 /user/yuhui/mydata_a/_SUCCESS

-rw-r--r-- 2 root supergroup 15558 2017-05-22 14:28 /user/yuhui/mydata_a/part-00000

-rw-r--r-- 2 root supergroup 16665 2017-05-22 14:28 /user/yuhui/mydata_a/part-00001

-rw-r--r-- 2 root supergroup 16671 2017-05-22 14:28 /user/yuhui/mydata_a/part-00002

3)saveAsObjectFile

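Description: saveAsObjectFile serializes the elements of the RDD as objects and writes them to files; on HDFS they are stored as a SequenceFile. sc.objectFile reads them back.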

val x = sc.parallelize(1 to 100, 3)
x.saveAsObjectFile("/user/yuhui/objFile")
val y = sc.objectFile[Int]("/user/yuhui/objFile")
y.collect


Result:

res22: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

4)collect

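Description: gathers all of the RDD's data to the driver as an Array. Use it only when the data is small enough to fit in the driver's memory.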

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect


Result: res23: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

5)collectAsMap

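Description: returns the key-value pairs from all partitions of the RDD to the driver as a hash map; if a key occurs more than once, later elements overwrite earlier ones.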

val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
b.collectAsMap


Result: res24: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)
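In the example the two (1, 1) pairs are identical, so the overwrite rule is not visible. Assuming collectAsMap behaves like inserting the collected pairs into a mutable HashMap one at a time (my reading of its implementation), the last pair for a key wins. A local sketch:

```scala
import scala.collection.mutable

object CollectAsMapModel {
  // Insert pairs in order; a repeated key overwrites the earlier value.
  def collectAsMap[K, V](pairs: Seq[(K, V)]): scala.collection.Map[K, V] = {
    val m = mutable.HashMap.empty[K, V]
    pairs.foreach { case (k, v) => m.put(k, v) }
    m
  }
}
```

For an RDD with several partitions, "later" refers to the order in which the collected partitions are concatenated.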

6)reduceByKeyLocally

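Description: reduces the values of each key and returns the result to the driver as a Map, with the effect of reduceByKey followed by collectAsMap but without a shuffle: each partition is reduced into a local map and the maps are merged on the driver. The example below runs the equivalent reduceByKey(...).collect; b.reduceByKeyLocally(_ + _) would return the reduced pairs as a Map instead of an Array.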

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect


Result: res25: Array[(Int, String)] = Array((3,dogcatowlgnuant))

7)lookup

Description: returns all values associated with the given key in a key-value RDD.

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.lookup(5)


Result: res26: Seq[String] = WrappedArray(tiger, eagle)

8)count

Description: returns the number of elements in the RDD.

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count


Result: res27: Long = 4

9)top

Description: returns the k largest elements according to the implicit ordering, in descending order.

val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c.top(2)


Result: res28: Array[Int] = Array(9, 8)

10)reduce

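Description: combines the elements of the RDD with a binary function, much like reduceLeft on a local collection. Because each partition is reduced separately and the partial results are then combined, the function should be associative and commutative.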

val a = sc.parallelize(1 to 100, 3)
a.reduce(_ + _)


Result: res29: Int = 5050
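To see why the function must be associative and commutative, here is a local sketch that reduces each partition and then combines the partial results in partition order (Spark combines them as tasks finish, so with a non-commutative function even that order is not fixed). For `_ + _` the partitioning makes no difference; for `_ - _` it does.

```scala
object ReduceModel {
  // Reduce inside each non-empty partition, then combine the partial results.
  def reduce[T](partitions: Seq[Seq[T]])(f: (T, T) => T): T =
    partitions.filter(_.nonEmpty).map(_.reduceLeft(f)).reduceLeft(f)
}
```

With partitions [1, 2, 3] and [4, 5, 6], subtraction gives (1 - 2 - 3) - (4 - 5 - 6) = -4 - (-7) = 3, whereas a single reduceLeft over 1 to 6 gives -19.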

11)fold

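Description: fold() is similar to reduce() and takes a function with the same signature, plus an initial (zero) value. Spark uses that value as the starting point in every partition and once more when merging the partition results, so it should be the identity element of the operation (0 for +).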

val a = sc.parallelize(List(1,2,3), 3)
a.fold(0)(_ + _)


Result: res30: Int = 6
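A local sketch of fold's evaluation order, assuming each partition is folded from the zero value and the partial results are then folded together from the zero value again. With the example's three single-element partitions, a zero of 1 is counted four times, so `a.fold(1)(_ + _)` would return 10 rather than 7.

```scala
object FoldModel {
  // zero is used once per partition and once more for the final merge.
  def fold[T](partitions: Seq[Seq[T]])(zero: T)(op: (T, T) => T): T =
    partitions.map(_.foldLeft(zero)(op)).foldLeft(zero)(op)
}
```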

12)aggregate

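Description: aggregate first combines the elements of each partition with seqOp, starting from the zero value, and then merges the per-partition results with combOp, again starting from the zero value. Unlike fold, the result type can differ from the element type.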

val z = sc.parallelize(List(1,2,3,4,5,6), 2)

// let's first print out the contents of the RDD with partition labels
def myfunc(index: Int, iter: Iterator[Int]): Iterator[String] = {
  iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}

z.mapPartitionsWithIndex(myfunc).collect


Result: res31: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])

z.aggregate(0)(math.max(_, _), _ + _)


Result: res32: Int = 9
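Walking through the numbers: with partitions [1, 2, 3] and [4, 5, 6], seqOp `math.max` starting from 0 yields 3 and 6, and combOp `_ + _` starting from 0 again yields 0 + 3 + 6 = 9. As with fold, the zero value enters every partition and the final merge, so a zero of 5 would give 5 + 5 + 6 = 16. A local sketch of that evaluation, assuming partition results are merged in index order:

```scala
object AggregateModel {
  // seqOp folds the elements of one partition into an accumulator of type U;
  // combOp merges the per-partition accumulators, again starting from zero.
  def aggregate[T, U](partitions: Seq[Seq[T]])(zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)
}
```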
