SparkSQL Related APIs
2015-11-03 20:51
Blog: http://blog.csdn.net/yueqian_zhu/
I. The interfaces in SQLContext.scala
Most of these methods create DataFrames.
1. Constructor: a SQLContext is built from a single SparkContext argument.
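For example, a minimal construction sketch (the app name and master below are placeholders, not from the original post):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// app name and master are placeholders for illustration only
val conf = new SparkConf().setAppName("sqlcontext-demo").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)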
2. Set/get configuration: setConf/getConf
3. isCached/cacheTable/uncacheTable/clearCache: table caching to speed up queries; use with care to avoid OOM.
4. read: reads from an external data source; described in detail in section III below.
/**
 * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
 * {{{
 *   sqlContext.read.parquet("/path/to/file.parquet")
 *   sqlContext.read.schema(schema).json("/path/to/file.json")
 * }}}
 */
def read: DataFrameReader = new DataFrameReader(this)
5. Drop a temporary table
def dropTempTable(tableName: String): Unit
6. List the existing tables (how tables get created is covered under the DataFrame methods below)
def tableNames(): Array[String]
def tableNames(databaseName: String): Array[String]
7. Create a DataFrame from an existing RDD or Seq
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
/**
 * Example:
 * {{{
 *  import org.apache.spark.sql._
 *  import org.apache.spark.sql.types._
 *  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 *
 *  val schema =
 *    StructType(
 *      StructField("name", StringType, false) ::
 *      StructField("age", IntegerType, true) :: Nil)
 *
 *  val people =
 *    sc.textFile("examples/src/main/resources/people.txt").map(
 *      _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
 *  val dataFrame = sqlContext.createDataFrame(people, schema)
 *  dataFrame.printSchema
 *  // root
 *  // |-- name: string (nullable = false)
 *  // |-- age: integer (nullable = true)
 *
 *  dataFrame.registerTempTable("people")
 *  sqlContext.sql("select name from people").collect.foreach(println)
 * }}}
 */
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
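The case-class/tuple variants infer the schema from the element type. A small sketch (the Person case class and the sample rows are made up for illustration):

case class Person(name: String, age: Int)

val data = Seq(Person("alice", 20), Person("bob", 31))
val fromSeq = sqlContext.createDataFrame(data)                 // Seq[A <: Product]
val fromRdd = sqlContext.createDataFrame(sc.parallelize(data)) // RDD[A <: Product]
fromSeq.printSchema()  // columns "name" and "age" come from the case class fields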
8. Create an external table and return it as a DataFrame
def createExternalTable(tableName: String, path: String): DataFrame
def createExternalTable(tableName: String, path: String, source: String): DataFrame
def createExternalTable(tableName: String, source: String, options: Map[String, String]): DataFrame
def createExternalTable(tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame
9. Create a DataFrame with a single column named id, containing values in the given range
def range(start: Long, end: Long): DataFrame
def range(end: Long): DataFrame
def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame
10. Create a DataFrame from a SQL query
def sql(sqlText: String): DataFrame
11. Create a DataFrame from a table
def table(tableName: String): DataFrame
/**
 * Returns a [[DataFrame]] containing names of existing tables in the current database.
 * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
 * indicating if a table is a temporary one or not).
 *
 * @group ddl_ops
 * @since 1.3.0
 */
def tables(): DataFrame = {
  DataFrame(this, ShowTablesCommand(None))
}
/**
 * Returns a [[DataFrame]] containing names of existing tables in the given database.
 * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
 * indicating if a table is a temporary one or not).
 *
 * @group ddl_ops
 * @since 1.3.0
 */
def tables(databaseName: String): DataFrame = {
  DataFrame(this, ShowTablesCommand(Some(databaseName)))
}
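Putting items 10 and 11 together, a rough usage sketch (it assumes the "people" temp table registered in the example above):

// run a SQL query and get the result back as a DataFrame
val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 18")

// load an existing table directly
val peopleDf = sqlContext.table("people")

// list tables: tables() returns a DataFrame, tableNames() a plain Array[String]
sqlContext.tables().show()
println(sqlContext.tableNames().mkString(", "))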
II. Implicit conversions in SQLContext
1. StringToColumn: implicitly converts a StringContext into a StringToColumn object, which provides the $ method for turning a column name into a Column
/**
 * Converts $"col name" into an [[Column]].
 * @since 1.3.0
 */
implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    new ColumnName(sc.s(args : _*))
  }
}
2. Conversion of a Scala Symbol
/**
 * An implicit conversion that turns a Scala `Symbol` into a [[Column]].
 * @since 1.3.0
 */
implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name)
3. Converts an RDD or Seq (whose elements are case classes or tuples) into a DataFrameHolder (its methods are listed in item 5 below)
/**
 * Creates a DataFrame from an RDD of case classes or tuples.
 * @since 1.3.0
 */
implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = {
  DataFrameHolder(self.createDataFrame(rdd))
}
/**
 * Creates a DataFrame from a local Seq of Product.
 * @since 1.3.0
 */
implicit def localSeqToDataFrameHolder[A <: Product : TypeTag](data: Seq[A]): DataFrameHolder = {
  DataFrameHolder(self.createDataFrame(data))
}
4. Converts an RDD of Int/Long/String into a DataFrameHolder
implicit def intRddToDataFrameHolder(data: RDD[Int]): DataFrameHolder
implicit def longRddToDataFrameHolder(data: RDD[Long]): DataFrameHolder
implicit def stringRddToDataFrameHolder(data: RDD[String]): DataFrameHolder
5. DataFrameHolder provides the following methods for the final conversion into a DataFrame
private[sql] case class DataFrameHolder(df: DataFrame) {
  // This is declared with parentheses to prevent the Scala compiler from treating
  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
  def toDF(): DataFrame = df
  def toDF(colNames: String*): DataFrame = df.toDF(colNames : _*) // assign custom column names
}
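A short sketch of how these implicits are typically used; the column names passed to toDF here are arbitrary:

import sqlContext.implicits._   // brings the conversions above into scope

val df = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("id", "value") // rddToDataFrameHolder + toDF(colNames)
df.select($"id", 'value).show() // $"id" via StringToColumn, 'value via symbolToColumn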
III. DataFrameReader
The read method of SQLContext returns a DataFrameReader, which provides methods for loading a DataFrame from external sources.
1. format: sets the data source to the given source argument. The default data source is parquet.
def format(source: String): DataFrameReader = {
  this.source = source
  this
}
2. schema: specify the schema
def schema(schema: StructType): DataFrameReader = {
  this.userSpecifiedSchema = Option(schema)
  this
}
3. option/options: additional parameters
def option(key: String, value: String): DataFrameReader
def options(options: scala.collection.Map[String, String]): DataFrameReader
4. load: loads the files under the given path on an external file system
def load(path: String): DataFrame = {
  option("path", path).load()
}
def load(): DataFrame
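A sketch chaining the calls above; the paths are placeholders and the option key is just one source-specific example:

import org.apache.spark.sql.types._

// with an explicit schema, skipping inference
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))
val people = sqlContext.read.format("json").schema(schema).load("/path/to/people.json")

// or let the source infer the schema, passing a source-specific option (here a json option)
val sampled = sqlContext.read.format("json").option("samplingRatio", "0.5").load("/path/to/people.json")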
5. jdbc
The corresponding JDBC driver must first be on the Spark classpath.
Every worker node must also be able to load the driver, so place the driver jars on the classpath of each node.
def jdbc(url: String, table: String, properties: Properties): DataFrame
def jdbc(
    url: String,
    table: String,
    columnName: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    connectionProperties: Properties): DataFrame
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame
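A hedged sketch of the partitioned variant; the url, table name, credentials and bounds are placeholders:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "dbuser")
props.setProperty("password", "secret")

// reads the "people" table in 4 partitions, splitting on the numeric column "id"
val jdbcDf = sqlContext.read.jdbc(
  "jdbc:mysql://db-host:3306/mydb", // placeholder url
  "people",
  columnName = "id",
  lowerBound = 0L,
  upperBound = 100000L,
  numPartitions = 4,
  connectionProperties = props)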
6. json
def json(path: String): DataFrame = format("json").load(path)
def json(jsonRDD: RDD[String]): DataFrame
7. parquet
def parquet(paths: String*): DataFrame
8. table: creates a DataFrame from a table, same as the method of the same name on SQLContext
def table(tableName: String): DataFrame
IV. DataFrame
1. toDF: assigns column names to the schema; the number of names must match the number of columns
def toDF(colNames: String*): DataFrame
2. schema: returns the schema
def schema: StructType
3. dtypes: returns the column names and their types
/**
 * Returns all column names and their data types as an array.
 * @group basic
 * @since 1.3.0
 */
def dtypes: Array[(String, String)] = schema.fields.map { field =>
  (field.name, field.dataType.toString)
}
4. columns: returns the column names
def columns: Array[String]
5. printSchema: prints the schema
def printSchema(): Unit
6. show: prints the data
def show(numRows: Int): Unit
def show(): Unit = show(20)
7. join
def join(right: DataFrame): DataFrame
def join(right: DataFrame, usingColumn: String): DataFrame
def join(right: DataFrame, joinExprs: Column): DataFrame
def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
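A quick sketch of the join variants; df1/df2 and their columns are made up for illustration:

// assume df1 has columns (id, name) and df2 has columns (id, dept)
val byColumnName = df1.join(df2, "id")                                  // using-column join, keeps a single "id"
val byExpr       = df1.join(df2, df1("id") === df2("id"))               // join on an arbitrary Column expression
val leftOuter    = df1.join(df2, df1("id") === df2("id"), "left_outer") // explicit join type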
8. Sorting: sort/orderBy
/**
 * Returns a new [[DataFrame]] sorted by the specified column, all in ascending order.
 * {{{
 *   // The following 3 are equivalent
 *   df.sort("sortcol")
 *   df.sort($"sortcol")
 *   df.sort($"sortcol".asc)
 * }}}
 * @group dfops
 * @since 1.3.0
 */
@scala.annotation.varargs
def sort(sortCol: String, sortCols: String*): DataFrame
def sort(sortExprs: Column*): DataFrame
def orderBy(sortCol: String, sortCols: String*): DataFrame = sort(sortCol, sortCols : _*)
def orderBy(sortExprs: Column*): DataFrame
9. apply: converts a column name to a Column via the parentheses syntax
def apply(colName: String): Column = col(colName)
10. col: converts a column name to a Column, same as apply
def col(colName: String): Column
11. Aliases
def as(alias: String): DataFrame
def as(alias: Symbol): DataFrame
12. select: selects a subset of columns
def select(cols: Column*): DataFrame
def select(col: String, cols: String*): DataFrame
13. selectExpr: selects columns, more flexible than select; the arguments may be SQL expressions
/**
 * Selects a set of SQL expressions. This is a variant of `select` that accepts
 * SQL expressions.
 *
 * {{{
 *   df.selectExpr("colA", "colB as newName", "abs(colC)")
 * }}}
 * @group dfops
 * @since 1.3.0
 */
@scala.annotation.varargs
def selectExpr(exprs: String*): DataFrame
14. filter
/**
 * Filters rows using the given condition.
 * {{{
 *   // The following are equivalent:
 *   peopleDf.filter($"age" > 15) // here $"age" is a Column; > is overloaded on Column and also returns a Column
 *   peopleDf.where($"age" > 15)
 * }}}
 * @group dfops
 * @since 1.3.0
 */
def filter(condition: Column): DataFrame
/**
 * Filters rows using the given SQL expression.
 * {{{
 *   peopleDf.filter("age > 15") // the argument is a SQL expression
 * }}}
 * @group dfops
 * @since 1.3.0
 */
def filter(conditionExpr: String): DataFrame
15. where
/**
 * Filters rows using the given condition. This is an alias for `filter`.
 * {{{
 *   // The following are equivalent:
 *   peopleDf.filter($"age" > 15)
 *   peopleDf.where($"age" > 15)
 * }}}
 * @group dfops
 * @since 1.3.0
 */
def where(condition: Column): DataFrame
16. groupBy
/**
 * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
 * See [[GroupedData]] for all the available aggregate functions.
 *
 * {{{
 *   // Compute the average for all numeric columns grouped by department.
 *   df.groupBy($"department").avg()
 *
 *   // Compute the max age and average salary, grouped by department and gender.
 *   df.groupBy($"department", $"gender").agg(Map(
 *     "salary" -> "avg",
 *     "age" -> "max"
 *   ))
 * }}}
 * @group dfops
 * @since 1.3.0
 */
@scala.annotation.varargs
def groupBy(cols: Column*): GroupedData
def groupBy(col1: String, cols: String*): GroupedData
17. limit: returns the first n rows
def limit(n: Int): DataFrame
18. unionAll
/**
 * Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
 * This is equivalent to `UNION ALL` in SQL.
 * @group dfops
 * @since 1.3.0
 */
def unionAll(other: DataFrame): DataFrame
19. intersect
/**
 * Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
 * This is equivalent to `INTERSECT` in SQL.
 * @group dfops
 * @since 1.3.0
 */
def intersect(other: DataFrame): DataFrame
20. head/first: returns the first row(s)
def head(n: Int): Array[Row]
def head(): Row
override def first(): Row
21. map: here the underlying rdd is an RDD[Row]; flatMap, mapPartitions, foreach, foreachPartition, take, collect, count, repartition, persist and cache behave like the corresponding RDD methods
override def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)
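Because map works on Row objects, field access goes through Row's getters. A small sketch (the column positions and types are assumed):

import org.apache.spark.rdd.RDD

// assume column 0 is a String name and column 1 is an Int age
val names: RDD[String] = df.map(row => row.getString(0))
val pairs = df.map(row => (row.getString(0), row.getInt(1) + 1))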
22. distinct
/**
 * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
 * This is an alias for `dropDuplicates`.
 * @group dfops
 * @since 1.3.0
 */
override def distinct: DataFrame
23. rdd
lazy val rdd: RDD[Row]
24. registerTempTable: registers this DataFrame as a temporary table
/**
 * Registers this [[DataFrame]] as a temporary table using the given name. The lifetime of this
 * temporary table is tied to the [[SQLContext]] that was used to create this DataFrame.
 *
 * @group basic
 * @since 1.3.0
 */
def registerTempTable(tableName: String): Unit
25. write: writes to an external sink, described below
def write: DataFrameWriter
26. toJSON: converts to JSON
def toJSON: RDD[String]
V. DataFrameWriter
1. mode: sets the save mode
/**
 * Specifies the behavior when data or table already exists. Options include:
 *   - `SaveMode.Overwrite`: overwrite the existing data.
 *   - `SaveMode.Append`: append the data.
 *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
 *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
 *
 * @since 1.4.0
 */
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}
2. format
/**
 * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameWriter
3. option
def option(key: String, value: String): DataFrameWriter
def options(options: scala.collection.Map[String, String]): DataFrameWriter
4. partitionBy
def partitionBy(colNames: String*): DataFrameWriter
5. save
def save(path: String): Unit
def save(): Unit
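A sketch of a typical write chain combining items 1-5; the output path and the partition column are placeholders:

import org.apache.spark.sql.SaveMode

df.write
  .format("parquet")               // item 2
  .mode(SaveMode.Overwrite)        // item 1
  .partitionBy("date")             // item 4: assumes df has a "date" column
  .save("/path/to/output")         // item 5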
6. insertInto
def insertInto(tableName: String): Unit
7. saveAsTable
def saveAsTable(tableName: String): Unit
8. jdbc
def jdbc(url: String, table: String, connectionProperties: Properties): Unit
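A corresponding write-side jdbc sketch (url, table name and credentials are placeholders; the driver jar must be on the classpath, as noted in section III):

import java.util.Properties
import org.apache.spark.sql.SaveMode

val props = new Properties()
props.setProperty("user", "dbuser")
props.setProperty("password", "secret")

df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://db-host:3306/mydb", "people_out", props)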
9. json
def json(path: String): Unit = format("json").save(path)
10. parquet: parquet is the default format for save, so it does not need to be set explicitly beforehand
def parquet(path: String): Unit = format("parquet").save(path)