您的位置:首页 > 数据库

SparkSQL 相关API

2016-01-29 14:19 465 查看
一、SQLContext.scala中的接口说明

大部分接口都是创建DataFrame

1、构造:SQLContext的构造只需要一个SparkContext参数

2、设置/获取 配置:setConf/getConf

3、isCached/cacheTable/uncacheTable/clearCache:数据缓存相关,提高查询速度,需谨慎防止OOM

4、read:用于从外部数据源读取 //todo,下文有详细介绍

[java] view
plain copy

* Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].

* {{{

* sqlContext.read.parquet("/path/to/file.parquet")

* sqlContext.read.schema(schema).json("/path/to/file.json")

* }}}

def read: DataFrameReader = new DataFrameReader(this)

5、删除临时表

[java] view
plain copy

def dropTempTable(tableName: String): Unit

6、查看目前有哪些表(表的产生在下文DataFrame的方法中有介绍)

[java] view
plain copy

def tableNames(): Array[String]

def tableNames(databaseName: String): Array[String]

7、创建DataFrame:可通过现成的RDD或者Seq来创建

[java] view
plain copy

def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame

def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame

[java] view
plain copy

* Example:

* {{{

* import org.apache.spark.sql._

* import org.apache.spark.sql.types._

* val sqlContext = new org.apache.spark.sql.SQLContext(sc)

*

* val schema =

* StructType(

* StructField("name", StringType, false) ::

* StructField("age", IntegerType, true) :: Nil)

*

* val people =

* sc.textFile("examples/src/main/resources/people.txt").map(

* _.split(",")).map(p => Row(p(0), p(1).trim.toInt))

* val dataFrame = sqlContext.createDataFrame(people, schema)

* dataFrame.printSchema

* // root

* // |-- name: string (nullable = false)

* // |-- age: integer (nullable = true)

*

* dataFrame.registerTempTable("people")

* sqlContext.sql("select name from people").collect.foreach(println)

* }}}

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame



8、创建DataFrame、外部表

[java] view
plain copy

def createExternalTable(tableName: String, path: String): DataFrame

def createExternalTable(

tableName: String,

path: String,

source: String): DataFrame

def createExternalTable(

tableName: String,

source: String,

options: Map[String, String]): DataFrame

def createExternalTable(

tableName: String,

source: String,

schema: StructType,

options: Map[String, String]): DataFrame

9、创建DataFrame:创建一个只包含一个名为id的列,且在指定区间内的DataFrame

[java] view
plain copy

def range(start: Long, end: Long): DataFrame

def range(end: Long): DataFrame

def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame

10、创建DataFrame:从sql查询语句中创建DataFrame

[java] view
plain copy

def sql(sqlText: String): DataFrame

11、创建DataFrame:从表创建DataFrame

[java] view
plain copy

def table(tableName: String): DataFrame

/**

* Returns a [[DataFrame]] containing names of existing tables in the current database.

* The returned DataFrame has two columns, tableName and isTemporary (a Boolean

* indicating if a table is a temporary one or not).

*

* @group ddl_ops

* @since 1.3.0

*/

def tables(): DataFrame = {

DataFrame(this, ShowTablesCommand(None))

}

/**

* Returns a [[DataFrame]] containing names of existing tables in the given database.

* The returned DataFrame has two columns, tableName and isTemporary (a Boolean

* indicating if a table is a temporary one or not).

*

* @group ddl_ops

* @since 1.3.0

*/

def tables(databaseName: String): DataFrame = {

DataFrame(this, ShowTablesCommand(Some(databaseName)))

}

二、SQLContext中隐式转换

1、StringToColumn:将StringContext格式隐式转换成StringToColumn类对象,从而具有$方法将列名转成Column类型

[java] view
plain copy

/**

* Converts $"col name" into an [[Column]].

* @since 1.3.0

*/

implicit class StringToColumn(val sc: StringContext) {

def $(args: Any*): ColumnName = {

new ColumnName(sc.s(args : _*))

}

}

2、scala Symbol类型的转换

[java] view
plain copy

/**

* An implicit conversion that turns a Scala `Symbol` into a [[Column]].

* @since 1.3.0

*/

implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name)

3、将RDD或者Seq类型(元素类型为case classes/tuples)转换为DataFrameHolder,DataFrameHolder具有方法

[java] view
plain copy

/**

* Creates a DataFrame from an RDD of case classes or tuples.

* @since 1.3.0

*/

implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = {

DataFrameHolder(self.createDataFrame(rdd))

}

/**

* Creates a DataFrame from a local Seq of Product.

* @since 1.3.0

*/

implicit def localSeqToDataFrameHolder[A <: Product : TypeTag](data: Seq[A]): DataFrameHolder =

{

DataFrameHolder(self.createDataFrame(data))

}

4、将RDD(元素类型为Int/Long/String)转换为DataFrameHolder

[java] view
plain copy

implicit def intRddToDataFrameHolder(data: RDD[Int]): DataFrameHolder

implicit def longRddToDataFrameHolder(data: RDD[Long]): DataFrameHolder

implicit def stringRddToDataFrameHolder(data: RDD[String]): DataFrameHolder

5、DataFrameHolder具有如下方法名,用于最终转换成DataFrame

[java] view
plain copy

private[sql] case class DataFrameHolder(df: DataFrame) {

// This is declared with parentheses to prevent the Scala compiler from treating

// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.

def toDF(): DataFrame = df

def toDF(colNames: String*): DataFrame = df.toDF(colNames : _*)//给列自定义列名

}

三、DataFrameReader

SQLContext类中的read方法返回一个DataFrameReader,它提供方法从外部获取DataFrame

1、格式化。通过format方法将数据源设置为source参数。默认数据源为parquet。

[java] view
plain copy

def format(source: String): DataFrameReader = {

this.source = source

this

}

2、定义schema信息

[java] view
plain copy

def schema(schema: StructType): DataFrameReader = {

this.userSpecifiedSchema = Option(schema)

this

}

3、附加参数

[java] view
plain copy

def option(key: String, value: String): DataFrameReader

def options(options: scala.collection.Map[String, String]): DataFrameReader

4、load方法。正是加载从外部文件系统的路径下的文件

[java] view
plain copy

def load(path: String): DataFrame = {

option("path", path).load()

}

def load(): DataFrame

5、jdbc

需首先加载相应的jdbc驱动到spark classpath

每个工作节点上也需要能加载驱动包,可以将驱动jars放在每个节点的classpath中。

[java] view
plain copy

def jdbc(url: String, table: String, properties: Properties): DataFrame

def jdbc(

url: String,

table: String,

columnName: String,

lowerBound: Long,

upperBound: Long,

numPartitions: Int,

connectionProperties: Properties): DataFrame

def jdbc(

url: String,

table: String,

predicates: Array[String],

connectionProperties: Properties): DataFrame

6、json

[java] view
plain copy

def json(path: String): DataFrame = format("json").load(path)

def json(jsonRDD: RDD[String]): DataFrame

7、parquet

[java] view
plain copy

def parquet(paths: String*): DataFrame

8、从表加载创建DataFrame,同SQLContext中的同名方法

[java] view
plain copy

def table(tableName: String): DataFrame

四、DataFrame

1、为schema注入列名,数量需一致

[java] view
plain copy

def toDF(colNames: String*): DataFrame

2、获取schema信息

[java] view
plain copy

def schema: StructType

3、获取列名及类型

[java] view
plain copy

/**

* Returns all column names and their data types as an array.

* @group basic

* @since 1.3.0

*/

def dtypes: Array[(String, String)] = schema.fields.map { field =>

(field.name, field.dataType.toString)

}

4、获取列名

[java] view
plain copy

def columns: Array[String]

5、打印schema信息

[java] view
plain copy

def printSchema(): Unit

6、打印数据

[java] view
plain copy

def show(numRows: Int): Unit

def show(): Unit = show(20)

7、join

[java] view
plain copy

def join(right: DataFrame): DataFrame

def join(right: DataFrame, usingColumn: String): DataFrame

def join(right: DataFrame, joinExprs: Column): DataFrame

def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame

8、排序sort/orderBy

[java] view
plain copy

/**

* Returns a new [[DataFrame]] sorted by the specified column, all in ascending order.

* {{{

* // The following 3 are equivalent

* df.sort("sortcol")

* df.sort($"sortcol")

* df.sort($"sortcol".asc)

* }}}

* @group dfops

* @since 1.3.0

*/

@scala.annotation.varargs

def sort(sortCol: String, sortCols: String*): DataFrame

def sort(sortExprs: Column*): DataFrame

def orderBy(sortCol: String, sortCols: String*): DataFrame = sort(sortCol, sortCols : _*)

def orderBy(sortExprs: Column*): DataFrame

9、用小括号将列名转化为Column类型

[java] view
plain copy

def apply(colName: String): Column = col(colName)

10、col方法将列名转化为Column类型,同apply

[java] view
plain copy

def col(colName: String): Column

11、别名

[java] view
plain copy

def as(alias: String): DataFrame

def as(alias: Symbol): DataFrame

12、选取部分列

[java] view
plain copy

def select(cols: Column*): DataFrame

def select(col: String, cols: String*): DataFrame

13、选取部分列,比select灵活,参数可以是sql表达方式

[java] view
plain copy

/**

* Selects a set of SQL expressions. This is a variant of `select` that accepts

* SQL expressions.

*

* {{{

* df.selectExpr("colA", "colB as newName", "abs(colC)")

* }}}

* @group dfops

* @since 1.3.0

*/

@scala.annotation.varargs

def selectExpr(exprs: String*): DataFrame

14、filter

[java] view
plain copy

/**

* Filters rows using the given condition.

* {{{

* // The following are equivalent:

* peopleDf.filter($"age" > 15)//此处$"age"为Column类型,>符号被Column类重载,返回还是为Column类型

* peopleDf.where($"age" > 15)

* }}}

* @group dfops

* @since 1.3.0

*/

def filter(condition: Column): DataFrame

/**

* Filters rows using the given SQL expression.

* {{{

* peopleDf.filter("age > 15")//参数为sql表达式

* }}}

* @group dfops

* @since 1.3.0

*/

def filter(conditionExpr: String): DataFrame

15、where

[java] view
plain copy

/**

* Filters rows using the given condition. This is an alias for `filter`.

* {{{

* // The following are equivalent:

* peopleDf.filter($"age" > 15)

* peopleDf.where($"age" > 15)

* }}}

* @group dfops

* @since 1.3.0

*/

def where(condition: Column): DataFrame

16、groupBy

[java] view
plain copy

/**

* Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.

* See [[GroupedData]] for all the available aggregate functions.

*

* {{{

* // Compute the average for all numeric columns grouped by department.

* df.groupBy($"department").avg()

*

* // Compute the max age and average salary, grouped by department and gender.

* df.groupBy($"department", $"gender").agg(Map(

* "salary" -> "avg",

* "age" -> "max"

* ))

* }}}

* @group dfops

* @since 1.3.0

*/

@scala.annotation.varargs

def groupBy(cols: Column*): GroupedData

def groupBy(col1: String, cols: String*): GroupedData

17、获取前n行数据

[java] view
plain copy

def limit(n: Int): DataFrame

18、unionAll

[java] view
plain copy

/**

* Returns a new [[DataFrame]] containing union of rows in this frame and another frame.

* This is equivalent to `UNION ALL` in SQL.

* @group dfops

* @since 1.3.0

*/

def unionAll(other: DataFrame): DataFrame

19、intersect

[java] view
plain copy

/**

* Returns a new [[DataFrame]] containing rows only in both this frame and another frame.

* This is equivalent to `INTERSECT` in SQL.

* @group dfops

* @since 1.3.0

*/

def intersect(other: DataFrame): DataFrame

20、前几行

[java] view
plain copy

def head(n: Int): Array[Row]

def head(): Row

override def first(): Row

21、map 这里的rdd是RDD[Row],flatMap,mapPartitions,foreach,foreachPartition,take,collect,count,repartition,persist,cache同RDD方法

[java] view
plain copy

override def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)

22、distinct

[java] view
plain copy

/**

* Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].

* This is an alias for `dropDuplicates`.

* @group dfops

* @since 1.3.0

*/

override def distinct: DataFrame

23、rdd

[java] view
plain copy

lazy val rdd: RDD[Row]

24、将本DataFrame注册为临时表

[java] view
plain copy

/**

* Registers this [[DataFrame]] as a temporary table using the given name. The lifetime of this

* temporary table is tied to the [[SQLContext]] that was used to create this DataFrame.

*

* @group basic

* @since 1.3.0

*/

def registerTempTable(tableName: String): Unit

25、write,向外部输出,下面介绍

[java] view
plain copy

def write: DataFrameWriter

26、转化为json

[java] view
plain copy

def toJSON: RDD[String]

五、DataFrameWriter

1、设置保存模式

[java] view
plain copy

/**

* Specifies the behavior when data or table already exists. Options include:

* - `SaveMode.Overwrite`: overwrite the existing data.

* - `SaveMode.Append`: append the data.

* - `SaveMode.Ignore`: ignore the operation (i.e. no-op).

* - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.

*

* @since 1.4.0

*/

def mode(saveMode: SaveMode): DataFrameWriter = {

this.mode = saveMode

this

}

2、format

[java] view
plain copy

/**

* Specifies the underlying output data source. Built-in options include "parquet", "json", etc.

*

* @since 1.4.0

*/

def format(source: String): DataFrameWriter

3、option

[java] view
plain copy

def option(key: String, value: String): DataFrameWriter

[java] view
plain copy

def options(options: scala.collection.Map[String, String]): DataFrameWriter

4、partitionBy

[java] view
plain copy

def partitionBy(colNames: String*): DataFrameWriter

5、save

[java] view
plain copy

def save(path: String): Unit

def save(): Unit

6、insertInto

[java] view
plain copy

def insertInto(tableName: String): Unit

7、saveAsTable

[java] view
plain copy

def saveAsTable(tableName: String): Unit

8、jdbc

[java] view
plain copy

def jdbc(url: String, table: String, connectionProperties: Properties): Unit

9、json

[java] view
plain copy

def json(path: String): Unit = format("json").save(path)

10、parquet,也是save是默认的,可以不预先设置parquet

[java] view
plain copy

def parquet(path: String): Unit = format("parquet").save(path)

转载: http://blog.csdn.net/yueqian_zhu/article/details/49587433
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: