第70课:Spark SQL内置函数解密于实战
2016-05-23 00:00
204 查看
摘要: Spark 学习
1 Spark SQL内置函数解析
2 Spark SQL内置函数实战
说明:Spark操作Hive上的数据叫做Spark on Hive;而Hive on Spark是Hive的子项目,依旧是以Hive为核心,但是引擎由MapRuduce换成了Spark,现在发展不成熟,但是它有远大前途。
代码和注释如下:
1 Spark SQL内置函数解析
2 Spark SQL内置函数实战
说明:Spark操作Hive上的数据叫做Spark on Hive;而Hive on Spark是Hive的子项目,依旧是以Hive为核心,但是引擎由MapRuduce换成了Spark,现在发展不成熟,但是它有远大前途。
代码和注释如下:
/** * Created by Bindy on 16-5-13. * * @author DT大数据梦工厂-学员 * 使用Spark SQL中的内置函数对数据进行分析,Spark SQL API不同的是,DataFrame中的内置函数操作 * 的结果是返回一个Column对象,而DataFrame天生就是“A distributed collection of data organized into named columns.”, * 这就为数据的复杂分析建立了坚实的基础并提供了极大的方便性,例如说,我们在操作DataFrame的方法中可以随时调用内置函数进行业务需要 * 的处理,这之于我们构建复杂的业务逻辑而言可以极大的减少不必要的时间消耗,让我们聚焦在数据分析上,这对于提高工程师的生产力而言是非常 * 有价值的,Spark 1.5.x开始提供了大量的内置函数,例如agg: * def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = { * groupBy().agg(aggExpr, aggExprs : _*) * } * 还有max、min、sum、avg、explode、size、sort_array、day、to_date、abs、acros、asin、atan * 总体上而言内置函数包含了五大基本类型: * 1、聚合函数,例如countDistinct、sumDistinct等; * 2、集合函数,例如sort_array、explode等; * 3、日期、时间函数,例如hour、quarter、next_day; * 4、数学函数:例如asin、atan、sqrt、tan、round等; * 5、开窗函数:例如row_number等; * 6、字符串函数:concat、format_number、rexexp_extract * 7、其他函数:isNan、sha、randn、callUDF */ object s70_SparkSQLAgg { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("SparkSQLInnerFunctions") .setMaster("local") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val sqlContext = new SQLContext(sc) //构建SQL上下文 //要使用Spark SQL的内置函数,就一定要导入SQLContext下的隐式转换 import sqlContext.implicits._ /** * 模拟电商访问的数据,实际情况会比模拟数据复杂很多,最后生成RDD */ val userData = Array( "2016-3-27,001,http://spark.apache.org/,1000", "2016-3-27,001,http://hadoop.apache.org/,2000", "2016-3-28,004,http://flink.apache.org/,888", "2016-3-28,003,http://kafka.apache.org/,999", "2016-3-28,002,http://hive.apache.org/,77", "2016-3-28,004,http://parquet.apache.org/,66", "2016-3-28,003,http://spark.apache.org/,666" ) val userDataRDD = sc.parallelize(userData) //生成RDD分布式集合对象 /** * 根据业务需要对数据进行预处理生成DataFrame,要想把RDD转换成DataFrame,需要先把RDD中的元素类型变成Row类型 * 与此同时要提供DataFrame中的Columns的元数据信息描述 */ val userDataRDDRow = userDataRDD.map(row => { val splited = row.split(","); Row(splited(0), splited(1).toInt, splited(2), splited(3).toInt) }) val structTypes = StructType(Array( StructField("time", StringType, true), StructField("id", IntegerType, true), StructField("url", StringType, true), StructField("amount", IntegerType, true) )) val userDataDF = sqlContext.createDataFrame(userDataRDDRow, structTypes) /** * 使用Spark SQL提供的内置函数对DataFrame进行操作,特别注意:内置函数生成的Column对象且自定进行CG; */ userDataDF .groupBy("time") .agg('time, countDistinct('id)) .map(row => Row(row(1), row(2))) .collect() .foreach(println) userDataDF.groupBy("time").agg('time, sum('amount)).show() } }
相关文章推荐
- 详解HDFS Short Circuit Local Reads
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- Hadoop_2.1.0 MapReduce序列图
- 使用Hadoop搭建现代电信企业架构
- Windows下Scala环境搭建
- Spark随谈——开发指南(译)
- 单机版搭建Hadoop环境图文教程详解
- Spark,一种快速数据分析替代方案
- hadoop常见错误以及处理方法详解
- SQLite教程(四):内置函数
- Windows7下安装Scala 2.9.2教程
- hadoop 单机安装配置教程
- hadoop的hdfs文件操作实现上传文件到hdfs
- Python内置函数bin() oct()等实现进制转换
- Python标准库内置函数complex介绍
- hadoop实现grep示例分享
- 几个实用的PHP内置函数使用指南
- Lua所有内置函数罗列
- awk正则表达式和内置函数的使用方法实例详解