【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战005--DateSet实用API详解005
2017-11-16 09:08
1156 查看
DateSet的API详解五
groupBy
def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T] def groupBy(fields: Int*): GroupedDataSet[T] def groupBy[K](fun: (T) ⇒ K)(implicit arg0: TypeInformation[K]): GroupedDataSet[T] Creates a GroupedDataSet which provides operations on groups of elements. 暗示第二个输入较小的交叉。 拿第一个输入的每一个元素和第二个输入的每一个元素进行交叉操作。
groupBy示例一:使用一个Case Class Fields
执行程序://1.定义 class case class WC(val word: String, val salary: Int) //2.定义DataSet[WC] val words: DataSet[WC] = benv.fromElements( WC("LISI",600),WC("LISI",400),WC("WANGWU",300),WC("ZHAOLIU",700)) //3.使用自定义的reduce方法,使用key-expressions val wordCounts1 = words.groupBy("word").reduce { (w1, w2) => new WC(w1.word, w1.salary + w2.salary) } //4.使用自定义的reduce方法,使用key-selector val wordCounts2 = words.groupBy { _.word } reduce { (w1, w2) => new WC(w1.word, w1.salary + w2.salary) } //5.显示结果 wordCounts1.collect wordCounts2.collect
执行结果:
Scala-Flink> wordCounts1.collect res5: Seq[WC] = Buffer(WC(LISI,1000), WC(WANGWU,300), WC(ZHAOLIU,700)) Scala-Flink> wordCounts1.collec2 res6: Seq[WC] = Buffer(WC(LISI,1000), WC(WANGWU,300), WC(ZHAOLIU,700))
web ui中的执行效果:
groupBy示例二:使用多个Case Class Fields
执行程序://1.定义 case class case class Student(val name: String, addr: String, salary: Double) //2.定义DataSet[Student] val tuples:DataSet[Student] = benv.fromElements( Student("lisi","shandong",2400.00),Student("zhangsan","henan",2600.00), Student("lisi","shandong",2700.00),Student("lisi","guangdong",2800.00)) //3.使用自定义的reduce方法,使用多个Case Class Fields name val reducedTuples1 = tuples.groupBy("name", "addr").reduce { (s1, s2) => Student(s1.name+"-"+s2.name,s1.addr+"-"+s2.addr,s1.salary+s2.salary) } //4.使用自定义的reduce方法,使用多个Case Class Fields index val reducedTuples2 = tuples.groupBy(0, 1).reduce { (s1, s2) => Student(s1.name+"-"+s2.name,s1.addr+"-"+s2.addr,s1.salary+s2.salary) } //5.使用自定义的reduce方法,name和index混用 val reducedTuples3 = tuples.groupBy(0, 1).reduce { (s1, s2) => Student(s1.name+"-"+s2.name,s1.addr+"-"+s2.addr,s1.salary+s2.salary) } //6.显示结果 reducedTuples1.collect reducedTuples2.collect reducedTuples3.collect
执行结果:
Scala-Flink> reducedTuples1.collect res96: Seq[Student] = Buffer( Student(lisi,guangdong,2800.0), Student(lisi-lisi,shandong-shandong,5100.0), Student(zhangsan,henan,2600.0)) Scala-Flink> reducedTuples2.collect res97: Seq[Student] = Buffer( Student(lisi,guangdong,2800.0), Student(lisi-lisi,shandong-shandong,5100.0), Student(zhangsan,henan,2600.0)) Scala-Flink> reducedTuples3.collect res98: Seq[Student] = Buffer( Student(lisi,guangdong,2800.0), Student(lisi-lisi,shandong-shandong,5100.0), Student(zhangsan,henan,2600.0))
web ui中的执行效果:
相关文章推荐
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战007--DateSet实用API详解007
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战006--DateSet实用API详解006
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战008--DateSet实用API详解008
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战023--DateSet实用API详解023
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战025--DateSet实用API详解025
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战018--DateSet实用API详解018
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战024--DateSet实用API详解024
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战010--DateSet实用API详解010
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战026--DateSet实用API详解026
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战001--DateSet实用API详解001
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战012--DateSet实用API详解012
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战019--DateSet实用API详解019
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战027--DateSet实用API详解027
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战002--DateSet实用API详解002
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战016--DateSet实用API详解016
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战013--DateSet实用API详解013
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战020--DateSet实用API详解020
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战004--DateSet实用API详解004
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战017--DateSet实用API详解017
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战014--DateSet实用API详解014