【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战023--DateSet实用API详解023
2017-11-17 18:03
851 查看
Flink DateSet定制API详解(Scala版) -004
Join
join将两个DataSet按照一定的关联度进行类似SQL中的Join操作。
执行程序:
package code.book.batch.dataset.advance.api import org.apache.flink.api.scala.{ExecutionEnvironment, _} object JoinFunction001scala { def main(args: Array[String]): Unit = { // 1.设置运行环境,并创造测试数据 val env = ExecutionEnvironment.getExecutionEnvironment val authors = env.fromElements( Tuple3("A001", "zhangsan", "zhangsan@qq.com"), Tuple3("A001", "lisi", "lisi@qq.com"), Tuple3("A001", "wangwu", "wangwu@qq.com")) val posts = env.fromElements( Tuple2("P001", "zhangsan"), Tuple2("P002", "lisi"), Tuple2("P003", c64c "wangwu"), Tuple2("P004", "lisi")) // 2.scala中没有with方法来使用JoinFunction val text2 = authors.join(posts).where(1).equalTo(1) //3.显示结果 text2.print() } }
执行结果:
text2.print() ((A001,wangwu,wangwu@qq.com),(P003,wangwu)) ((A001,zhangsan,zhangsan@qq.com),(P001,zhangsan)) ((A001,lisi,lisi@qq.com),(P002,lisi)) ((A001,lisi,lisi@qq.com),(P004,lisi))
CoGroup
将2个DataSet中的元素,按照key进行分组,一起分组2个DataSet。而groupBy值能分组一个DataSet
执行程序:
package code.book.batch.dataset.advance.api import org.apache.flink.api.scala.{ExecutionEnvironment, _} object CoGroupFunction001scala { def main(args: Array[String]): Unit = { // 1.设置运行环境,并创造测试数据 val env = ExecutionEnvironment.getExecutionEnvironment val authors = env.fromElements( Tuple3("A001", "zhangsan", "zhangsan@qq.com"), Tuple3("A001", "lisi", "lisi@qq.com"), Tuple3("A001", "wangwu", "wangwu@qq.com")) val posts = env.fromElements( Tuple2("P001", "zhangsan"), Tuple2("P002", "lisi"), Tuple2("P003", "wangwu"), Tuple2("P004", "lisi")) // 2.scala中coGroup没有with方法来使用CoGroupFunction val text2 = authors.coGroup(posts).where(1).equalTo(1) //3.显示结果 text2.print() } }
执行结果:
text2.print() ([Lscala.Tuple3;@6c2c1385,[Lscala.Tuple2;@5f354bcf) ([Lscala.Tuple3;@3daf7722,[Lscala.Tuple2;@78641d23) ([Lscala.Tuple3;@74589991,[Lscala.Tuple2;@146dfe6)
相关文章推荐
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战001--DateSet实用API详解001
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战010--DateSet实用API详解010
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战012--DateSet实用API详解012
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战019--DateSet实用API详解019
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战027--DateSet实用API详解027
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战002--DateSet实用API详解002
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战016--DateSet实用API详解016
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战013--DateSet实用API详解013
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战020--DateSet实用API详解020
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战004--DateSet实用API详解004
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战017--DateSet实用API详解017
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战014--DateSet实用API详解014
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战007--DateSet实用API详解007
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战006--DateSet实用API详解006
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战008--DateSet实用API详解008
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战025--DateSet实用API详解025
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战005--DateSet实用API详解005
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战024--DateSet实用API详解024
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战018--DateSet实用API详解018
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战026--DateSet实用API详解026