您的位置:首页 > 运维架构 > Apache

【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战002--DateSet实用API详解002

2017-11-15 13:35 1096 查看

DateSet的API详解二

min

def min(field: Int): AggregateDataSet[T]
def min(field: String): AggregateDataSet[T]

Syntactic sugar for aggregate with MIN

获取最小的元素


执行程序:

//1.创建DataSet[Student]
case class Student(age: Int, name: String,height:Double)
val input: DataSet[Student] = benv.fromElements(
Student(16,"zhangasn",194.5),
Student(17,"zhangasn",184.5),
Student(18,"zhangasn",174.5),
Student(16,"lisi",194.5),
Student(17,"lisi",184.5),
Student(18,"lisi",174.5))

d1ad
//2.获取age最小的元素
val output0: DataSet[Student] = input.min(0)
output0.collect

//3.获取age最小的元素
val output1: DataSet[Student] = input.min("age")
output1.collect


执行结果:

Scala-Flink> output0.collect
res78: Seq[Student] = Buffer(Student(16,lisi,174.5))

Scala-Flink> output1.collect
res79: Seq[Student] = Buffer(Student(16,lisi,174.5))


max

def max(field: String): AggregateDataSet[T]
def max(field: Int): AggregateDataSet[T]

Syntactic sugar for aggregate with MAX

获取最大的元素


执行程序:

//1.创建DataSet[Student]
case class Student(age: Int, name: String,height:Double)
val input: DataSet[Student] = benv.fromElements(
Student(16,"zhangasn",194.5),
Student(17,"zhangasn",184.5),
Student(18,"zhangasn",174.5),
Student(16,"lisi",194.5),
Student(17,"lisi",184.5),
Student(18,"lisi",174.5))

//2.获取age最小的元素
val output0: DataSet[Student] = input.max(0)
output0.collect

//3.获取age最小的元素
val output1: DataSet[Student] = input.max("age")
output1.collect


执行结果:

Scala-Flink> output0.collect
res79: Seq[Student] = Buffer(Student(18,lisi,174.5))

Scala-Flink> output1.collect
res79: Seq[Student] = Buffer(Student(18,lisi,174.5))


sum

def sum(field: String): AggregateDataSet[T]
def sum(field: Int): AggregateDataSet[T]

Syntactic sugar for aggregate with SUM

获取元素的累加和,只能作用于数值类型


执行程序:

//1.创建 DataSet[Student]
case class Student(age: Int, name: String,height:Double)
val input: DataSet[Student] = benv.fromElements(
Student(16,"zhangasn",194.5),
Student(17,"zhangasn",184.5),
Student(18,"zhangasn",174.5),
Student(16,"lisi",194.5),
Student(17,"lisi",184.5),
Student(18,"lisi",174.5))

//2.fieldIndex=0的列进行sum
val s0=input.sum(0)
s0.collect

//3.fieldName="age"的列进行sum
val s1=input.sum("age")
s1.collect

//4.fieldName="height"的列进行sum
val s2=input.sum("height")
s2.collect


执行结果:

Scala-Flink> s0.collect
res111: Seq[Student] = Buffer(Student(102,lisi,174.5))

Scala-Flink> s1.collect
res112: Seq[Student] = Buffer(Student(102,lisi,174.5))

Scala-Flink> s2.collect
res113: Seq[Student] = Buffer(Student(18,lisi,1107.0))
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐