您的位置:首页 > 大数据

大数据IMF传奇行动绝密课程第71课:Spark SQL窗口函数解密与实战

2017-03-09 23:27 260 查看

Spark SQL窗口函数解密与实战

1、Spark SQL窗口函数解析

2、Spark SQL窗口函数实战

/**
* Scala代码
*/
package com.tom.spark.sql

import org.apache.spark.sql.DataFrame
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object SparkSQLWindowFunctionOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkSQLWindowFunctionOps")
val sc = new SparkContext(conf)

val hiveContext = new HiveContext(sc)
hiveContext.sql("use hive") //使用名称为hive的数据库,接下来所有的表操作都位于这个库
/**
* 如果要创建的表存在的话就删除,然后创建我们要导入数据的表
*/
hiveContext.sql("DROP TABLE IF EXISTS scores")
hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING,score INTEGER)"
+ "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY '\\n'")
//把要处理的数据导入到Hive的表中
hiveContext.sql("LOAD DATA LOCAL INPATH '/root/Documents/SparkApps/resources/topNGroup.txt' INTO TABLE scores")

/**
* 使用子查询的方式完成目标数据的提取,在目标数据内部使用窗口函数row_number来进行分组排序
* PARTITION BY:指定窗口函数分组的Key;
* ORDER BY:分组后进行排序;
*
*/
val result = hiveContext.sql("SELECT name, score FROM (" +
"SELECT name, score, row_number() OVER (PARTITION BY name ORDER BY score DESC) rank FROM scores" +
") sub_scores " +
"WHERE rank<=4")

result.show()

//把数据保存到Hive数据仓库中
hiveContext.sql("DROP TABLE EXISTS sortedResultScores")
result.saveAsTable("sortedResultScores")
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark
相关文章推荐