spark sql 基础
2016-04-21 16:54
501 查看
SparkSQL初始化
Java API
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); SQLContext sqlContext = new SQLContext(ctx);
启动Spark SQL CLI
1、将hive的配置文件hive-site.xml放到$spark_home的conf/下。2、在spark的spark-env.sh中加入如下配置
export HADOOP_CONF_DIR=/usr/local/hadoop-2.7.0/etc/hadoop
3、启动命令:./bin/spark-sql启动。
Spark-shell on yarn
命令:./bin/spark-shell --master yarn-client --executor-memory 1g --num-executors 10
注意:这里的–master必须使用yarn-client模式,如果指定yarn-cluster,则会报错:
Error: Cluster deploy mode is not applicable to Spark shells.
因为spark-shell作为一个与用户交互的命令行,必须将Driver运行在本地,而不是yarn上。
其中的参数与提交Spark应用程序到yarn上用法一样。
在RM管理界面可以看到spark-sql作为一个长服务的任务运行这yarn上。
Spark-sql on yarn
运行这个命令之前,需要把hive-site.xml放入$spark_home/conf下,把mysql-connector-java-5.1.15-bin.jar放入$spark_home/lib下。
命令:./bin/spark-sql --master yarn-client --executor-memory 1g --num-executors 10
Spark-submit whith hive提交作业
./bin/spark-submit --class testHive.SparkSQLHiveOnYarn --master yarn-cluster /tmp/sparksql.jar
--master 参数需指定为yarn-cluster才能使用yarn分布式资源。
--master 参数值为local[*]时为本地模式执行,此时不会被yarn管理。
说明一下上面使用spark-submit提交的命令:
--master yarn-cluster //指定以yarn-cluster模式运行,关于yarn-cluster和yarn-client的区别,在之前的文章中提到过
--driver-memory 4G //指定Driver使用的内存为4G,
//如果太小的话,会报错:Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread “Driver”
--driver-java-options “-XX:MaxPermSize=1G” //指定Driver程序JVM参数
–files $HIVE_HOME/conf/hive-site.xml //将Hive的配置文件添加到Driver和Executor的classpath中
--jars $HIVE_HOME/lib/mysql-connector-java-5.1.15-bin.jar,…. //将Hive依赖的jar包添加到Driver和Executor的classpath中
//需要依赖的jar包有:mysql-connector-java-5.1.15-bin.jar、datanucleus-api-jdo-3.2.6.jar、datanucleus-core-3.2.10.jar、datanucleus-rdbms-3.2.9.jar、guava-12.0.1.jar
注意:由于Driver和Executor需要访问Hive的元数据库,而Driver和Executor被分配到哪台机器上是不固定的,所以需要授权,使集群上所有机器都有操作Hive元数据库的权限。
运行Thrift JDBC/ODBC 服务
Thrift JDBC/ODBC server 的实现与hive1.2.1的hiveserver2一致。 可以使用spark或hive1.2.1中的beeline测试jdbc/odbc。
启动jdbc/odbc服务
使用以下命令,启动jdbc/odbc 服务
./sbin/start-thriftserver.sh
这个脚本接收所有bin/spark-submit的命令行操作,后面加上--hiveconf来指定hive属性。你可以运行脚本thriftserver.sh --help列出所有操纵参数。默认此服务监听localhost:10000端口,不过你可以通过环境变量覆盖此设置。
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \
...
或系统属性
./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...
现在你可以使用beeline连接jdbc/odbc了。命令如下:
./bin/beeline
在beeline中连接jdbc/odbc服务
beeline> !connect jdbc:hive2://localhost:10000
执行上面命令后,beeline会要求输入用户名密码,在非安全模式下,只需要输入用户名即可,密码为空。在安全模式下,请参考beeline的文档
Thrift jdbc 服务还提供了通过http发送thrift RPC信息,使用以下配置开启HTTP模式。配置文件为/conf下的hive-site.xml
hive.server2.transport.mode - 设置为 http
hive.server2.thrift.http.port - 监听的HTTP端口号; 默认为 10001
hive.server2.http.endpoint - HTTP endpoint;默认为cliservice
以下是测试beeline在HTTP模式下连接JDBC/ODBC的服务
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
核心类
DataFrame
此类为spark1.3.0及以后版本提供,在org.apache.spark.sql包下,实现了java.io.Serializable接口。DataFrame类简化了以前使用RDD的操作。举个例子,有时我们需要用到一些额外的结构化数据(比如做IP和地址的映射),通常这样的数据会存在MySQL,而访问的方式有两种:一是每个worker远程去检索数据库,弊端是耗费额外的网络I/O资源;二是使用JdbcRDD的API转化为RDD格式,然后编写繁复的函数去实现检索,显然要写更多的代码。而现在,Spark提供了一种新的选择,一行代码就能实现从MySQL到DataFrame的转化,并且支持SQL查询。
使用:SQLContext sqlContext = ... // An existing SQLContextDataFrame df = sqlContext.sql("SELECT * FROM table")
数据格式和来源
现代的应用程序通常需要收集和分析来自各种不同数据源的数据,而DataFrame与生俱来就支持读取最流行的格式,包括JSON文件、Parquet文件和Hive表格。DataFrame还支持从多种类型的文件系统中读取,比如本地文件系统、分布式文件系统(HDFS)以及云存储(S3)。同时,配合JDBC,它还可以读取外部关系型数据库系统。此外,通过Spark SQL的外部数据源(external data sources) API,DataFrames可以更广泛地支持任何第三方数据格式和数据源。值得一提的是,当下的第三方扩展已经包含Avro、CSV、ElasticSearch和Cassandra。DataFrame和RDD区别
升级到spark1.3.0以后,最大的改变是SchemaRDD已经改名为DataFrame。主要原因是DataFrame不再从RDD直接继承。RDDS应该专注于提供他们自己的实现,但DataFrame任然可以通过.rdd方式转换为RDDs。在scala中,为了兼容性,scala提供了一alias用于从SchemaRDD转换为DataFrame。
以下是DataFrame和RDD的主要区别。
支持的数据类型
Json格式
如:scala> val df = sqlContext.jsonFile(“/path/to/your/jsonfile”)
----此方式在spark1.4.0中已废弃。替换为:
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
parquet格式
val people = sqlContext.read.parquet("...") // in Scala DataFrame people = sqlContext.read().parquet("...") // in Java
DataFrame方法
// 输出表结构 df.printSchema() // 选择所有年龄大于21岁的人,只保留name字段 df.filter(df(“age”) > 21).select(“name”).show() // 选择name,并把age字段自增 df.select(“name”, df(“age”) + 1).show() // 按年龄分组计数 df.groupBy(“age”).count().show() // 左联表(注意是3个等号!) df.join(df2, df(“name”) === df2(“name”), “left”).show()
ToDF
Returns a new DataFrame with columns renamed. This can be quite convenient in conversion from a RDD of tuples into a DataFrame with meaningful names. For example: val rdd: RDD[(Int, String)] = ... rdd.toDF() // this implicit conversion creates a DataFrame with column name _1 and _2 rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name"
虚拟表
将DataFrame对象转为虚拟表
df.registerTempTable(“people”) sqlContext.sql(“select age, count(*) from people group by age”).show()
其实,以上语句就等同于
df.groupBy("age").count().show()
DataFrameReader
此类主要实现将从外部存储系统加载数据如:本地文件系统、分布式文件系统(HDFS)、key-value存储、云存储(S3)、etc。同时,配合JDBC,它还可以读取外部关系型数据库系统。此外,通过Spark SQL的外部数据源(external data sources) API,DataFrames可以更广泛地支持任何第三方数据格式和数据源。值得一提的是,当下的第三方扩展已经包含Avro、CSV、ElasticSearch和Cassandra。Since spark1.4.0
MySQL
除了JSON之外,DataFrame现在已经能支持MySQL、Hive、HDFS、PostgreSQL等外部数据源,而对关系数据库的读取,是通过jdbc实现的。对于不同的关系数据库,必须在SPARK_CLASSPATH变量中加入对应connector的jar包,比如希望连接MySQL的话应该这么启动spark-shell:
SPARK_CLASSPATH=mysql-connector-java-x.x.x-bin.jar spark-shell
下面要将一个MySQL表转化为DataFrame对象:
Val jdbcDF = sqlContext.load(“jdbc”, Map(“url” -> “jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password”, “dbtable” -> “your_table”))
Hive
Spark提供了一个HiveContext的上下文,此类是SQLContext的子类,但从作用上来说,sqlContext也支持Hive数据源。需要在部署Spark的时候加入Hive选项,并把已有的hive-site.xml文件移动到$SPARK_HOME/conf路径下,我们就可以直接用Spark查询包含已有元数据的Hive表了:sqlContext.sql(“select count(*) from hive_people”).show()
持久表saveAsTable
当使用hivecontext时,DataFrame也可以使用命令saveAsTable保存为持久表。不同于registerTempTable命令,saveAsTable会持久化DataFrame里的数据,并在hivemetastore里创建一个指向链接。只要你连接的是同一个metastore,即使spark程序重启了,持久表也会一直存在。在sqlContext中,可以使用表名称的方式调用持久表的方法。默认情况下,saveAsTable 会自动创建自己的“管理表”来存储持久表的元数据,当表删除时,管理表也将管理的元数据链接删除。
表分区
表分区是像hive这样的系统优化常用的方法。在分区表中,数据通常存在不同的目录中。每一个列值是一个分区目录。Parquet 格式的数据可以自动发现和自动推断分区信息。如下:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
Hive table
//scis an existing JavaSparkContext.HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc); sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL.Row[] results = sqlContext.sql("FROM src SELECT key, value").collect();
支持的hive特性
Spark SQL 支持的hive特性如下:l Hive query statements, including:
1、SELECT 2、GROUP BY 3、ORDER BY 4、CLUSTER BY 5、SORT BY
l All Hive operators, including:
1、Relational operators (=, ⇔, ==, <>, <, >, >=, <=, etc) 2、Arithmetic operators (+, -, *, /, %, etc) 3、Logical operators (AND, &&, OR, ||, etc) 4、Complex type constructors 5、Mathematical functions (sign, ln, cos, etc) 6、String functions (instr, length, printf, etc)
l User defined functions (UDF)
l User defined aggregation functions (UDAF)
l User defined serialization formats (SerDes)
l Window functions
l Joins
1、JOIN 2、{LEFT|RIGHT|FULL} OUTER JOIN 3、LEFT SEMI JOIN 4、CROSS JOIN
l Unions
l Sub-queries
SELECT col FROM ( SELECT a + b AS col from t1) t2
l Sampling
l Explain
l Partitioned tables including dynamic partition insertion
l View
l All Hive DDL Functions, including:
1、CREATE TABLE 2、CREATE TABLE AS SELECT 3、ALTER TABLE
l Most Hive Data types, including:
1、TINYINT 2、SMALLINT 3、INT 4、BIGINT 5、BOOLEAN 6、FLOAT 7、DOUBLE 8、STRING 9、BINARY 10、TIMESTAMP 11、DATE 12、ARRAY<> 13、MAP<> 14、STRUCT<>
不支持的hive特性
1、桶buckets,桶是hive表分区的一种方式,目前为止,spark sql还不支持。 2、Hive查询元数据,不用启动作业,而sparksql查询元数据时,仍然要启动一个作业来查询。 3、Hive支持索引,而spark sql则暂时不支持。 4、块级位图索引和虚拟列(用于构建索引),这些sparksql还不支持。 5、Hive中可以自动为joins和groupbys确定reduces数量。而在sparksql中则需要手动设置并行度。设置SET spark.sql.shuffle.partitions=[num_tasks]; 6、数据倾斜标识,spark sql不支持hive的数据倾斜标识。 7、Hive支持加入STREAMTABLE提示,spark SQL不支持STREAMTABLE 提示。 8、查询结果合并多个小文件:如果结果输出包含多个小文件,hive可以选择小文件合并到更少的大文件,以避免HDFS元数据文件过大。Spark SQL不支持。 9、UNION type 10、Unique join 11、Column statistics collecting:不支持列信息统计,仅支持 12、populating the sizeInBytes field of the hive metastore. 13、Hive Input/Output Formats
1、CLI文件格式化:当结果在CLI显示时,SparkSQL仅支持TextOutputFormat. 2、Hadoop archive
相关文章推荐
- mysql支持跨表delete删除多表记录
- Mongodb 设置密码
- Mongodb 设置密码
- Mongodb 设置密码
- MongoDB -- 3.2新功能Document Validation
- sqlite安装使用
- 数据库
- Oracle 10g安装64位图解流程
- Redis环境搭建
- postgres数据库创建主分表的语法
- Scrapy Pipeline之使用专门的Twisted客户端(以Redis缓存为例)
- Centos开机自启动redis
- mysql 创建超级用户
- 【MySQL】 into outfile csv格式文件添加 字段
- Redis(五)Java连接Redis实例
- mysql设置远程连接
- 数据库缓存管理器块替换
- MySQL 中对于同一表的两种操作
- 数据库sql优化1
- SQL语言