您的位置:首页 > 运维架构 > Apache

【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战010--DateSet实用API详解010

2017-11-16 09:57 981 查看

DateSet的API详解十

leftOuterJoin<
4000
/h2>

def leftOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

An outer join on the left side.

左外连接。


[title3]leftOuterJoin示例一[/title3]

执行程序:

//1.定义case class
case class Rating(name: String, category: String, points: Int)

//2.定义 DataSet[Rating]
val ratings: DataSet[Rating] = benv.fromElements(
Rating("moon","youny1",3),Rating("sun","youny2",4),
Rating("cat","youny3",1),Rating("dog","youny4",5),Rating("tiger","youny4",5))

//3.定义DataSet[(String, String)]
val movies: DataSet[(String, String)]  = benv.fromElements(
("moon","ok"),("dog","good"),
("cat","notbad"),("sun","nice"),("water","nice"))

//4.两个dataset进行左外连接,指定方法
val result1 = movies.leftOuterJoin(ratings).where(0).equalTo("name"){
(m, r) => (m._1, if (r == null) -1 else r.points)
}

//5.显示结果
result1.collect


执行结果:

res26: Seq[(String, Int)] = Buffer((moon,3), (dog,5), (cat,1), (sun,4), (water,-1))


web ui中的执行效果:



[title3]leftOuterJoin示例二[/title3]

执行程序:

//1.定义case class
case class Rating(name: String, category: String, points: Int)

//2.定义 DataSet[Rating]
val ratings: DataSet[Rating] = benv.fromElements(
Rating("moon","youny1",3),Rating("sun","youny2",4),
Rating("cat","youny3",1),Rating("dog","youny4",5),Rating("tiger","youny4",5))

//3.定义DataSet[(String, String)]
val movies: DataSet[(String, String)]  = benv.fromElements(
("moon","ok"),("dog","good"),
("cat","notbad"),("sun","nice"),("water","nice"))

//4.两个dataset进行左外连接,指定连接暗示,并指定连接方法
val result1 = movies.leftOuterJoin(ratings, JoinHint.REPARTITION_SORT_MERGE)
.where(0).equalTo("name"){
(m, r) => (m._1, if (r == null) -1 else r.points)
}

//5.显示结果
result1 .collect


执行结果:

res26: Seq[(String, Int)] = Buffer((cat,1), (dog,5), (moon,3), (sun,4), (water,-1))


暗示项目说明:

左外连接支持以下项目:
JoinHint.OPTIMIZER_CHOOSES
JoinHint.BROADCAST_HASH_SECOND
JoinHint.REPARTITION_HASH_SECOND
JoinHint.REPARTITION_SORT_MERGE


rightOuterJoin

def rightOuterJoin[O](other: DataSet[O], strategy: JoinHint): UnfinishedOuterJoinOperation[T, O]
def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]

An outer join on the right side.

右外连接


rightOuterJoin示例一

执行程序:

//1.定义DataSet[(String, String)]
val movies: DataSet[(String, String)]  = benv.fromElements(
("moon","ok"),("dog","good"),
("cat","notbad"),("sun","nice"))

//2.定义 DataSet[Rating]
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Rating] = benv.fromElements(
Rating("moon","youny1",3),Rating("sun","youny2",4),
Rating("cat","youny3",1),Rating("dog","youny4",5))

//3.两个dataset进行左外连接,指定连接方法
val result1 = movies.rightOuterJoin(ratings).where(0).equalTo("name"){
(m, r) => (m._1, if (r == null) -1 else r.points)
}

//5.显示结果
result1.collect


执行结果:

res33: Seq[(String, Int)] = Buffer((moon,3), (sun,4), (cat,1), (dog,5))


web ui中的执行效果:



rightOuterJoin示例二

执行程序:

//1.定义DataSet[(String, String)]
val movies: DataSet[(String, String)]  = benv.fromElements(
("moon","ok"),("dog","good"),
("cat","notbad"),("sun","nice"))

//2.定义 DataSet[Rating]
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Rating] = benv.fromElements(
Rating("moon","youny1",3),Rating("sun","youny2",4),
Rating("cat","youny3",1),Rating("dog","youny4",5))

//3.两个dataset进行左外连接,暗示连接方式,指定连接方法
val result1 = movies.rightOuterJoin(ratings,JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo("name"){
(m, r) => (m._1, if (r == null) -1 else r.points)
}

//5.显示结果
result1.collect


执行结果:

res34: Seq[(String, Int)] = Buffer((moon,3), (sun,4), (cat,1), (dog,5))


暗示项目说明:

左外连接支持以下项目:
JoinHint.OPTIMIZER_CHOOSES
JoinHint.BROADCAST_HASH_FIRST
JoinHint.REPARTITION_HASH_FIRST
JoinHint.REPARTITION_SORT_MERGE
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐