Spark Dataset Operations (5): Multi-Table Operations with join
2017-07-21 06:49
Let's skip the chatter and go straight to the code. First, the definitions of the two source tables:
scala> val df1 = spark.createDataset(Seq(("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5), ("bbb", 4, 6))).toDF("key1", "key2", "key3")
df1: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> val df2 = spark.createDataset(Seq(("aaa", 2, 2), ("bbb", 3, 5), ("ddd", 3, 5), ("bbb", 4, 6), ("eee", 1, 2), ("aaa", 1, 5), ("fff", 5, 6))).toDF("key1", "key2", "key4")
df2: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df1.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)

scala> df2.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key4: integer (nullable = false)

scala> df1.show()
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+

scala> df2.show()
+----+----+----+
|key1|key2|key4|
+----+----+----+
| aaa|   2|   2|
| bbb|   3|   5|
| ddd|   3|   5|
| bbb|   4|   6|
| eee|   1|   2|
| aaa|   1|   5|
| fff|   5|   6|
+----+----+----+
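The snippets in this post are run in spark-shell, where the SparkSession named spark (and its implicits) is already in scope. In a standalone application you would set that up yourself; a minimal sketch, with the object and application names chosen here purely for illustration:

import org.apache.spark.sql.SparkSession

// Hypothetical standalone equivalent of the shell session above.
object JoinExamples {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dataset-join-examples").getOrCreate()
    import spark.implicits._  // supplies the encoders that createDataset/toDF rely on

    val df1 = spark.createDataset(Seq(("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5), ("bbb", 4, 6)))
      .toDF("key1", "key2", "key3")
    val df2 = spark.createDataset(Seq(("aaa", 2, 2), ("bbb", 3, 5), ("ddd", 3, 5), ("bbb", 4, 6),
        ("eee", 1, 2), ("aaa", 1, 5), ("fff", 5, 6)))
      .toDF("key1", "key2", "key4")

    df1.show()
    df2.show()
    spark.stop()
  }
}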
Spark's join support is rich: equi-joins, conditional (non-equi) joins, and natural joins are all supported. The available join types include inner join, outer join, left outer join, right outer join, left semi join, and Cartesian (cross) join.
The examples below walk through each of them, starting with the inner join.
/*
 * Inner join:
 * select * from df1 join df2 on df1.key1=df2.key1
 */
scala> val df3 = df1.join(df2, "key1")
df3: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 3 more fields]

scala> df3.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key2: integer (nullable = false)
 |-- key4: integer (nullable = false)

scala> df3.show
+----+----+----+----+----+
|key1|key2|key3|key2|key4|
+----+----+----+----+----+
| aaa|   1|   2|   1|   5|
| aaa|   1|   2|   2|   2|
| bbb|   3|   4|   4|   6|
| bbb|   3|   4|   3|   5|
| bbb|   4|   6|   4|   6|
| bbb|   4|   6|   3|   5|
+----+----+----+----+----+

/*
 * Still an inner join, this time with joinWith. The difference from join is that
 * the resulting Dataset has a different schema; compare it with the result above.
 */
scala> val df4 = df1.joinWith(df2, df1("key1") === df2("key1"))
df4: org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, org.apache.spark.sql.Row)] = [_1: struct<key1: string, key2: int ... 1 more field>, _2: struct<key1: string, key2: int ... 1 more field>]

scala> df4.printSchema
root
 |-- _1: struct (nullable = false)
 |    |-- key1: string (nullable = true)
 |    |-- key2: integer (nullable = false)
 |    |-- key3: integer (nullable = false)
 |-- _2: struct (nullable = false)
 |    |-- key1: string (nullable = true)
 |    |-- key2: integer (nullable = false)
 |    |-- key4: integer (nullable = false)

scala> df4.show
+---------+---------+
|       _1|       _2|
+---------+---------+
|[aaa,1,2]|[aaa,1,5]|
|[aaa,1,2]|[aaa,2,2]|
|[bbb,3,4]|[bbb,4,6]|
|[bbb,3,4]|[bbb,3,5]|
|[bbb,4,6]|[bbb,4,6]|
|[bbb,4,6]|[bbb,3,5]|
+---------+---------+
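Note the two key2 columns in df3's schema: selecting key2 by name from df3 would fail with an ambiguous-reference error. One way around it, sketched below, is to keep using the column references of the original DataFrames and rename one of them (the alias df2_key2 is just an illustrative name):

// Disambiguate the duplicated key2 by selecting through the original DataFrames' columns.
val joined = df1.join(df2, df1("key1") === df2("key1"))
  .select(df1("key1"), df1("key2"), df1("key3"),
          df2("key2").as("df2_key2"), df2("key4"))
joined.show()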
Next, the (full) outer join:
/*
 * select * from df1 full outer join df2 on df1.key1=df2.key1
 */
scala> val df5 = df1.join(df2, df1("key1") === df2("key1"), "outer")
df5: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df5.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
|null|null|null| ddd|   3|   5|
| ccc|   3|   5|null|null|null|
| aaa|   1|   2| aaa|   2|   2|
| aaa|   1|   2| aaa|   1|   5|
| bbb|   3|   4| bbb|   3|   5|
| bbb|   3|   4| bbb|   4|   6|
| bbb|   4|   6| bbb|   3|   5|
| bbb|   4|   6| bbb|   4|   6|
|null|null|null| fff|   5|   6|
|null|null|null| eee|   1|   2|
+----+----+----+----+----+----+
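The join-type string "outer" used above requests a full outer join; as far as I know, "full" and "full_outer" are accepted as synonyms, so the following sketch should return the same rows as df5:

// "full_outer" (and "full") should behave exactly like "outer".
val df5b = df1.join(df2, df1("key1") === df2("key1"), "full_outer")
df5b.show()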
Below are the left outer join, right outer join, and left semi join:
/* Left outer join */
scala> val df6 = df1.join(df2, df1("key1") === df2("key1"), "left_outer")
df6: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df6.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   1|   5|
| aaa|   1|   2| aaa|   2|   2|
| bbb|   3|   4| bbb|   4|   6|
| bbb|   3|   4| bbb|   3|   5|
| ccc|   3|   5|null|null|null|
| bbb|   4|   6| bbb|   4|   6|
| bbb|   4|   6| bbb|   3|   5|
+----+----+----+----+----+----+

/* Right outer join */
scala> val df7 = df1.join(df2, df1("key1") === df2("key1"), "right_outer")
df7: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df7.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   2|   2|
| bbb|   4|   6| bbb|   3|   5|
| bbb|   3|   4| bbb|   3|   5|
|null|null|null| ddd|   3|   5|
| bbb|   4|   6| bbb|   4|   6|
| bbb|   3|   4| bbb|   4|   6|
|null|null|null| eee|   1|   2|
| aaa|   1|   2| aaa|   1|   5|
|null|null|null| fff|   5|   6|
+----+----+----+----+----+----+

/* Left semi join */
scala> val df8 = df1.join(df2, df1("key1") === df2("key1"), "leftsemi")
df8: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df8.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| bbb|   4|   6|
+----+----+----+
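A common follow-up to the left outer join is extracting the rows of df1 that have no match in df2 (the ccc row above, where the right-hand side is all null). A minimal sketch building on df6:

// Keep only the unmatched rows of df1: the right side of the left outer join is null.
val unmatched = df6.filter(df2("key1").isNull)
  .select(df1("key1"), df1("key2"), df1("key3"))
unmatched.show()  // only the ccc row is expected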
The Cartesian (cross) join is rarely used in practice; the tables people process with Spark tend to be large, and a full cross product is far too expensive.
/* Cartesian (cross) join */
scala> val df9 = df1.crossJoin(df2)
df9: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df9.count
res17: Long = 28

/* Only show the first 10 rows */
scala> df9.show(10)
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   2|   2|
| aaa|   1|   2| bbb|   3|   5|
| aaa|   1|   2| ddd|   3|   5|
| aaa|   1|   2| bbb|   4|   6|
| aaa|   1|   2| eee|   1|   2|
| aaa|   1|   2| aaa|   1|   5|
| aaa|   1|   2| fff|   5|   6|
| bbb|   3|   4| aaa|   2|   2|
| bbb|   3|   4| bbb|   3|   5|
| bbb|   3|   4| ddd|   3|   5|
+----+----+----+----+----+----+
only showing top 10 rows
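Depending on the Spark 2.x version, writing the Cartesian product implicitly as df1.join(df2) with no join condition may be rejected at analysis time unless cross joins are explicitly allowed; the crossJoin method used above sidesteps that. A sketch of the configuration switch, assuming your version enforces it:

// Allow implicit Cartesian products; only needed when not calling crossJoin explicitly,
// and whether it is required at all depends on the Spark version and its defaults.
spark.conf.set("spark.sql.crossJoin.enabled", "true")
val df9b = df1.join(df2)  // no condition, equivalent to df1.crossJoin(df2)
df9b.count                // 4 * 7 = 28 rows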
The next example is still an equi-join; the difference from the earlier one is that the duplicated join columns of the two tables are merged into one, just like a natural join:
/* Equi-join on the two common columns key1 and key2 */
scala> val df10 = df1.join(df2, Seq("key1", "key2"))
df10: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 2 more fields]

scala> df10.show
+----+----+----+----+
|key1|key2|key3|key4|
+----+----+----+----+
| aaa|   1|   2|   5|
| bbb|   3|   4|   5|
| bbb|   4|   6|   6|
+----+----+----+----+
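The Seq-based form also accepts a join-type argument, so the merged common columns can be combined with any of the join types shown earlier; a minimal sketch using a left outer join:

// Multi-column equi-join keeping every row of df1; key1 and key2 appear only once in the output.
val df10b = df1.join(df2, Seq("key1", "key2"), "left_outer")
df10b.show()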
Conditional joins did not seem to be supported in older Spark versions, but they work fine now:
/*
 * select df1.*, df2.* from df1 join df2 on df1.key1=df2.key1 and df1.key2>df2.key2
 */
scala> val df11 = df1.join(df2, df1("key1") === df2("key1") && df1("key2") > df2("key2"))
df11: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df11.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| bbb|   4|   6| bbb|   3|   5|
+----+----+----+----+----+----+
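The same conditional join can also be written as literal SQL by registering the two DataFrames as temporary views, matching the SQL in the comment above. A minimal sketch (the view names t1 and t2 are arbitrary):

// Register temporary views and express the join condition in plain SQL.
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")
val df11b = spark.sql(
  "select t1.*, t2.* from t1 join t2 on t1.key1 = t2.key1 and t1.key2 > t2.key2")
df11b.show()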