SparkSQL Overview
2015-11-03 15:11
Blog: http://blog.csdn.net/yueqian_zhu/
Based on the Spark 1.5.1 overview.
I. Entry point:
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
The default dialect of SQLContext is "sql", which parses plain SQL statements. Besides SQLContext there is also HiveContext, which uses a more complete parser; its default dialect is spark.sql.dialect = "hiveql".
II. Creating DataFrames
DataFrames can currently be created from RDDs, Hive tables, and other data sources. See the next post for details.
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()

III. The DataFrame DSL interface
1. show: print the contents
2. printSchema: print the schema
3. select: select a subset of columns from the original DataFrame
4. filter: filter rows
5. groupBy: group rows
6. count: count rows
...
See the next post for details; a rough usage sketch follows below.
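As a rough illustration of these DSL methods (reusing the df loaded from people.json in section II, and assuming it has the name and age columns used in the Spark examples):

// Print the schema in a tree format
df.printSchema()

// Select only the "name" column
df.select("name").show()

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()

// Select people older than 21
df.filter(df("age") > 21).show()

// Count people by age
df.groupBy("age").count().show()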
IV. Running SQL
val sqlContext = ... // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")

V. Schema inference
1. Inferring the schema via reflection from a known format, using case classes
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)
2. Without case classes (specifying the schema programmatically)
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType, StructField, StringType};

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
VI. Data sources
1. Loading from and saving to Parquet files
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

2. Specifying the data source format explicitly, instead of relying on the default handling above
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

3. Hive tables
Hive support requires Spark to be built with -Phive and -Phive-thriftserver; in addition, the datanucleus jars under lib_managed/jars and hive-site.xml must be placed in the expected locations.
See the Spark 1.5.1 programming guide for details: http://spark.apache.org/docs/latest/sql-programming-guide.html
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
4. JDBC
The appropriate JDBC driver must first be added to the Spark classpath.
Every worker node also needs to be able to load the driver; placing the driver jars on each node's classpath works.
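A hedged sketch of loading a table over JDBC (the MySQL URL, driver class, and table name below are placeholders for illustration, not from the original post):

// Assumptions: a MySQL JDBC driver is on the classpath; url/dbtable/driver are placeholders.
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/test?user=root&password=secret",
      "dbtable" -> "people",
      "driver" -> "com.mysql.jdbc.Driver")).load()

jdbcDF.show()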