Lesson 71: Spark SQL Window Functions Explained with a Hands-On Example
2016-05-23 00:00
Abstract: Spark study notes
Introduction from the official Databricks blog:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Using Window Functions
Spark SQL supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions. The available ranking functions and analytic functions are summarized in the table below. For aggregate functions, users can use any existing aggregate function as a window function.

| | SQL | DataFrame API |
| --- | --- | --- |
| Ranking functions | rank | rank |
| | dense_rank | denseRank |
| | percent_rank | percentRank |
| | ntile | ntile |
| | row_number | rowNumber |
| Analytic functions | cume_dist | cumeDist |
| | first_value | firstValue |
| | last_value | lastValue |
| | lag | lag |
| | lead | lead |
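The three ranking functions differ only in how they treat ties. As a minimal plain-Scala sketch (no Spark required, names are my own) of their semantics on an already-descending-sorted score list:

```scala
object RankingDemo {
  // row_number: unique sequential position; ties are broken arbitrarily
  def rowNumber(sorted: List[Int]): List[Int] =
    sorted.indices.map(_ + 1).toList

  // rank: tied values share a rank, and the next rank skips ahead
  def rank(sorted: List[Int]): List[Int] =
    sorted.map(s => sorted.indexOf(s) + 1)

  // dense_rank: tied values share a rank, with no gaps afterwards
  def denseRank(sorted: List[Int]): List[Int] =
    sorted.map(s => sorted.distinct.indexOf(s) + 1)

  def main(args: Array[String]): Unit = {
    val scores = List(90, 85, 85, 70) // assumed sorted descending
    println(rowNumber(scores)) // List(1, 2, 3, 4)
    println(rank(scores))      // List(1, 2, 2, 4)
    println(denseRank(scores)) // List(1, 2, 2, 3)
  }
}
```

Note how the two 85s share rank 2 under both `rank` and `dense_rank`, but `rank` then jumps to 4 while `dense_rank` continues at 3.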
The related code is as follows:
```scala
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Bindy on 16-5-13.
 *
 * @author DT大数据梦工厂-学员
 */
object s71_SparkSQLWindowFunctionOPS {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("SparkSQLWindowFunction")
      .setMaster("spark://cloud001:7077")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val hiveContext = new HiveContext(sc)

    // Drop the target table if it already exists, then create the table
    // the data will be loaded into.
    hiveContext.sql("use default")
    hiveContext.sql("DROP TABLE IF EXISTS scores") // drop any table with the same name
    hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING, score INT) " +
      "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY '\\n'") // create the custom table
    hiveContext.sql("LOAD DATA LOCAL INPATH '/home/hduser/IMF_Study/testData/topNGroup.txt' " +
      "INTO TABLE scores")

    /**
     * Use a subquery to extract the target data, applying the window
     * function row_number inside it to rank rows within each group:
     *   partition by: the key the window function partitions by;
     *   order by: the sort order within each partition.
     */
    val result = hiveContext.sql("select name,score " +
      "from (" +
      "select " +
      "name," +
      "score," +
      "row_number() over (partition by name order by score desc) rank " +
      "from scores" +
      ") sub_scores " +
      "where rank <= 4")
    result.show()

    // Save the result back into the Hive database.
    hiveContext.sql("DROP TABLE IF EXISTS sortResultScores") // drop any table with the same name
    result.write.saveAsTable("sortResultScores")
  }
}
```
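To see what the `row_number()` query computes without a Spark cluster, here is a plain-Scala sketch of the same top-N-per-group logic on in-memory data (the object name, sample data, and helper are my own illustration, not part of the original program):

```scala
object TopNPerGroup {
  // For each name, keep only the n highest scores: the collection-level
  // analogue of row_number() over (partition by name order by score desc)
  // filtered to rank <= n.
  def topN(rows: Seq[(String, Int)], n: Int): Seq[(String, Int)] =
    rows
      .groupBy(_._1)                  // partition by name
      .toSeq
      .sortBy(_._1)                   // deterministic group order for display
      .flatMap { case (_, group) =>
        group.sortBy(-_._2).take(n)   // order by score desc, rank <= n
      }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      ("Spark", 100), ("Spark", 95), ("Spark", 90),
      ("Spark", 85), ("Spark", 80),
      ("Hadoop", 99), ("Hadoop", 98)
    )
    topN(data, 4).foreach(println) // Spark keeps 4 of 5 rows; Hadoop keeps both
  }
}
```

The fifth `Spark` row (score 80) is dropped because its row number within the `Spark` partition is 5, which fails the `rank <= 4` filter.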