[云星数据 - Apache Flink in Practice Series (Premium Edition)]: Apache Flink Batch API Explained with Hands-On Programming 010 - Practical DataSet API Explained 010
2017-11-16 09:57
DataSet API Explained, Part 10
leftOuterJoin
def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
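An outer join on the left side: every element of the left DataSet is kept, and when no element of the right DataSet has a matching key, the join function receives null for the right element.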
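To make the semantics concrete before the Flink examples, here is a minimal sketch of a left outer join over plain in-memory sequences. It does not use Flink at all: `leftOuterJoinLocal` is a hypothetical helper written for illustration, and it models the missing right element with `Option` rather than the `null` that Flink passes to the join function.

```scala
// Hypothetical helper, not part of Flink: left outer join over in-memory sequences.
def leftOuterJoinLocal[L, R, K, O](left: Seq[L], right: Seq[R])
                                  (leftKey: L => K, rightKey: R => K)
                                  (fn: (L, Option[R]) => O): Seq[O] = {
  // Group the right side by key so each left element can look up its matches.
  val rightByKey: Map[K, Seq[R]] = right.groupBy(rightKey)
  left.flatMap { l =>
    rightByKey.get(leftKey(l)) match {
      case Some(matches) => matches.map(r => fn(l, Some(r))) // one output per match
      case None          => Seq(fn(l, None))                 // unmatched left element is kept
    }
  }
}

// Small illustrative inputs: "water" has no rating, "tiger" has no movie.
val movies  = Seq(("moon", "ok"), ("water", "nice"))
val ratings = Seq(("moon", 3), ("tiger", 5))

val joined = leftOuterJoinLocal(movies, ratings)(_._1, _._1) {
  (m, r) => (m._1, r.map(_._2).getOrElse(-1))
}
// joined == Seq(("moon", 3), ("water", -1))
```

The unmatched left element `water` survives with the default value, while the unmatched right element `tiger` is dropped, which is the behaviour the Flink examples in this section rely on.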
[title3]leftOuterJoin Example 1[/title3]
Program:
// 1. Define the case class
case class Rating(name: String, category: String, points: Int)
// 2. Define a DataSet[Rating]
val ratings: DataSet[Rating] = benv.fromElements(
  Rating("moon","youny1",3),Rating("sun","youny2",4),
  Rating("cat","youny3",1),Rating("dog","youny4",5),Rating("tiger","youny4",5))
// 3. Define a DataSet[(String, String)]
val movies: DataSet[(String, String)] = benv.fromElements(
  ("moon","ok"),("dog","good"),
  ("cat","notbad"),("sun","nice"),("water","nice"))
// 4. Left outer join the two DataSets and supply the join function;
//    r is null when a movie has no matching rating
val result1 = movies.leftOuterJoin(ratings).where(0).equalTo("name"){
  (m, r) => (m._1, if (r == null) -1 else r.points)
}
// 5. Show the result
result1.collect
Result:
res26: Seq[(String, Int)] = Buffer((moon,3), (dog,5), (cat,1), (sun,4), (water,-1))
How the job appears in the web UI:
![](https://oscdn.geek-share.com/Uploads/Images/Content/201711/ede8af3481f44010a7042a5e82f28efa)
[title3]leftOuterJoin Example 2[/title3]
Program:
// 1. Define the case class
case class Rating(name: String, category: String, points: Int)
// 2. Define a DataSet[Rating]
val ratings: DataSet[Rating] = benv.fromElements(
  Rating("moon","youny1",3),Rating("sun","youny2",4),
  Rating("cat","youny3",1),Rating("dog","youny4",5),Rating("tiger","youny4",5))
// 3. Define a DataSet[(String, String)]
val movies: DataSet[(String, String)] = benv.fromElements(
  ("moon","ok"),("dog","good"),
  ("cat","notbad"),("sun","nice"),("water","nice"))
// 4. Left outer join the two DataSets, passing a join hint and supplying the join function
val result1 = movies.leftOuterJoin(ratings, JoinHint.REPARTITION_SORT_MERGE)
  .where(0).equalTo("name"){
    (m, r) => (m._1, if (r == null) -1 else r.points)
  }
// 5. Show the result
result1.collect
Result:
res26: Seq[(String, Int)] = Buffer((cat,1), (dog,5), (moon,3), (sun,4), (water,-1))
Join hint notes:
A left outer join supports the following join hints:
JoinHint.OPTIMIZER_CHOOSES
JoinHint.BROADCAST_HASH_SECOND
JoinHint.REPARTITION_HASH_SECOND
JoinHint.REPARTITION_SORT_MERGE
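The examples above run in the Flink Scala shell, where `benv` and the API imports are already in place. As a hedged sketch of how the same join with one of the listed hints might look in a standalone program, the following assumes the Flink Scala DataSet API is on the classpath. The object name `LeftOuterJoinHintSketch` and its `run` method are hypothetical names chosen for illustration, and `JoinHint.BROADCAST_HASH_SECOND` is used only as an example of a hint from the list.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

case class Rating(name: String, category: String, points: Int)

// Hypothetical standalone wrapper around the join from the examples above.
object LeftOuterJoinHintSketch {
  def run(): Seq[(String, Int)] = {
    // Outside the Scala shell there is no predefined benv, so create the environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: DataSet[Rating] = env.fromElements(
      Rating("moon", "youny1", 3), Rating("sun", "youny2", 4),
      Rating("cat", "youny3", 1), Rating("dog", "youny4", 5), Rating("tiger", "youny4", 5))

    val movies: DataSet[(String, String)] = env.fromElements(
      ("moon", "ok"), ("dog", "good"),
      ("cat", "notbad"), ("sun", "nice"), ("water", "nice"))

    // Ask the optimizer to broadcast the second input (ratings) and build a hash table from it.
    movies
      .leftOuterJoin(ratings, JoinHint.BROADCAST_HASH_SECOND)
      .where(0).equalTo("name") {
        (m, r) => (m._1, if (r == null) -1 else r.points)
      }
      .collect()
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

The hint changes how the join is executed, not which pairs it produces, so the set of result tuples should match the two examples above even though their order may differ.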
rightOuterJoin
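def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
An outer join on the right side: every element of the right DataSet is kept, and when no element of the left DataSet has a matching key, the join function receives null for the left element.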
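One way to reason about a right outer join is as a left outer join with the two inputs swapped. The sketch below illustrates that relationship with plain Scala collections. `leftOuter` and `rightOuter` are hypothetical helpers, not Flink API, and they use `Option` in place of the `null` that Flink hands to the join function.

```scala
// Hypothetical helpers over in-memory sequences; not Flink API.
def leftOuter[L, R, K](left: Seq[L], right: Seq[R])
                      (leftKey: L => K, rightKey: R => K): Seq[(L, Option[R])] = {
  val rightByKey = right.groupBy(rightKey)
  left.flatMap { l =>
    rightByKey.get(leftKey(l)) match {
      case Some(rs) => rs.map(r => (l, Option(r)))
      case None     => Seq((l, Option.empty[R])) // keep the unmatched left element
    }
  }
}

// A right outer join is a left outer join with the inputs swapped,
// followed by swapping each result pair back.
def rightOuter[L, R, K](left: Seq[L], right: Seq[R])
                       (leftKey: L => K, rightKey: R => K): Seq[(Option[L], R)] =
  leftOuter(right, left)(rightKey, leftKey).map { case (r, l) => (l, r) }

// Small illustrative inputs: "water" has no rating, "tiger" has no movie.
val movies  = Seq(("moon", "ok"), ("water", "nice"))
val ratings = Seq(("moon", 3), ("tiger", 5))

val names = rightOuter(movies, ratings)(_._1, _._1).map {
  case (m, r) => (r._1, m.map(_._2).getOrElse("n/a"))
}
// names == Seq(("moon", "ok"), ("tiger", "n/a"))
```

Here the unmatched rating `tiger` is kept and the unmatched movie `water` is dropped, the mirror image of the left outer join.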
[title3]rightOuterJoin Example 1[/title3]
Program:
// 1. Define a DataSet[(String, String)]
val movies: DataSet[(String, String)] = benv.fromElements(
  ("moon","ok"),("dog","good"),
  ("cat","notbad"),("sun","nice"))
// 2. Define a DataSet[Rating]
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Rating] = benv.fromElements(
  Rating("moon","youny1",3),Rating("sun","youny2",4),
  Rating("cat","youny3",1),Rating("dog","youny4",5))
// 3. Right outer join the two DataSets and supply the join function.
//    In a right outer join it is the left element m that is null when a rating has no matching movie.
val result1 = movies.rightOuterJoin(ratings).where(0).equalTo("name"){
  (m, r) => (if (m == null) r.name else m._1, r.points)
}
// 4. Show the result
result1.collect
Result:
res33: Seq[(String, Int)] = Buffer((moon,3), (sun,4), (cat,1), (dog,5))
[title3]rightOuterJoin Example 2[/title3]
Program:
// 1. Define a DataSet[(String, String)]
val movies: DataSet[(String, String)] = benv.fromElements(
  ("moon","ok"),("dog","good"),
  ("cat","notbad"),("sun","nice"))
// 2. Define a DataSet[Rating]
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Rating] = benv.fromElements(
  Rating("moon","youny1",3),Rating("sun","youny2",4),
  Rating("cat","youny3",1),Rating("dog","youny4",5))
// 3. Right outer join the two DataSets, passing a join hint and supplying the join function.
//    In a right outer join it is the left element m that is null when a rating has no matching movie.
val result1 = movies.rightOuterJoin(ratings, JoinHint.BROADCAST_HASH_FIRST)
  .where(0).equalTo("name"){
    (m, r) => (if (m == null) r.name else m._1, r.points)
  }
// 4. Show the result
result1.collect
Result:
res34: Seq[(String, Int)] = Buffer((moon,3), (sun,4), (cat,1), (dog,5))
Join hint notes:
A right outer join supports the following join hints:
JoinHint.OPTIMIZER_CHOOSES
JoinHint.BROADCAST_HASH_FIRST
JoinHint.REPARTITION_HASH_FIRST
JoinHint.REPARTITION_SORT_MERGE
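In both right outer join examples every rating has a matching movie, so the unmatched case never occurs. The following sketch, which assumes the Flink Scala DataSet API is on the classpath, adds a rating without a movie to show the left element arriving as null. The object name `RightOuterJoinSketch`, its `run` method, and the `"no review"` placeholder are hypothetical choices for illustration, and `JoinHint.REPARTITION_HASH_FIRST` is simply one of the hints listed above.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

case class Rating(name: String, category: String, points: Int)

// Hypothetical standalone program; in the Scala shell, benv would replace env.
object RightOuterJoinSketch {
  def run(): Seq[(String, String)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val movies: DataSet[(String, String)] = env.fromElements(
      ("moon", "ok"), ("dog", "good"))

    // "tiger" has a rating but no entry in movies.
    val ratings: DataSet[Rating] = env.fromElements(
      Rating("moon", "youny1", 3), Rating("dog", "youny4", 5), Rating("tiger", "youny4", 5))

    movies
      .rightOuterJoin(ratings, JoinHint.REPARTITION_HASH_FIRST)
      .where(0).equalTo("name") {
        // r is never null in a right outer join; m is null for unmatched ratings.
        (m, r) => (r.name, if (m == null) "no review" else m._2)
      }
      .collect()
  }

  def main(args: Array[String]): Unit =
    run().foreach(println)
}
```

Taking the key from `r` rather than `m` keeps the join function safe, because the right element is the one guaranteed to be present.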