SparkSQL-Related APIs
2016-01-29 14:19
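I. Interfaces in SQLContext.scala
Most of these interfaces create a DataFrame.
1. Construction: an SQLContext is constructed from a single SparkContext argument.
2. Set/get configuration: setConf/getConf.
3. isCached/cacheTable/uncacheTable/clearCache: table caching, which speeds up queries but should be used with care to avoid OOM errors.
4. read: reads from external data sources; covered in detail in the DataFrameReader section below.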
/**
 * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
 * {{{
 *   sqlContext.read.parquet("/path/to/file.parquet")
 *   sqlContext.read.schema(schema).json("/path/to/file.json")
 * }}}
 */
def read: DataFrameReader = new DataFrameReader(this)
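As a rough illustration of items 1 to 3, the sketch below builds an SQLContext in local mode, sets and reads back a configuration value, and caches a temporary table. It assumes Spark 1.5 or 1.6 on the classpath; the object name, the table name numbers, and the local[2] master are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SqlContextBasics {
  // Returns (configured value, cached after cacheTable, cached after uncacheTable).
  def run(): (String, Boolean, Boolean) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("sqlcontext-basics").setMaster("local[2]"))
    try {
      // Item 1: the only constructor argument is the SparkContext.
      val sqlContext = new SQLContext(sc)

      // Item 2: setConf/getConf.
      sqlContext.setConf("spark.sql.shuffle.partitions", "4")
      val partitions = sqlContext.getConf("spark.sql.shuffle.partitions")

      // Item 3: cache a temporary table, check it, then release it.
      sqlContext.range(0L, 10L).registerTempTable("numbers")
      sqlContext.cacheTable("numbers")
      val cachedBefore = sqlContext.isCached("numbers")
      sqlContext.uncacheTable("numbers")
      val cachedAfter = sqlContext.isCached("numbers")

      (partitions, cachedBefore, cachedAfter)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Cached tables live in executor memory, so releasing them with uncacheTable or clearCache once they are no longer queried is the usual way to keep the OOM risk mentioned above in check.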
5. Drop a temporary table
def dropTempTable(tableName: String): Unit
6. List the existing tables (how tables are created is covered under the DataFrame methods below)
def tableNames(): Array[String]
def tableNames(databaseName: String): Array[String]
7. Create a DataFrame from an existing RDD or Seq
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame
/**
 * Example:
 * {{{
 *  import org.apache.spark.sql._
 *  import org.apache.spark.sql.types._
 *  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 *
 *  val schema =
 *    StructType(
 *      StructField("name", StringType, false) ::
 *      StructField("age", IntegerType, true) :: Nil)
 *
 *  val people =
 *    sc.textFile("examples/src/main/resources/people.txt").map(
 *      _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
 *  val dataFrame = sqlContext.createDataFrame(people, schema)
 *  dataFrame.printSchema
 *  // root
 *  // |-- name: string (nullable = false)
 *  // |-- age: integer (nullable = true)
 *
 *  dataFrame.registerTempTable("people")
 *  sqlContext.sql("select name from people").collect.foreach(println)
 * }}}
 */
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
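The first two overloads take the schema from the element type instead of an explicit StructType. A minimal sketch, assuming Spark 1.5 or 1.6 on the classpath; the Person case class and the object name are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical record type. It is declared at the top level because case classes
// declared inside a method can fail to compile with "No TypeTag available".
case class Person(name: String, age: Int)

object CreateFromCaseClasses {
  // Returns the column names inferred from the case class fields.
  def run(): Seq[String] = {
    val sc = new SparkContext(
      new SparkConf().setAppName("create-from-case-classes").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      // Seq[A <: Product] overload: field names become column names.
      val df = sqlContext.createDataFrame(Seq(Person("Ann", 30), Person("Bob", 17)))
      df.columns.toSeq
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```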
8. Create a DataFrame backed by an external table
def createExternalTable(tableName: String, path: String): DataFrame
def createExternalTable(
    tableName: String,
    path: String,
    source: String): DataFrame
def createExternalTable(
    tableName: String,
    source: String,
    options: Map[String, String]): DataFrame
def createExternalTable(
    tableName: String,
    source: String,
    schema: StructType,
    options: Map[String, String]): DataFrame
9. Create a DataFrame with a single column named id whose values span the given range
def range(start: Long, end: Long): DataFrame
def range(end: Long): DataFrame
def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame
10. Create a DataFrame from a SQL query
def sql(sqlText: String): DataFrame
11. Create a DataFrame from a table
def table(tableName: String): DataFrame
/**
 * Returns a [[DataFrame]] containing names of existing tables in the current database.
 * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
 * indicating if a table is a temporary one or not).
 *
 * @group ddl_ops
 * @since 1.3.0
 */
def tables(): DataFrame = {
  DataFrame(this, ShowTablesCommand(None))
}
/**
 * Returns a [[DataFrame]] containing names of existing tables in the given database.
 * The returned DataFrame has two columns, tableName and isTemporary (a Boolean
 * indicating if a table is a temporary one or not).
 *
 * @group ddl_ops
 * @since 1.3.0
 */
def tables(databaseName: String): DataFrame = {
  DataFrame(this, ShowTablesCommand(Some(databaseName)))
}
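The table-related calls in this section fit together as a small lifecycle: register a temporary table, query it, list it, and drop it. The sketch below assumes Spark 1.5 or 1.6; the table name ids and the object name are invented for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TempTableLifecycle {
  // Returns (number of even ids, listed before drop, listed after drop).
  def run(): (Long, Boolean, Boolean) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("temp-table-lifecycle").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)

      // range(0, 5) yields one column named "id" holding 0, 1, 2, 3, 4.
      sqlContext.range(0L, 5L).registerTempTable("ids")

      // sql(...) returns a DataFrame; table("ids") would return the whole table.
      val evenCount = sqlContext.sql("SELECT id FROM ids WHERE id % 2 = 0").count()

      val listedBefore = sqlContext.tableNames().contains("ids")
      sqlContext.dropTempTable("ids")
      val listedAfter = sqlContext.tableNames().contains("ids")

      (evenCount, listedBefore, listedAfter)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```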
II. Implicit conversions in SQLContext
1. StringToColumn: implicitly wraps a StringContext in a StringToColumn object, which provides the $ method that turns a column name into a Column
/**
 * Converts $"col name" into an [[Column]].
 * @since 1.3.0
 */
implicit class StringToColumn(val sc: StringContext) {
  def $(args: Any*): ColumnName = {
    new ColumnName(sc.s(args : _*))
  }
}
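The mechanism does not depend on Spark: any implicit class over StringContext that defines a method named $ enables the $"..." syntax. The following plain-Scala sketch mirrors the shape of the code above; the ColumnName case class here is only a stand-in for Spark's class, and the object name is hypothetical.

```scala
// Stand-in for Spark's ColumnName, used only for this illustration.
final case class ColumnName(name: String)

object DollarInterpolator {
  // Wrapping StringContext adds a `$` method, so the compiler rewrites
  // $"age" into StringContext("age").$().
  implicit class StringToColumn(val sc: StringContext) {
    def $(args: Any*): ColumnName = ColumnName(sc.s(args: _*))
  }

  def demo(): (ColumnName, ColumnName) = {
    val suffix = "id"
    // The second literal also interpolates a variable, as the s"..." interpolator would.
    ($"age", $"user_$suffix")
  }

  def main(args: Array[String]): Unit = println(demo())
}
```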
2. Conversion from a Scala Symbol
/**
* An implicit conversion that turns a Scala `Symbol` into a [[Column]].
* @since 1.3.0
*/
implicit def symbolToColumn(s: Symbol): ColumnName = new ColumnName(s.name)
3. Convert an RDD or Seq whose elements are case classes or tuples into a DataFrameHolder, which provides the toDF methods listed in item 5
/**
 * Creates a DataFrame from an RDD of case classes or tuples.
 * @since 1.3.0
 */
implicit def rddToDataFrameHolder[A <: Product : TypeTag](rdd: RDD[A]): DataFrameHolder = {
  DataFrameHolder(self.createDataFrame(rdd))
}
/**
 * Creates a DataFrame from a local Seq of Product.
 * @since 1.3.0
 */
implicit def localSeqToDataFrameHolder[A <: Product : TypeTag](data: Seq[A]): DataFrameHolder =
{
  DataFrameHolder(self.createDataFrame(data))
}
4. Convert an RDD whose elements are Int, Long, or String into a DataFrameHolder
implicit def intRddToDataFrameHolder(data: RDD[Int]): DataFrameHolder
implicit def longRddToDataFrameHolder(data: RDD[Long]): DataFrameHolder
implicit def stringRddToDataFrameHolder(data: RDD[String]): DataFrameHolder
5. DataFrameHolder provides the following methods for the final conversion to a DataFrame
private[sql] case class DataFrameHolder(df: DataFrame) {
  // This is declared with parentheses to prevent the Scala compiler from treating
  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
  def toDF(): DataFrame = df
  def toDF(colNames: String*): DataFrame = df.toDF(colNames : _*) // assigns custom column names
}
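In user code these conversions come into scope through import sqlContext.implicits._, after which toDF can be called directly on a local Seq or an RDD of tuples. A minimal sketch, assuming Spark 1.5 or 1.6; the object name and column names are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ToDfExample {
  // Returns (default column names, custom column names).
  def run(): (Seq[String], Seq[String]) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("todf-example").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      // Brings rddToDataFrameHolder, localSeqToDataFrameHolder, etc. into scope.
      import sqlContext.implicits._

      val pairs = Seq((1, "a"), (2, "b"))
      // Local Seq of tuples: without arguments, columns are named after the tuple fields.
      val defaultNames = pairs.toDF().columns.toSeq
      // RDD of tuples: toDF(colNames) assigns custom names.
      val customNames = sc.parallelize(pairs).toDF("id", "label").columns.toSeq

      (defaultNames, customNames)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```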
III. DataFrameReader
The read method of SQLContext returns a DataFrameReader, which provides methods for loading a DataFrame from external sources.
1. format: sets the data source to the source argument. The default data source is parquet.
def format(source: String): DataFrameReader = {
  this.source = source
  this
}
2. Specify the schema
def schema(schema: StructType): DataFrameReader = {
  this.userSpecifiedSchema = Option(schema)
  this
}
3. Additional options
def option(key: String, value: String): DataFrameReader
def options(options: scala.collection.Map[String, String]): DataFrameReader
4. load: the call that actually loads the files under the given path in the external file system
def load(path: String): DataFrame = {
  option("path", path).load()
}
def load(): DataFrame
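Because format, schema, and option each return the reader itself, the calls chain and nothing is read until load. The sketch below writes a small JSON file to a temporary location and reads it back with an explicit schema. It assumes Spark 1.5 or 1.6 running against the local file system; the file contents and the object name are invented for the example.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ReaderExample {
  def run(): Seq[(String, Int)] = {
    // One JSON object per line, which is the layout the json source expects.
    val lines = Seq("""{"name":"Ann","age":30}""", """{"name":"Bob","age":17}""")
    val file = Files.createTempFile("people", ".json")
    Files.write(file, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))

    val sc = new SparkContext(
      new SparkConf().setAppName("reader-example").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      val schema = StructType(
        StructField("name", StringType, nullable = true) ::
        StructField("age", IntegerType, nullable = true) :: Nil)

      // format -> schema -> load; without format(...) the default source is used.
      val df = sqlContext.read.format("json").schema(schema).load(file.toString)
      df.collect().map(r => (r.getString(0), r.getInt(1))).toSeq.sortBy(_._1)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```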
5. jdbc
The matching JDBC driver must first be on the Spark classpath.
Every worker node must be able to load the driver as well; one option is to put the driver JARs on each node's classpath.
def jdbc(url: String, table: String, properties: Properties): DataFrame
def jdbc(
    url: String,
    table: String,
    columnName: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    connectionProperties: Properties): DataFrame
def jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame
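In the third overload, each element of predicates is used as the WHERE condition of one partition, so the array should cover the table without overlap. Below is a small sketch of building such an array. The column name created_year, the JDBC URL, and the table name orders are hypothetical, and the read itself is left as a comment because it needs a live database and its driver on the classpath.

```scala
object JdbcPredicates {
  // One predicate per year; each one becomes the WHERE condition of one partition.
  def byYear(column: String, fromYear: Int, toYear: Int): Array[String] =
    (fromYear to toYear).map(year => s"$column = $year").toArray

  def main(args: Array[String]): Unit = {
    val predicates = byYear("created_year", 2013, 2015)
    predicates.foreach(println)

    // With an SQLContext and a reachable database (both assumed, not shown here):
    //   val props = new java.util.Properties()
    //   val df = sqlContext.read.jdbc(
    //     "jdbc:postgresql://host/db", "orders", predicates, props)
  }
}
```

The columnName/lowerBound/upperBound/numPartitions overload serves the same purpose when the split column is numeric and evenly distributed; explicit predicates are the more flexible choice when it is not.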
6. json
def json(path: String): DataFrame = format("json").load(path)
def json(jsonRDD: RDD[String]): DataFrame
7. parquet
def parquet(paths: String*): DataFrame
8. Create a DataFrame from a table, same as the SQLContext method of the same name
def table(tableName: String): DataFrame
IV. DataFrame
1. Assign column names to the schema; the number of names must match the number of columns
def toDF(colNames: String*): DataFrame
2. Get the schema
def schema: StructType
3. Get the column names and types
/**
 * Returns all column names and their data types as an array.
 * @group basic
 * @since 1.3.0
 */
def dtypes: Array[(String, String)] = schema.fields.map { field =>
  (field.name, field.dataType.toString)
}
4. Get the column names
def columns: Array[String]
5. Print the schema
def printSchema(): Unit
6. Print the data
def show(numRows: Int): Unit
def show(): Unit = show(20)
7. join
def join(right: DataFrame): DataFrame
def join(right: DataFrame, usingColumn: String): DataFrame
def join(right: DataFrame, joinExprs: Column): DataFrame
def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
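A short sketch of the last overload, which takes a join expression and a join type. It assumes Spark 1.5 or 1.6; the two small tables and the object name are invented for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object JoinExample {
  def run(): Seq[(String, String)] = {
    val sc = new SparkContext(
      new SparkConf().setAppName("join-example").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val people = Seq((1, "Ann"), (2, "Bob"), (3, "Cid")).toDF("deptId", "name")
      val depts = Seq((1, "eng"), (2, "ops")).toDF("id", "dept")

      // === builds a Column expression; "inner" could be "left_outer", "right_outer", etc.
      val joined = people.join(depts, people("deptId") === depts("id"), "inner")

      // Cid has no matching department, so the inner join drops that row.
      joined.select("name", "dept").collect()
        .map(r => (r.getString(0), r.getString(1))).toSeq.sorted
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```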
8. Sorting: sort/orderBy
/**
* Returns a new [[DataFrame]] sorted by the specified column, all in ascending order.
* {{{
* // The following 3 are equivalent
* df.sort("sortcol")
* df.sort($"sortcol")
* df.sort($"sortcol".asc)
* }}}
* @group dfops
* @since 1.3.0
*/
@scala.annotation.varargs
def sort(sortCol: String, sortCols: String*): DataFrame
def sort(sortExprs: Column*): DataFrame
def orderBy(sortCol: String, sortCols: String*): DataFrame = sort(sortCol, sortCols : _*)
def orderBy(sortExprs: Column*): DataFrame
9. apply: parentheses turn a column name into a Column, as in df("age")
def apply(colName: String): Column = col(colName)
10. col: turns a column name into a Column, same as apply
def col(colName: String): Column
11. Alias
def as(alias: String): DataFrame
def as(alias: Symbol): DataFrame
12. Select a subset of columns
def select(cols: Column*): DataFrame
def select(col: String, cols: String*): DataFrame
13. selectExpr: selects columns like select but is more flexible, since the arguments can be SQL expressions
/**
* Selects a set of SQL expressions. This is a variant of `select` that accepts
* SQL expressions.
*
* {{{
* df.selectExpr("colA", "colB as newName", "abs(colC)")
* }}}
* @group dfops
* @since 1.3.0
*/
@scala.annotation.varargs
def selectExpr(exprs: String*): DataFrame
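The sketch below puts items 8 to 13 side by side: apply and col to obtain a Column, select with Column arguments, selectExpr with SQL strings, and sort on a derived column. It assumes Spark 1.5 or 1.6; the data, column names, and object name are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SelectExample {
  // Returns (columns of the select result, rows of the selectExpr result).
  def run(): (Seq[String], Seq[(String, Int)]) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("select-example").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val df = Seq(("a", -3), ("b", 2), ("c", -1)).toDF("key", "value")

      // df("key") and df.col("value") both return a Column.
      val viaColumns = df.select(df("key"), (df.col("value") + 1).as("plusOne"))

      // selectExpr parses SQL expressions; the result is sorted in descending order.
      val viaExpr = df.selectExpr("key", "abs(value) as magnitude")
        .sort($"magnitude".desc)

      (viaColumns.columns.toSeq,
        viaExpr.collect().map(r => (r.getString(0), r.getInt(1))).toSeq)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```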
14. filter
/**
* Filters rows using the given condition.
* {{{
* // The following are equivalent:
* peopleDf.filter($"age" > 15) // $"age" is a Column; Column overloads >, and the result is again a Column
* peopleDf.where($"age" > 15)
* }}}
* @group dfops
* @since 1.3.0
*/
def filter(condition: Column): DataFrame
/**
* Filters rows using the given SQL expression.
* {{{
* peopleDf.filter("age > 15") // the argument is a SQL expression
* }}}
* @group dfops
* @since 1.3.0
*/
def filter(conditionExpr: String): DataFrame
15. where
/**
* Filters rows using the given condition. This is an alias for `filter`.
* {{{
* // The following are equivalent:
* peopleDf.filter($"age" > 15)
* peopleDf.where($"age" > 15)
* }}}
* @group dfops
* @since 1.3.0
*/
def where(condition: Column): DataFrame
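The three spellings can be checked against each other on a small DataFrame. A minimal sketch, assuming Spark 1.5 or 1.6; the sample rows and the object name are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object FilterExample {
  // Returns the row counts for filter(Column), where(Column), and filter(String).
  def run(): (Long, Long, Long) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("filter-example").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val people = Seq(("Ann", 30), ("Bob", 17), ("Cid", 15)).toDF("name", "age")

      val byColumn = people.filter($"age" > 15).count() // Column expression
      val byWhere = people.where($"age" > 15).count()   // alias for filter
      val byString = people.filter("age > 15").count()  // SQL expression string

      (byColumn, byWhere, byString)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```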
16. groupBy
/**
* Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
* See [[GroupedData]] for all the available aggregate functions.
*
* {{{
* // Compute the average for all numeric columns grouped by department.
* df.groupBy($"department").avg()
*
* // Compute the max age and average salary, grouped by department and gender.
* df.groupBy($"department", $"gender").agg(Map(
* "salary" -> "avg",
* "age" -> "max"
* ))
* }}}
* @group dfops
* @since 1.3.0
*/
@scala.annotation.varargs
def groupBy(cols: Column*): GroupedData
def groupBy(col1: String, cols: String*): GroupedData
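groupBy only returns a GroupedData; a DataFrame comes back once an aggregation is applied. The sketch below uses the avg and max helpers from org.apache.spark.sql.functions rather than the Map form, so the result can be read by position. It assumes Spark 1.5 or 1.6, where the grouping column is kept in the output by default; the data and object name are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{avg, max}

object GroupByExample {
  // Returns (department, average salary, maximum age) per department.
  def run(): Seq[(String, Double, Int)] = {
    val sc = new SparkContext(
      new SparkConf().setAppName("groupby-example").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val staff = Seq(("eng", 30, 100.0), ("eng", 40, 200.0), ("ops", 25, 50.0))
        .toDF("department", "age", "salary")

      val stats = staff.groupBy($"department").agg(avg($"salary"), max($"age"))

      stats.collect()
        .map(r => (r.getString(0), r.getDouble(1), r.getInt(2)))
        .toSeq.sortBy(_._1)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```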
17. Get the first n rows
def limit(n: Int): DataFrame
18. unionAll
/**
* Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
* This is equivalent to `UNION ALL` in SQL.
* @group dfops
* @since 1.3.0
*/
def unionAll(other: DataFrame): DataFrame
19. intersect
/**
* Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
* This is equivalent to `INTERSECT` in SQL.
* @group dfops
* @since 1.3.0
*/
def intersect(other: DataFrame): DataFrame
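A small comparison of the two set operations, with distinct added to show that unionAll keeps duplicates. The sketch assumes Spark 1.5 or 1.6; the values and the object name are invented for the example. The integers are wrapped in Tuple1 because the implicit toDF conversion on a local Seq expects Product elements.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SetOpsExample {
  // Returns (rows after unionAll, rows after unionAll + distinct, intersect values).
  def run(): (Long, Long, Seq[Int]) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("setops-example").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val a = Seq(1, 2, 3).map(Tuple1(_)).toDF("n")
      val b = Seq(2, 3, 4).map(Tuple1(_)).toDF("n")

      val unionCount = a.unionAll(b).count()             // duplicates are kept
      val distinctCount = a.unionAll(b).distinct.count() // duplicates removed
      val common = a.intersect(b).collect().map(_.getInt(0)).toSeq.sorted

      (unionCount, distinctCount, common)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```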
20. First rows
def head(n: Int): Array[Row]
def head(): Row
override def first(): Row
21. map: rdd here is an RDD[Row]. flatMap, mapPartitions, foreach, foreachPartition, take, collect, count, repartition, persist, and cache behave like the RDD methods of the same name
override def map[R: ClassTag](f: Row => R): RDD[R] = rdd.map(f)
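A brief sketch of row-level access through map, head, and take. It assumes Spark 1.5 or 1.6, where map on a DataFrame returns an RDD as in the signature above; the sample rows and the object name are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object RowAccessExample {
  // Returns (all names via map, first name via head, number of rows from take(1)).
  def run(): (Seq[String], String, Int) = {
    val sc = new SparkContext(
      new SparkConf().setAppName("row-access").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val df = Seq(("Ann", 30), ("Bob", 17)).toDF("name", "age")

      // map hands each Row to the function; fields are read by position.
      val names = df.map(row => row.getString(0)).collect().toSeq
      // head() returns the first Row; head(n) and take(n) return an Array[Row].
      val firstName = df.head().getString(0)
      val taken = df.take(1).length

      (names, firstName, taken)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```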
22. distinct
/**
* Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
* This is an alias for `dropDuplicates`.
* @group dfops
* @since 1.3.0
*/
override def distinct: DataFrame
23. rdd
lazy val rdd: RDD[Row]
24. Register this DataFrame as a temporary table
/**
* Registers this [[DataFrame]] as a temporary table using the given name. The lifetime of this
* temporary table is tied to the [[SQLContext]] that was used to create this DataFrame.
*
* @group basic
* @since 1.3.0
*/
def registerTempTable(tableName: String): Unit
25. write: writes to external storage, described in the DataFrameWriter section below
def write: DataFrameWriter
26. Convert to JSON
def toJSON: RDD[String]
V. DataFrameWriter
1. Set the save mode
/**
* Specifies the behavior when data or table already exists. Options include:
* - `SaveMode.Overwrite`: overwrite the existing data.
* - `SaveMode.Append`: append the data.
* - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
* - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: SaveMode): DataFrameWriter = {
  this.mode = saveMode
  this
}
2. format
/**
* Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
*
* @since 1.4.0
*/
def format(source: String): DataFrameWriter
3. option
def option(key: String, value: String): DataFrameWriter
def options(options: scala.collection.Map[String, String]): DataFrameWriter
4. partitionBy
def partitionBy(colNames: String*): DataFrameWriter
5. save
def save(path: String): Unit
def save(): Unit
6. insertInto
def insertInto(tableName: String): Unit
7. saveAsTable
def saveAsTable(tableName: String): Unit
8. jdbc
def jdbc(url: String, table: String, connectionProperties: Properties): Unit
9. json
def json(path: String): Unit = format("json").save(path)
10. parquet: this is also the default format used by save, so the format does not need to be set to parquet beforehand
def parquet(path: String): Unit = format("parquet").save(path)
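The writer methods chain in the same way as the reader methods. The sketch below writes a partitioned Parquet dataset twice, first with Overwrite and then with Append, and reads it back each time to show the effect of the save mode. It assumes Spark 1.5 or 1.6 running against the local file system; the output directory, data, and object name are made up for the example.

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object WriterExample {
  // Returns (row count after the first write, row count after the append).
  def run(): (Long, Long) = {
    val out = Files.createTempDirectory("writer-example").resolve("people").toString

    val sc = new SparkContext(
      new SparkConf().setAppName("writer-example").setMaster("local[2]"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val df = Seq(("Ann", "eng"), ("Bob", "ops")).toDF("name", "dept")

      // Overwrite replaces whatever is at the path; partitionBy creates dept=... subdirectories.
      df.write.mode(SaveMode.Overwrite).partitionBy("dept").parquet(out)
      val afterFirst = sqlContext.read.parquet(out).count()

      // Append adds the same two rows to the existing data.
      df.write.mode(SaveMode.Append).partitionBy("dept").parquet(out)
      val afterAppend = sqlContext.read.parquet(out).count()

      (afterFirst, afterAppend)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With the default mode, ErrorIfExists, the second write would throw instead of appending, which is why the mode is set explicitly on both calls.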
Reposted from: http://blog.csdn.net/yueqian_zhu/article/details/49587433