您的位置:首页 > 数据库

spark sql 基础

2016-04-21 16:54 501 查看

SparkSQL初始化

Java API

SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");	JavaSparkContext ctx = new JavaSparkContext(sparkConf);	  SQLContext sqlContext = new SQLContext(ctx);

启动Spark SQL CLI

1、将hive的配置文件hive-site.xml放到$spark_home的conf/下。
2、在spark的spark-env.sh中加入如下配置
export HADOOP_CONF_DIR=/usr/local/hadoop-2.7.0/etc/hadoop

3、启动命令:./bin/spark-sql启动。
Spark-shell on yarn
命令:./bin/spark-shell --master yarn-client --executor-memory 1g --num-executors 10

注意:这里的–master必须使用yarn-client模式,如果指定yarn-cluster,则会报错:
Error: Cluster deploy mode is not applicable to Spark shells.

因为spark-shell作为一个与用户交互的命令行,必须将Driver运行在本地,而不是yarn上。
其中的参数与提交Spark应用程序到yarn上用法一样。
在RM管理界面可以看到spark-sql作为一个长服务的任务运行这yarn上。
Spark-sql on yarn
运行这个命令之前,需要把hive-site.xml放入$spark_home/conf下,把mysql-connector-java-5.1.15-bin.jar放入$spark_home/lib下。
命令:./bin/spark-sql --master yarn-client --executor-memory 1g --num-executors 10

Spark-submit whith hive提交作业
./bin/spark-submit --class testHive.SparkSQLHiveOnYarn --master yarn-cluster /tmp/sparksql.jar

--master 参数需指定为yarn-cluster才能使用yarn分布式资源。
--master 参数值为local[*]时为本地模式执行,此时不会被yarn管理。
说明一下上面使用spark-submit提交的命令:
--master yarn-cluster  //指定以yarn-cluster模式运行,关于yarn-cluster和yarn-client的区别,在之前的文章中提到过

--driver-memory 4G  //指定Driver使用的内存为4G,

//如果太小的话,会报错:Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread “Driver”

--driver-java-options “-XX:MaxPermSize=1G”   //指定Driver程序JVM参数

–files $HIVE_HOME/conf/hive-site.xml    //将Hive的配置文件添加到Driver和Executor的classpath中

--jars $HIVE_HOME/lib/mysql-connector-java-5.1.15-bin.jar,….    //将Hive依赖的jar包添加到Driver和Executor的classpath中

//需要依赖的jar包有:mysql-connector-java-5.1.15-bin.jar、datanucleus-api-jdo-3.2.6.jar、datanucleus-core-3.2.10.jar、datanucleus-rdbms-3.2.9.jar、guava-12.0.1.jar

注意:由于Driver和Executor需要访问Hive的元数据库,而Driver和Executor被分配到哪台机器上是不固定的,所以需要授权,使集群上所有机器都有操作Hive元数据库的权限。

运行Thrift JDBC/ODBC 服务

Thrift JDBC/ODBC server 的实现与hive1.2.1的hiveserver2一致。

可以使用spark或hive1.2.1中的beeline测试jdbc/odbc。

启动jdbc/odbc服务
使用以下命令,启动jdbc/odbc 服务
./sbin/start-thriftserver.sh

这个脚本接收所有bin/spark-submit的命令行操作,后面加上--hiveconf来指定hive属性。你可以运行脚本thriftserver.sh --help列出所有操纵参数。默认此服务监听localhost:10000端口,不过你可以通过环境变量覆盖此设置。
export HIVE_SERVER2_THRIFT_PORT=<listening-port>

export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>

./sbin/start-thriftserver.sh \

--master <master-uri> \

...

或系统属性
./sbin/start-thriftserver.sh \

--hiveconf hive.server2.thrift.port=<listening-port> \

--hiveconf hive.server2.thrift.bind.host=<listening-host> \

--master <master-uri>

...

现在你可以使用beeline连接jdbc/odbc了。命令如下:
./bin/beeline

在beeline中连接jdbc/odbc服务

beeline> !connect jdbc:hive2://localhost:10000

执行上面命令后,beeline会要求输入用户名密码,在非安全模式下,只需要输入用户名即可,密码为空。在安全模式下,请参考beeline的文档
Thrift jdbc 服务还提供了通过http发送thrift RPC信息,使用以下配置开启HTTP模式。配置文件为/conf下的hive-site.xml
hive.server2.transport.mode - 设置为 http

hive.server2.thrift.http.port - 监听的HTTP端口号; 默认为 10001

hive.server2.http.endpoint - HTTP endpoint;默认为cliservice


以下是测试beeline在HTTP模式下连接JDBC/ODBC的服务
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

核心类

DataFrame

此类为spark1.3.0及以后版本提供,在org.apache.spark.sql包下,实现了java.io.Serializable接口。
DataFrame类简化了以前使用RDD的操作。举个例子,有时我们需要用到一些额外的结构化数据(比如做IP和地址的映射),通常这样的数据会存在MySQL,而访问的方式有两种:一是每个worker远程去检索数据库,弊端是耗费额外的网络I/O资源;二是使用JdbcRDD的API转化为RDD格式,然后编写繁复的函数去实现检索,显然要写更多的代码。而现在,Spark提供了一种新的选择,一行代码就能实现从MySQL到DataFrame的转化,并且支持SQL查询。
使用:SQLContext sqlContext = ...  // An existing SQLContextDataFrame df =  	sqlContext.sql("SELECT * FROM table")

数据格式和来源

现代的应用程序通常需要收集和分析来自各种不同数据源的数据,而DataFrame与生俱来就支持读取最流行的格式,包括JSON文件、Parquet文件和Hive表格。DataFrame还支持从多种类型的文件系统中读取,比如本地文件系统、分布式文件系统(HDFS)以及云存储(S3)。同时,配合JDBC,它还可以读取外部关系型数据库系统。此外,通过Spark SQL的外部数据源(external data sources) API,DataFrames可以更广泛地支持任何第三方数据格式和数据源。值得一提的是,当下的第三方扩展已经包含Avro、CSV、ElasticSearch和Cassandra。

DataFrame和RDD区别

升级到spark1.3.0以后,最大的改变是SchemaRDD已经改名为DataFrame。主要原因是DataFrame不再从RDD直接继承。RDDS应该专注于提供他们自己的实现,但DataFrame任然可以通过.rdd方式转换为RDDs。
在scala中,为了兼容性,scala提供了一alias用于从SchemaRDD转换为DataFrame。
以下是DataFrame和RDD的主要区别。



支持的数据类型

Json格式

如:scala> val df = sqlContext.jsonFile(“/path/to/your/jsonfile”)

----此方式在spark1.4.0中已废弃。替换为:
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

parquet格式

val people = sqlContext.read.parquet("...")  // in Scala 	DataFrame people = sqlContext.read().parquet("...")  // in Java

DataFrame方法

// 输出表结构

df.printSchema()

// 选择所有年龄大于21岁的人,只保留name字段

df.filter(df(“age”) > 21).select(“name”).show()

// 选择name,并把age字段自增 	df.select(“name”, df(“age”) + 1).show()

// 按年龄分组计数

df.groupBy(“age”).count().show()

// 左联表(注意是3个等号!)

df.join(df2, df(“name”) === df2(“name”), “left”).show()


ToDF
Returns a new DataFrame with columns renamed.  	This can be quite convenient in conversion from a RDD 	 of tuples into a DataFrame with meaningful names. For example: 	   val rdd: RDD[(Int, String)] = ... 	   rdd.toDF()  // this implicit conversion creates a DataFrame with column name _1 and _2 	   rdd.toDF("id", "name")  // this creates a DataFrame with column name "id" and "name"

虚拟表

将DataFrame对象转为虚拟表

df.registerTempTable(“people”) 	sqlContext.sql(“select age, count(*) from people group by age”).show()

其实,以上语句就等同于
df.groupBy("age").count().show()

DataFrameReader

此类主要实现将从外部存储系统加载数据如:本地文件系统、分布式文件系统(HDFS)、key-value存储、云存储(S3)、etc。同时,配合JDBC,它还可以读取外部关系型数据库系统。此外,通过Spark SQL的外部数据源(external data sources) API,DataFrames可以更广泛地支持任何第三方数据格式和数据源。值得一提的是,当下的第三方扩展已经包含Avro、CSV、ElasticSearch和Cassandra。
Since spark1.4.0

MySQL

除了JSON之外,DataFrame现在已经能支持MySQL、Hive、HDFS、PostgreSQL等外部数据源,而对关系数据库的读取,是通过jdbc实现的。
对于不同的关系数据库,必须在SPARK_CLASSPATH变量中加入对应connector的jar包,比如希望连接MySQL的话应该这么启动spark-shell:
SPARK_CLASSPATH=mysql-connector-java-x.x.x-bin.jar spark-shell

下面要将一个MySQL表转化为DataFrame对象:
Val jdbcDF = sqlContext.load(“jdbc”, Map(“url” -> “jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password”, “dbtable” -> “your_table”))

Hive

Spark提供了一个HiveContext的上下文,此类是SQLContext的子类,但从作用上来说,sqlContext也支持Hive数据源。需要在部署Spark的时候加入Hive选项,并把已有的hive-site.xml文件移动到$SPARK_HOME/conf路径下,我们就可以直接用Spark查询包含已有元数据的Hive表了:
sqlContext.sql(“select count(*) from hive_people”).show()

持久表saveAsTable

当使用hivecontext时,DataFrame也可以使用命令saveAsTable保存为持久表。不同于registerTempTable命令,saveAsTable会持久化DataFrame里的数据,并在hivemetastore里创建一个指向链接。只要你连接的是同一个metastore,即使spark程序重启了,持久表也会一直存在。在sqlContext中,可以使用表名称的方式调用持久表的方法。
默认情况下,saveAsTable 会自动创建自己的“管理表”来存储持久表的元数据,当表删除时,管理表也将管理的元数据链接删除。

表分区

表分区是像hive这样的系统优化常用的方法。在分区表中,数据通常存在不同的目录中。每一个列值是一个分区目录。Parquet 格式的数据可以自动发现和自动推断分区信息。
如下:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...

├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...

Hive table

//scis an existing JavaSparkContext.HiveContext sqlContext = 	  new org.apache.spark.sql.hive.HiveContext(sc.sc); 	sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); 	sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); 	// Queries are expressed in HiveQL.Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();

支持的hive特性

Spark SQL 支持的hive特性如下:
l Hive query statements, including:
1、SELECT

2、GROUP BY

3、ORDER BY

4、CLUSTER BY

5、SORT BY

l All Hive operators, including:
1、Relational operators (=, ⇔, ==, <>, <, >, >=, <=, etc)

2、Arithmetic operators (+, -, *, /, %, etc)

3、Logical operators (AND, &&, OR, ||, etc)

4、Complex type constructors

5、Mathematical functions (sign, ln, cos, etc)

6、String functions (instr, length, printf, etc)

l User defined functions (UDF)
l User defined aggregation functions (UDAF)
l User defined serialization formats (SerDes)
l Window functions
l Joins
1、JOIN

2、{LEFT|RIGHT|FULL} OUTER JOIN

3、LEFT SEMI JOIN

4、CROSS JOIN

l Unions
l Sub-queries
SELECT col FROM ( SELECT a + b AS col from t1) t2
l Sampling
l Explain
l Partitioned tables including dynamic partition insertion
l View
l All Hive DDL Functions, including:
1、CREATE TABLE

2、CREATE TABLE AS SELECT

3、ALTER TABLE

l Most Hive Data types, including:
1、TINYINT

2、SMALLINT

3、INT

4、BIGINT

5、BOOLEAN

6、FLOAT

7、DOUBLE

8、STRING

9、BINARY

10、TIMESTAMP

11、DATE

12、ARRAY<>

13、MAP<>

14、STRUCT<>

不支持的hive特性

1、桶buckets,桶是hive表分区的一种方式,目前为止,spark sql还不支持。

2、Hive查询元数据,不用启动作业,而sparksql查询元数据时,仍然要启动一个作业来查询。

3、Hive支持索引,而spark sql则暂时不支持。

4、块级位图索引和虚拟列(用于构建索引),这些sparksql还不支持。

5、Hive中可以自动为joins和groupbys确定reduces数量。而在sparksql中则需要手动设置并行度。设置SET spark.sql.shuffle.partitions=[num_tasks];

6、数据倾斜标识,spark sql不支持hive的数据倾斜标识。

7、Hive支持加入STREAMTABLE提示,spark SQL不支持STREAMTABLE 提示。

8、查询结果合并多个小文件:如果结果输出包含多个小文件,hive可以选择小文件合并到更少的大文件,以避免HDFS元数据文件过大。Spark SQL不支持。

9、UNION type

10、Unique join

11、Column statistics collecting:不支持列信息统计,仅支持

12、populating the sizeInBytes field of the hive metastore.

13、Hive Input/Output Formats


1、CLI文件格式化:当结果在CLI显示时,SparkSQL仅支持TextOutputFormat.

2、Hadoop archive
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: