您的位置:首页 > 其它

Spark计算平台算子介绍与学习

2017-06-29 09:51 302 查看

1. 算子分类

Spark算子大体上分为两大类Transformation:操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发;Action:会触发 Spark 提交作业(Job),并将数据输出 Spark系统。当然,对于transformation算子,如果要较真的话,还可以继续分一分Value数据类型的Transformation算子;Key-Value数据类型的Transfromation算子。

1.1 Value数据类型的Transformation算子

类型算子
输入分区与输出分区一对一型map、flatMap、mapPartitions、glom
输入分区与输出分区多对一型union、cartesian
输入分区与输出分区多对多型groupBy
输出分区为输入分区子集型filter、distinct、subtract、sample、takeSample
Cache型cache、persist

1.2 Key-Value数据类型的Transfromation算子

类型算子
输入分区与输出分区一对一mapValues
对单个RDDcombineByKey、reduceByKey、partitionBy
两个RDD聚集Cogroup
连接join、leftOutJoin、rightOutJoin

1.3 Action算子

类型算子
无输出foreach
HDFSsaveAsTextFile、saveAsObjectFile
Scala集合和数据类型collect、collectAsMap、reduceByKeyLocally、lookup、count、top、reduce、fold、aggregate
2、Transformation算子详解2.1 map说明:将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素

Scala示例:

def map() {
val conf = new SparkConf().setAppName(ScalaOperatorDemo.getClass.getSimpleName).setMaster("local")
val sc = new SparkContext(conf)

val datas: Array[String] = Array(
"{'id':1,'name':'xl1','pwd':'xl123','sex':2}",
"{'id':2,'name':'xl2','pwd':'xl123','sex':1}",
"{'id':3,'name':'xl3','pwd':'xl123','sex':2}")

sc.parallelize(datas)
.map(v => {
new Gson().fromJson(v, classOf[User])
})
.foreach(user => {
println("id: " + user.id
+ " name: " + user.name
+ " pwd: " + user.pwd
+ " sex:" + user.sex)
})
}
2.2 filter说明:对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD中保留,返回为false的将过滤掉

Scala示例:

def filter() {
val conf = new SparkConf().setAppName(ScalaOperatorDemo.getClass.getSimpleName).setMaster("local")
val sc = new SparkContext(conf)

val datas = Array(1, 2, 3, 7, 4, 5, 8)

sc.parallelize(datas)
.filter(v => v >= 3)
.foreach(println)
}
2.3 flatMap说明:与map类似,但每个输入的RDD成员可以产生0或多个输出成员

Scala示例:

sc.parallelize(datas)
.flatMap(line => line.split(","))
.foreach(println)

2.4 mapPartitions

说明:与map类似,但map中的func作用的是RDD中的每个元素,而mapPartitions中的func作用的对象是RDD的一整个分区,所以func的类型是Iterator

Scala示例:

sc.parallelize(datas, 3)
.mapPartitions(
n => {
val result = ArrayBuffer[String]()
while (n.hasNext) {
result.append(n.next())
}
result.iterator
}
)
.foreach(println)

2.5 mapPartitionsWithIndex

说明:与mapPartitions类似,但输入会多提供一个整数表示分区的编号,所以func的类型是(Int, Iterator)

Scala示例:

sc.parallelize(datas, 3)
.mapPartitionsWithIndex(
(m, n) => {
val result = ArrayBuffer[String]()
while (n.hasNext) {
result.append("分区索引:" + m + "\t" + n.next())
}
result.iterator
}
)
.foreach(println)

2.6 sample

说明:对RDD进行抽样,其中参数withReplacement为true时表示抽样之后还放回,可以被多次抽样,false表示不放回;fraction表示抽样比例;seed为随机数种子,比如当前时间戳

Scala示例:

sc.parallelize(datas)
.sample(withReplacement = false, 0.5, System.currentTimeMillis)
.foreach(println)

2.7 union

说明:合并两个RDD,不去重,要求两个RDD中的元素类型一致

Scala示例:

// sc.parallelize(datas1)
//     .union(sc.parallelize(datas2))
//     .foreach(println)

// 或

(sc.parallelize(datas1) ++ sc.parallelize(datas2))
.foreach(println)

2.8 intersection

说明:返回两个RDD的交集

Scala示例:

sc.parallelize(datas1)
.intersection(sc.parallelize(datas2))
.foreach(println)

2.9 distinct

说明:对原RDD进行去重操作,返回RDD中没有重复的成员

Scala示例:

sc.parallelize(datas)
.distinct()
.foreach(println)

2.10 groupByKey

说明:对
<key, value>
结构的RDD进行类似RMDB的group by聚合操作,具有相同key的RDD成员的value会被聚合在一起,返回的RDD的结构是
(key,
Iterator<value>)

Scala示例:

def groupBy(sc: SparkContext): Unit = {
sc.parallelize(1 to 9, 3)
.groupBy(x => {
if (x % 2 == 0) "偶数"
else "奇数"
})
.collect()
.foreach(println)

val datas2 = Array("dog", "tiger", "lion", "cat", "spider", "eagle")
sc.parallelize(datas2)
.keyBy(_.length)
.groupByKey()
.collect()
.foreach(println)
}
// 结果
(奇数,[1, 3, 5, 7, 9])(偶数,[2, 4, 6, 8])
(4,[lion])(6,[spider])(3,[dog, cat])(5,[tiger, eagle])

2.11 reduceByKey

说明:对
<key, value>
结构的RDD进行聚合,对具有相同key的value调用func来进行reduce操作,func的类型必须是
(V,
V) => V

Scala示例:

val textFile = sc.textFile("file:///home/zkpk/spark-2.0.1/README.md")
val words = textFile.flatMap(line => line.split(" "))
val wordPairs = words.map(word => (word, 1))
val wordCounts = wordPairs.reduceByKey((a, b) => a + b)
println("wordCounts: ")
wordCounts.collect().foreach(println)
// 结果
package 1
For 3
Programs    1
(略)

2.12 aggregateByKey

说明:aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值得类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接返回非RDD的结果。参数:zeroValue:表示在每个分区中第一次拿到key值时,用于创建一个返回类型的函数,这个函数最终会被包装成先生成一个返回类型,然后通过调用seqOp函数,把第一个key对应的value添加到这个类型U的变量中。seqOp:这个用于把迭代分区中key对应的值添加到zeroValue创建的U类型实例中。combOp:这个用于合并每个分区中聚合过来的两个U类型的值。

Scala示例:

def aggregateByKey(sc: SparkContext): Unit = {// 合并在同一个partition中的值,a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型def seq(a:Int, b:Int): Int = {println("seq: " + a + "\t" + b)math.max(a, b)}// 合并在不同partition中的值,a,b的数据类型为zeroValue的数据类型def comb(a:Int, b:Int): Int = {println("comb: " + a + "\t" + b)a + b}// 数据拆分成两个分区// 分区一数据: (1,3) (1,2)// 分区二数据: (1,4) (2,3)// zeroValue 中立值,定义返回value的类型,并参与运算// seqOp 用来在一个partition中合并值的// 分区一相同key的数据进行合并// seq: 0   3  (1,3)开始和中位值合并为3// seq: 3   2  (1,2)再次合并为3// 分区二相同key的数据进行合并// seq: 0   4  (1,4)开始和中位值合并为4// seq: 0   3  (2,3)开始和中位值合并为3// comb 用来在不同partition中合并值的// 将两个分区的结果进行合并// key为1的, 两个分区都有, 合并为(1,7)// key为2的, 只有一个分区有, 不需要合并(2,3)sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3)), 2).aggregateByKey(0)(seq, comb).collect().foreach(println)}// 结果(2,3)(1,7)

2.13 sortByKey

说明:对
<key, value>
结构的RDD进行升序或降序排列参数:comp:排序时的比较运算方式。ascending:false降序;true升序。

Scala示例:

def sortByKey(sc: SparkContext) : Unit = {sc.parallelize(Array(60, 70, 80, 55, 45, 75)).sortBy(v => v, false).foreach(println)sc.parallelize(List((3, 3), (2, 2), (1, 4), (2, 3))).sortByKey(true).foreach(println)}
// 结果
80
75
70
60
55
45
(3,3)
(2,2)
(2,3)
(1,4)

2.14 join

说明:对
<K, V>
<K, W>
进行join操作,返回
(K,(V, W))
外连接函数为leftOuterJoin、rightOuterJoin和fullOuterJoin

Scala示例:

sc.parallelize(List((1, "苹果"), (2, "梨"), (3, "香蕉"), (4, "石榴"))).join(sc.parallelize(List((1, 7), (2, 3), (3, 8), (4, 3), (5, 9)))).foreach(println)
// 结果(4,(石榴,3))(1,(苹果,7))(3,(香蕉,8))(2,(梨,3))

2.15 cogroup

说明:cogroup:对多个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合,与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。

Scala示例:

def cogroup(sc: SparkContext): Unit = {val datas1 = List((1, "苹果"),(2, "梨"),(3, "香蕉"),(4, "石榴"))val datas2 = List((1, 7),(2, 3),(3, 8),(4, 3))val datas3 = List((1, "7"),(2, "3"),(3, "8"),(4, "3"),(4, "4"),(4, "5"),(4, "6"))sc.parallelize(datas1).cogroup(sc.parallelize(datas2),sc.parallelize(datas3)).foreach(println)}// 结果(4,(CompactBuffer(石榴),CompactBuffer(3),CompactBuffer(3, 4, 5, 6)))(1,(CompactBuffer(苹果),CompactBuffer(7),CompactBuffer(7)))(3,(CompactBuffer(香蕉),CompactBuffer(8),CompactBuffer(8)))(2,(CompactBuffer(梨),CompactBuffer(3),CompactBuffer(3)))

2.16 cartesian

说明:两个RDD进行笛卡尔积合并

Scala示例:

namesRDD.cartesian(scoreRDD).foreach(println)
// 结果
张三  60
张三  70
张三  80
李四  60
李四  70
李四  80
王五  60
王五  70
王五  80

2.17 pipe

说明:执行cmd命令,创建RDD

Scala示例:

echo.sh内容
#!/bin/bashecho "Running shell script"RESULT=""while read LINE; doRESULT=${RESULT}" "${LINE}doneecho ${RESULT} > /Users/zhangws/out123.txt
1234567812345678测试代码
def pipe(sc: SparkContext): Unit = {val data = List("hi", "hello", "how", "are", "you")sc.makeRDD(data).pipe("/Users/zhangws/echo.sh").collect().foreach(println)}
12345671234567结果
# out123.txthi hello how are you# 输出Running shell script

2.18 coalesce

说明:用于将RDD进行重分区,使用HashPartitioner,且该RDD的分区个数等于numPartitions个数,如果shuffle设置为true,则会进行shuffle。

Scala示例:

def coalesce(sc: SparkContext): Unit = {val datas = List("hi", "hello", "how", "are", "you")val datasRDD = sc.parallelize(datas, 4)println("RDD的分区数: " + datasRDD.partitions.length)val datasRDD2 = datasRDD.coalesce(2)println("RDD的分区数: " + datasRDD2.partitions.length)}
// 结果
RDD的分区数: 4
RDD的分区数: 2

2.19 repartition

说明:该函数其实就是coalesce函数第二个参数为true的实现

2.20 repartitionAndSortWithinPartitions

说明:根据给定的Partitioner重新分区,并且每个分区内根据comp实现排序。

Scala示例

def repartitionAndSortWithinPartitions(sc: SparkContext): Unit = {def partitionFunc(key:String): Int = {key.substring(7).toInt}val datas = new Array[String](1000)val random = new Random(1)for (i <- 0 until 10; j <- 0 until 100) {val index: Int = i * 100 + jdatas(index) = "product" + random.nextInt(10) + ",url" + random.nextInt(100)}val datasRDD = sc.parallelize(datas)val pairRDD = datasRDD.map(line => (line, 1)).reduceByKey((a, b) => a + b)//        .foreach(println)pairRDD.repartitionAndSortWithinPartitions(new Partitioner() {override def numPartitions: Int = 10override def getPartition(key: Any): Int = {val str = String.valueOf(key)str.substring(7, str.indexOf(',')).toInt}}).foreach(println)}
// 结果
(product00,url099)
(product00,url006)
(product00,url088)
(product09,url004)
(product09,url021)
(product09,url036)

3. Action

3.1 reduce

说明:对RDD成员使用func进行reduce操作,func接受两个参数,合并之后只返回一个值。reduce操作的返回结果只有一个值。需要注意的是,func会并发执行

Scala示例:

def reduce(sc: SparkContext): Unit = {println(sc.parallelize(1 to 10).reduce((x, y) => x + y))}// 结果55

3.2 collect

说明:将RDD读取至Driver程序,类型是Array,一般要求RDD不要太大。

3.3 count

说明:返回RDD的成员数量

Scala示例:

def count(sc: SparkContext): Unit = {println(sc.parallelize(1 to 10).count)}// 结果10

3.4 first

说明:返回RDD的第一个成员,等价于take(1)

Scala示例:

def first(sc: SparkContext): Unit = {println(sc.parallelize(1 to 10).first())}// 结果1

3.5 take

说明:返回RDD前n个成员

Scala示例:

def take(sc: SparkContext): Unit = {sc.parallelize(1 to 10).take(2).foreach(println)}// 结果12

3.6 takeSample

说明:和sample用法相同,只不第二个参数换成了个数。返回也不是RDD,而是collect。

Scala示例:

def takeSample(sc: SparkContext): Unit = {sc.parallelize(1 to 10).takeSample(withReplacement = false, 3, 1).foreach(println)}// 结果1810

3.7 takeOrdered

说明:用于从RDD中,按照默认(升序)或指定排序规则,返回前num个元素。

Scala示例:

def takeOrdered(sc: SparkContext): Unit = {sc.parallelize(Array(5,6,2,1,7,8)).takeOrdered(3)(new Ordering[Int](){override def compare(x: Int, y: Int): Int = y.compareTo(x)}).foreach(println)}// 结果876

3.8 saveAsTextFile

说明:将RDD转换为文本内容并保存至路径path下,可能有多个文件(和partition数有关)。路径path可以是本地路径或HDFS地址,转换方法是对RDD成员调用toString函数

Scala示例:

def saveAsTextFile(sc: SparkContext): Unit = {sc.parallelize(Array(5,6,2,1,7,8)).saveAsTextFile("/Users/zhangws/Documents/test")}// 结果/Users/zhangws/Documents/test目录下_SUCCESSpart-00000// part-00000文件内容562178

3.9 saveAsSequenceFile

说明:与saveAsTextFile类似,但以SequenceFile格式保存,成员类型必须实现Writeable接口或可以被隐式转换为Writable类型(比如基本Scala类型Int、String等)

3.10 saveAsObjectFile

说明:用于将RDD中的元素序列化成对象,存储到文件中。对于HDFS,默认采用SequenceFile保存。

3.11 countByKey

说明:仅适用于(K, V)类型,对key计数,返回(K, Int)

Scala示例:

def reduce(sc: SparkContext): Unit = {println(sc.parallelize(Array(("A", 1), ("B", 6), ("A", 2), ("C", 1), ("A", 7), ("A", 8))).countByKey())}// 结果Map(B -> 1, A -> 4, C -> 1)

3.12 foreach

说明:对RDD中的每个成员执行func,没有返回值,常用于更新计数器或输出数据至外部存储系统。这里需要注意变量的作用域
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: