您的位置:首页 > 数据库

第70课:Spark SQL内置函数解密于实战

2016-05-23 00:00 204 查看
摘要: Spark 学习

1 Spark SQL内置函数解析

2 Spark SQL内置函数实战

说明:Spark操作Hive上的数据叫做Spark on Hive;而Hive on Spark是Hive的子项目,依旧是以Hive为核心,但是引擎由MapRuduce换成了Spark,现在发展不成熟,但是它有远大前途。

代码和注释如下:

/**
* Created by Bindy on 16-5-13.
*
* @author DT大数据梦工厂-学员
*         使用Spark SQL中的内置函数对数据进行分析,Spark SQL API不同的是,DataFrame中的内置函数操作
*         的结果是返回一个Column对象,而DataFrame天生就是“A distributed collection of data organized into named columns.”,
*         这就为数据的复杂分析建立了坚实的基础并提供了极大的方便性,例如说,我们在操作DataFrame的方法中可以随时调用内置函数进行业务需要
*         的处理,这之于我们构建复杂的业务逻辑而言可以极大的减少不必要的时间消耗,让我们聚焦在数据分析上,这对于提高工程师的生产力而言是非常
*         有价值的,Spark 1.5.x开始提供了大量的内置函数,例如agg:
*         def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
*         groupBy().agg(aggExpr, aggExprs : _*)
*         }
*         还有max、min、sum、avg、explode、size、sort_array、day、to_date、abs、acros、asin、atan
*         总体上而言内置函数包含了五大基本类型:
*         1、聚合函数,例如countDistinct、sumDistinct等;
*         2、集合函数,例如sort_array、explode等;
*         3、日期、时间函数,例如hour、quarter、next_day;
*         4、数学函数:例如asin、atan、sqrt、tan、round等;
*         5、开窗函数:例如row_number等;
*         6、字符串函数:concat、format_number、rexexp_extract
*         7、其他函数:isNan、sha、randn、callUDF
*/
object s70_SparkSQLAgg {

def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("SparkSQLInnerFunctions")
.setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val sqlContext = new SQLContext(sc) //构建SQL上下文

//要使用Spark SQL的内置函数,就一定要导入SQLContext下的隐式转换
import sqlContext.implicits._

/**
* 模拟电商访问的数据,实际情况会比模拟数据复杂很多,最后生成RDD
*/
val userData = Array(
"2016-3-27,001,http://spark.apache.org/,1000",
"2016-3-27,001,http://hadoop.apache.org/,2000",
"2016-3-28,004,http://flink.apache.org/,888",
"2016-3-28,003,http://kafka.apache.org/,999",
"2016-3-28,002,http://hive.apache.org/,77",
"2016-3-28,004,http://parquet.apache.org/,66",
"2016-3-28,003,http://spark.apache.org/,666"
)

val userDataRDD = sc.parallelize(userData) //生成RDD分布式集合对象

/**
* 根据业务需要对数据进行预处理生成DataFrame,要想把RDD转换成DataFrame,需要先把RDD中的元素类型变成Row类型
* 与此同时要提供DataFrame中的Columns的元数据信息描述
*/
val userDataRDDRow = userDataRDD.map(row => {
val splited = row.split(",");
Row(splited(0), splited(1).toInt, splited(2), splited(3).toInt)
})
val structTypes = StructType(Array(
StructField("time", StringType, true),
StructField("id", IntegerType, true),
StructField("url", StringType, true),
StructField("amount", IntegerType, true)
))
val userDataDF = sqlContext.createDataFrame(userDataRDDRow, structTypes)

/**
* 使用Spark SQL提供的内置函数对DataFrame进行操作,特别注意:内置函数生成的Column对象且自定进行CG;
*/
userDataDF
.groupBy("time")
.agg('time, countDistinct('id))
.map(row => Row(row(1), row(2)))
.collect()
.foreach(println)

userDataDF.groupBy("time").agg('time, sum('amount)).show()

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息