1. 算子分类
Spark算子大体上分为两大类Transformation:操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发;Action:会触发 Spark 提交作业(Job),并将数据输出 Spark系统。当然,对于transformation算子,如果要较真的话,还可以继续分一分Value数据类型的Transformation算子;Key-Value数据类型的Transfromation算子。
1.1 Value数据类型的Transformation算子
| 类型 | 算子 |
---|
输入分区与输出分区一对一型 | map、flatMap、mapPartitions、glom |
输入分区与输出分区多对一型 | union、cartesian |
输入分区与输出分区多对多型 | groupBy |
输出分区为输入分区子集型 | filter、distinct、subtract、sample、takeSample |
Cache型 | cache、persist |
1.2 Key-Value数据类型的Transfromation算子
| 类型 | 算子 |
---|
输入分区与输出分区一对一 | mapValues |
对单个RDD | combineByKey、reduceByKey、partitionBy |
两个RDD聚集 | Cogroup |
连接 | join、leftOutJoin、rightOutJoin |
1.3 Action算子
| 类型 | 算子 |
---|
无输出 | foreach |
HDFS | saveAsTextFile、saveAsObjectFile |
Scala集合和数据类型 | collect、collectAsMap、reduceByKeyLocally、lookup、count、top、reduce、fold、aggregate |
2、Transformation算子详解2.1 map说明:将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素
Scala示例:
def map() {
val conf = new SparkConf().setAppName(ScalaOperatorDemo.getClass.getSimpleName).setMaster("local")
val sc = new SparkContext(conf)
val datas: Array[String] = Array(
"{'id':1,'name':'xl1','pwd':'xl123','sex':2}",
"{'id':2,'name':'xl2','pwd':'xl123','sex':1}",
"{'id':3,'name':'xl3','pwd':'xl123','sex':2}")
sc.parallelize(datas)
.map(v => {
new Gson().fromJson(v, classOf[User])
})
.foreach(user => {
println("id: " + user.id
+ " name: " + user.name
+ " pwd: " + user.pwd
+ " sex:" + user.sex)
})
}
2.2 filter说明:对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD中保留,返回为false的将过滤掉
Scala示例:
def filter() {
val conf = new SparkConf().setAppName(ScalaOperatorDemo.getClass.getSimpleName).setMaster("local")
val sc = new SparkContext(conf)
val datas = Array(1, 2, 3, 7, 4, 5, 8)
sc.parallelize(datas)
.filter(v => v >= 3)
.foreach(println)
}
2.3 flatMap说明:与map类似,但每个输入的RDD成员可以产生0或多个输出成员
Scala示例:
sc.parallelize(datas)
.flatMap(line => line.split(","))
.foreach(println)
2.4 mapPartitions
说明:与map类似,但map中的func作用的是RDD中的每个元素,而mapPartitions中的func作用的对象是RDD的一整个分区,所以func的类型是Iterator
Scala示例:
sc.parallelize(datas, 3)
.mapPartitions(
n => {
val result = ArrayBuffer[String]()
while (n.hasNext) {
result.append(n.next())
}
result.iterator
}
)
.foreach(println)
2.5 mapPartitionsWithIndex
说明:与mapPartitions类似,但输入会多提供一个整数表示分区的编号,所以func的类型是(Int, Iterator)
Scala示例:
sc.parallelize(datas, 3)
.mapPartitionsWithIndex(
(m, n) => {
val result = ArrayBuffer[String]()
while (n.hasNext) {
result.append("分区索引:" + m + "\t" + n.next())
}
result.iterator
}
)
.foreach(println)
2.6 sample
说明:对RDD进行抽样,其中参数withReplacement为true时表示抽样之后还放回,可以被多次抽样,false表示不放回;fraction表示抽样比例;seed为随机数种子,比如当前时间戳
Scala示例:
sc.parallelize(datas)
.sample(withReplacement = false, 0.5, System.currentTimeMillis)
.foreach(println)
2.7 union
说明:合并两个RDD,不去重,要求两个RDD中的元素类型一致
Scala示例:
// sc.parallelize(datas1)
// .union(sc.parallelize(datas2))
// .foreach(println)
// 或
(sc.parallelize(datas1) ++ sc.parallelize(datas2))
.foreach(println)
2.8 intersection
说明:返回两个RDD的交集
Scala示例:
sc.parallelize(datas1)
.intersection(sc.parallelize(datas2))
.foreach(println)
2.9 distinct
说明:对原RDD进行去重操作,返回RDD中没有重复的成员
Scala示例:
sc.parallelize(datas)
.distinct()
.foreach(println)
2.10 groupByKey
说明:对
<key, value>
结构的RDD进行类似RMDB的group by聚合操作,具有相同key的RDD成员的value会被聚合在一起,返回的RDD的结构是
(key,
Iterator<value>)
Scala示例:
def groupBy(sc: SparkContext): Unit = {
sc.parallelize(1 to 9, 3)
.groupBy(x => {
if (x % 2 == 0) "偶数"
else "奇数"
})
.collect()
.foreach(println)
val datas2 = Array("dog", "tiger", "lion", "cat", "spider", "eagle")
sc.parallelize(datas2)
.keyBy(_.length)
.groupByKey()
.collect()
.foreach(println)
}
// 结果
(奇数,[1, 3, 5, 7, 9])(偶数,[2, 4, 6, 8])
(4,[lion])(6,[spider])(3,[dog, cat])(5,[tiger, eagle])
2.11 reduceByKey
说明:对
<key, value>
结构的RDD进行聚合,对具有相同key的value调用func来进行reduce操作,func的类型必须是
(V,
V) => V
Scala示例:
val textFile = sc.textFile("file:///home/zkpk/spark-2.0.1/README.md")
val words = textFile.flatMap(line => line.split(" "))
val wordPairs = words.map(word => (word, 1))
val wordCounts = wordPairs.reduceByKey((a, b) => a + b)
println("wordCounts: ")
wordCounts.collect().foreach(println)
// 结果
package 1
For 3
Programs 1
(略)
2.12 aggregateByKey
说明:aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值得类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接返回非RDD的结果。参数:zeroValue:表示在每个分区中第一次拿到key值时,用于创建一个返回类型的函数,这个函数最终会被包装成先生成一个返回类型,然后通过调用seqOp函数,把第一个key对应的value添加到这个类型U的变量中。seqOp:这个用于把迭代分区中key对应的值添加到zeroValue创建的U类型实例中。combOp:这个用于合并每个分区中聚合过来的两个U类型的值。
Scala示例:
def aggregateByKey(sc: SparkContext): Unit = {// 合并在同一个partition中的值,a的数据类型为zeroValue的数据类型,b的数据类型为原value的数据类型def seq(a:Int, b:Int): Int = {println("seq: " + a + "\t" + b)math.max(a, b)}// 合并在不同partition中的值,a,b的数据类型为zeroValue的数据类型def comb(a:Int, b:Int): Int = {println("comb: " + a + "\t" + b)a + b}// 数据拆分成两个分区// 分区一数据: (1,3) (1,2)// 分区二数据: (1,4) (2,3)// zeroValue 中立值,定义返回value的类型,并参与运算// seqOp 用来在一个partition中合并值的// 分区一相同key的数据进行合并// seq: 0 3 (1,3)开始和中位值合并为3// seq: 3 2 (1,2)再次合并为3// 分区二相同key的数据进行合并// seq: 0 4 (1,4)开始和中位值合并为4// seq: 0 3 (2,3)开始和中位值合并为3// comb 用来在不同partition中合并值的// 将两个分区的结果进行合并// key为1的, 两个分区都有, 合并为(1,7)// key为2的, 只有一个分区有, 不需要合并(2,3)sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3)), 2).aggregateByKey(0)(seq, comb).collect().foreach(println)}// 结果(2,3)(1,7)
2.13 sortByKey
说明:对
<key, value>
结构的RDD进行升序或降序排列参数:comp:排序时的比较运算方式。ascending:false降序;true升序。
Scala示例:
def sortByKey(sc: SparkContext) : Unit = {sc.parallelize(Array(60, 70, 80, 55, 45, 75)).sortBy(v => v, false).foreach(println)sc.parallelize(List((3, 3), (2, 2), (1, 4), (2, 3))).sortByKey(true).foreach(println)}
// 结果
80
75
70
60
55
45
(3,3)
(2,2)
(2,3)
(1,4)
2.14 join
说明:对
<K, V>
和
<K, W>
进行join操作,返回
(K,(V, W))
外连接函数为leftOuterJoin、rightOuterJoin和fullOuterJoin
Scala示例:
sc.parallelize(List((1, "苹果"), (2, "梨"), (3, "香蕉"), (4, "石榴"))).join(sc.parallelize(List((1, 7), (2, 3), (3, 8), (4, 3), (5, 9)))).foreach(println)
// 结果(4,(石榴,3))(1,(苹果,7))(3,(香蕉,8))(2,(梨,3))
2.15 cogroup
说明:cogroup:对多个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合,与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。
Scala示例:
def cogroup(sc: SparkContext): Unit = {val datas1 = List((1, "苹果"),(2, "梨"),(3, "香蕉"),(4, "石榴"))val datas2 = List((1, 7),(2, 3),(3, 8),(4, 3))val datas3 = List((1, "7"),(2, "3"),(3, "8"),(4, "3"),(4, "4"),(4, "5"),(4, "6"))sc.parallelize(datas1).cogroup(sc.parallelize(datas2),sc.parallelize(datas3)).foreach(println)}// 结果(4,(CompactBuffer(石榴),CompactBuffer(3),CompactBuffer(3, 4, 5, 6)))(1,(CompactBuffer(苹果),CompactBuffer(7),CompactBuffer(7)))(3,(CompactBuffer(香蕉),CompactBuffer(8),CompactBuffer(8)))(2,(CompactBuffer(梨),CompactBuffer(3),CompactBuffer(3)))
2.16 cartesian
说明:两个RDD进行笛卡尔积合并
Scala示例:
namesRDD.cartesian(scoreRDD).foreach(println)
// 结果
张三 60
张三 70
张三 80
李四 60
李四 70
李四 80
王五 60
王五 70
王五 80
2.17 pipe
说明:执行cmd命令,创建RDD
Scala示例:
echo.sh内容
#!/bin/bashecho "Running shell script"RESULT=""while read LINE; doRESULT=${RESULT}" "${LINE}doneecho ${RESULT} > /Users/zhangws/out123.txt
12345678
12345678测试代码
def pipe(sc: SparkContext): Unit = {val data = List("hi", "hello", "how", "are", "you")sc.makeRDD(data).pipe("/Users/zhangws/echo.sh").collect().foreach(println)}
12345671234567结果
# out123.txthi hello how are you# 输出Running shell script
2.18 coalesce
说明:用于将RDD进行重分区,使用HashPartitioner,且该RDD的分区个数等于numPartitions个数,如果shuffle设置为true,则会进行shuffle。
Scala示例:
def coalesce(sc: SparkContext): Unit = {val datas = List("hi", "hello", "how", "are", "you")val datasRDD = sc.parallelize(datas, 4)println("RDD的分区数: " + datasRDD.partitions.length)val datasRDD2 = datasRDD.coalesce(2)println("RDD的分区数: " + datasRDD2.partitions.length)}
// 结果
RDD的分区数: 4
RDD的分区数: 2
2.19 repartition
说明:该函数其实就是coalesce函数第二个参数为true的实现
2.20 repartitionAndSortWithinPartitions
说明:根据给定的Partitioner重新分区,并且每个分区内根据comp实现排序。
Scala示例
def repartitionAndSortWithinPartitions(sc: SparkContext): Unit = {def partitionFunc(key:String): Int = {key.substring(7).toInt}val datas = new Array[String](1000)val random = new Random(1)for (i <- 0 until 10; j <- 0 until 100) {val index: Int = i * 100 + jdatas(index) = "product" + random.nextInt(10) + ",url" + random.nextInt(100)}val datasRDD = sc.parallelize(datas)val pairRDD = datasRDD.map(line => (line, 1)).reduceByKey((a, b) => a + b)// .foreach(println)pairRDD.repartitionAndSortWithinPartitions(new Partitioner() {override def numPartitions: Int = 10override def getPartition(key: Any): Int = {val str = String.valueOf(key)str.substring(7, str.indexOf(',')).toInt}}).foreach(println)}
// 结果
(product00,url099)
(product00,url006)
(product00,url088)
略
(product09,url004)
(product09,url021)
(product09,url036)
3. Action
3.1 reduce
说明:对RDD成员使用func进行reduce操作,func接受两个参数,合并之后只返回一个值。reduce操作的返回结果只有一个值。需要注意的是,func会并发执行
Scala示例:
def reduce(sc: SparkContext): Unit = {println(sc.parallelize(1 to 10).reduce((x, y) => x + y))}// 结果55
3.2 collect
说明:将RDD读取至Driver程序,类型是Array,一般要求RDD不要太大。
3.3 count
说明:返回RDD的成员数量
Scala示例:
def count(sc: SparkContext): Unit = {println(sc.parallelize(1 to 10).count)}// 结果10
3.4 first
说明:返回RDD的第一个成员,等价于take(1)
Scala示例:
def first(sc: SparkContext): Unit = {println(sc.parallelize(1 to 10).first())}// 结果1
3.5 take
说明:返回RDD前n个成员
Scala示例:
def take(sc: SparkContext): Unit = {sc.parallelize(1 to 10).take(2).foreach(println)}// 结果12
3.6 takeSample
说明:和sample用法相同,只不第二个参数换成了个数。返回也不是RDD,而是collect。
Scala示例:
def takeSample(sc: SparkContext): Unit = {sc.parallelize(1 to 10).takeSample(withReplacement = false, 3, 1).foreach(println)}// 结果1810
3.7 takeOrdered
说明:用于从RDD中,按照默认(升序)或指定排序规则,返回前num个元素。
Scala示例:
def takeOrdered(sc: SparkContext): Unit = {sc.parallelize(Array(5,6,2,1,7,8)).takeOrdered(3)(new Ordering[Int](){override def compare(x: Int, y: Int): Int = y.compareTo(x)}).foreach(println)}// 结果876
3.8 saveAsTextFile
说明:将RDD转换为文本内容并保存至路径path下,可能有多个文件(和partition数有关)。路径path可以是本地路径或HDFS地址,转换方法是对RDD成员调用toString函数
Scala示例:
def saveAsTextFile(sc: SparkContext): Unit = {sc.parallelize(Array(5,6,2,1,7,8)).saveAsTextFile("/Users/zhangws/Documents/test")}// 结果/Users/zhangws/Documents/test目录下_SUCCESSpart-00000// part-00000文件内容562178
3.9 saveAsSequenceFile
说明:与saveAsTextFile类似,但以SequenceFile格式保存,成员类型必须实现Writeable接口或可以被隐式转换为Writable类型(比如基本Scala类型Int、String等)
3.10 saveAsObjectFile
说明:用于将RDD中的元素序列化成对象,存储到文件中。对于HDFS,默认采用SequenceFile保存。
3.11 countByKey
说明:仅适用于(K, V)类型,对key计数,返回(K, Int)
Scala示例:
def reduce(sc: SparkContext): Unit = {println(sc.parallelize(Array(("A", 1), ("B", 6), ("A", 2), ("C", 1), ("A", 7), ("A", 8))).countByKey())}// 结果Map(B -> 1, A -> 4, C -> 1)
3.12 foreach
说明:对RDD中的每个成员执行func,没有返回值,常用于更新计数器或输出数据至外部存储系统。这里需要注意变量的作用域