您的位置:首页 > 其它

“戏”说Spark-Spark核心-RDD转换操作算子详解(二)

2017-12-04 22:51 519 查看
“戏”说Spark-Spark核心-RDD转换操作算子详解(二)

算子概述

在“戏”说Spark-Spark核心-RDD转换操作算子详解(一)中,我们结合案例详细的讲解了一些比较常用的算子,现在我们继续了解Spark中提供的丰富的算子,我们讲解下面的这些算子

Distinct去重
foreachPartiton遍历
mapPartition格式转换(以partition为单位)
jion连接
union合并
结合案例讲解算子:

算子演示代码:

package spark.myspark.functions

import org.apache.spark.{SparkContext, SparkConf}

/**

* distinct:去重

* union:合并

* foreachPartition:遍历

* jion:内连接

* leftOuterJion:左外连接

* rightouterJion:右外连接

* fullouterJion:全外连接

*/

object otherTransformationAndAction_test {

def main(args: Array[String]) {

val conf= new SparkConf().setAppName("TransformationAndAction_test").setMaster("local")

val sc =new SparkContext(conf)

//=======================distinct========================================//

//去重distinct:源码就是:map+reduceByKey+map

//SparkContext.parallelize(Array,partitonNum=1):第二个参数默认是1,可以不用设置

val repeatRDD=sc.parallelize(Array("java","scala","js","java","html","js"))

val distinctRDD=repeatRDD.map((_,1)).reduceByKey(_+_).map(_._1)

distinctRDD.foreach(println)

repeatRDD.distinct().foreach(println)

//=======================foreachPartiton====================================//

//foreachPartiton和foreach()数据库连接对比案例代码

distinctRDD.foreach(x=>{

println("创建数据库连接")

println("拼接sql")

println("执行sql")

})

// foreachPartition(Iterator)实现:传入的参数是迭代器,将一个Partition的数据加载到内存中,然后再遍历

// foreach()遍历的基本单位是每一条记录,foreachPartition()遍历的单位是partition,类似的////还有map()和mapPartition()

distinctRDD.foreachPartition(x=>{

println("创建数据库连接")

while(x.hasNext){

println("拼接sql"+x.next())

}

println("执行sql")

})

//===========================jion=======================================//

//jion:内连接

val scoreRDD=sc.parallelize(List(

(1,100),

(2,90),

(3,84),

(4,69)

),2)

//本地集合获取得到RDD:

val subjectRDD=sc.makeRDD(Array(

// (1,"math"),

(2,"english"),

(3,"chinese"),

(4,"physics")

),3)

//测试jion后的分区数:jionedRDD的分区是由父RDD中最多的分区数来决定的

println("Jion后的partition="+subjectRDD.partitions.length)

//jion左边的为主表:主表的数据做为标准连接

val jionRDD=subjectRDD.join(scoreRDD)

jionRDD.foreach(

x=>{

val id=x._1

val subject=x._2._1

val score=x._2._2

println("id:"+id+" subject:"+subject+" score:"+score)

}

)

//leftOuterJoin左边的为主表:主表的数据做为标准连接,没连接上的为none

//rightOuterJoin右边的为主表:主表的数据做为标准连接,没连接上的为none

//fullouterJion全部连接,没连接上的为none

val leftOuterJoinRDD=subjectRDD.leftOuterJoin(scoreRDD)

leftOuterJoinRDD.foreach(

x=>{

val id=x._1

val subject=x._2._1

val score=x._2._2

println("id:"+id+" subject:"+subject+" score:"+score)

}

)

//===========================union=======================================//

//union:union后的分区数是union RDD的累加和

val unionRDD1=sc.parallelize(1 to 10,1)

val unionRDD2=sc.parallelize(1 until 20,2)

val unionRDD=unionRDD1.union(unionRDD2)

println("union后的分区数="+unionRDD.partitions.length)

unionRDD.foreach(println)

}

}

源码下载地址:链接:http://pan.baidu.com/s/1nuTS8WT
密码:9bf7

总结:

distinct:

distinct源码解析:

RDD类中:



jion:针对(K,V)的RDD的操作

union:注意:union关联的两个RDD必须类型一致

foreachPartition:foreachPartition()遍历的单位是partition,类似的

jionedRDD的分区是由父RDD中最多的分区数来决定的

如需形象的理解算子请参考博客:

图解算子:http://www.cnblogs.com/liuzhongfeng/p/5285613.html

思维导图构建你的知识架构:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: