Spark SQL小结
2016-03-10 16:05
295 查看
在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。Databricks表示,Spark SQL将涵盖Shark的所有特性,用户可以从Shark 0.9进行无缝的升级。现在Databricks推广的Shark相关项目一共有两个,分别是Spark SQL和新的Hive on Spark(HIVE-7292)。如下图所示:
Spark SQL运行以SQL的方式来操作数据,类似Hive和Pig。其核心组件为一种新类型的RDD——JavaSchemaRDD,一个JavaSchemaRDD就好比传统关系型数据库中的一张表。JavaSchemaRDD可以从已有的RDD创建,还可以从Parquet文件、JSON数据集、HIVE、普通数据文件中创建。但现阶段(1.0.2版本)的Spark SQL还是alpha版,日后的API难免会发生变化,所以是否要使用该功能,现阶段还值得商榷。
Spark SQL示例
运行结果
people.txt文件内容
运行结果
可以看到输出属性的顺序和Bean中的不一样,此处猜测可能采用的字典序,但未经过测试证实。
将上方程序示例中代码行8-14行替换为下方代码即可:
运行结果与上文相同。此外还可以用如下方式加载JSON数据:
最后将Hive的配置文件拷贝至conf文件夹下即可。官方Hive使用示例:
http://blog.selfup.cn/657.html
Spark SQL运行以SQL的方式来操作数据,类似Hive和Pig。其核心组件为一种新类型的RDD——JavaSchemaRDD,一个JavaSchemaRDD就好比传统关系型数据库中的一张表。JavaSchemaRDD可以从已有的RDD创建,还可以从Parquet文件、JSON数据集、HIVE、普通数据文件中创建。但现阶段(1.0.2版本)的Spark SQL还是alpha版,日后的API难免会发生变化,所以是否要使用该功能,现阶段还值得商榷。
程序示例
Bean,必须要有get方法,底层采用反射来获取各属性。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | publicstaticclassPersonimplementsSerializable{ privateStringname; privateintage; publicStringgetName(){ returnname; } publicintgetAge(){ returnage; } publicPerson(Stringname,intage){ this.name=name; this.age=age; } } |
Spark SQL示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | publicstaticvoidmain(String[]args){ SparkConf sparkConf=newSparkConf() .setAppName("JavaSparkSQL") .setMaster("local[2]"); JavaSparkContext ctx=newJavaSparkContext(sparkConf); JavaSQLContext sqlCtx=newJavaSQLContext(ctx); JavaRDD<Person>people=ctx.textFile("/home/yurnom/people.txt")//文档内容见下文 .map(line->{ String[]parts=line.split(","); returnnewPerson(parts[0],Integer.parseInt(parts[1].trim()));//创建一个bean }); JavaSchemaRDD schemaPeople=sqlCtx.applySchema(people,Person.class); schemaPeople.registerAsTable("people");//注册为一张table JavaSchemaRDD teenagers=sqlCtx.sql(//执行sql语句,属性名同bean的属性名 "SELECT name FROM people WHERE age >= 13 AND age <= 19"); List<String>teenagerNames=teenagers .map(row->"Name: "+row.getString(0)).collect(); for(Strings:teenagerNames){ System.out.println(s); } } |
运行结果
1 | Name:Justin |
people.txt文件内容
1 2 3 | Michael,29 Andy,30 Justin,19 |
使用Parquet Files
Parquet文件允许将schema信息和数据信息固化在磁盘上,以供下一次的读取。1 2 3 4 5 6 7 8 9 10 11 | //存为Parquet文件 schemaPeople.saveAsParquetFile("people.parquet"); //从Parquet文件中创建JavaSchemaRDD JavaSchemaRDD parquetFile=sqlCtx.parquetFile("people.parquet"); //注册为一张table parquetFile.registerAsTable("parquetFile"); JavaSchemaRDD teenagers2=sqlCtx.sql("SELECT * FROM parquetFile WHERE age >= 25"); for(Rowr:teenagers2.collect()){ System.out.println(r.get(0)); System.out.println(r.get(1)); } |
运行结果
1 2 3 4 | 29 Michael 30 Andy |
可以看到输出属性的顺序和Bean中的不一样,此处猜测可能采用的字典序,但未经过测试证实。
JSON数据集
Spark SQL还可以采用JSON格式的文件作为输入源。people.json文件内容如下:1 2 3 | {"name":"Michael","age":29} {"name":"Andy","age":30} {"name":"Justin","age":19} |
将上方程序示例中代码行8-14行替换为下方代码即可:
1 | JavaSchemaRDD schemaPeople=sqlCtx.jsonFile("/home/yurnom/people.json"); |
运行结果与上文相同。此外还可以用如下方式加载JSON数据:
1 2 3 4 | List<String>jsonData=Arrays.asList( "{\"name\":\"Yurnom\",\"age\":26}"); JavaRDD<String>anotherPeopleRDD=sc.parallelize(jsonData); JavaSchemaRDD anotherPeople=sqlContext.jsonRDD(anotherPeopleRDD); |
连接Hive
Spark SQL运行使用SQL语句来读写Hive的数据,但由于Hive的依赖包过多,默认情况下要连接Hive需要自行添加相关的依赖包。可以使用以下命令来生成一个含有Hive依赖的Jar,而此Jar必须分发到Spark集群中的每一台机器上去。1 | SPARK_HIVE=truesbt/sbt assembly/assembly |
最后将Hive的配置文件拷贝至conf文件夹下即可。官方Hive使用示例:
1 2 3 4 5 6 7 8 | // sc is an existing JavaSparkContext. JavaHiveContext hiveContext=neworg.apache.spark.sql.hive.api.java.HiveContext(sc); hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL. Row[]results=hiveContext.hql("FROM src SELECT key, value").collect(); |
总结
Spark SQL将原本就已经封装的很好的Spark原语的使用再简化了一次,使得懂SQL语句的运维人员都可以通过Spark SQL来进行大数据分析。目前来说Spark SQL还处于alpha版本,对于开发人员的意义不大,静观后续的变化。http://blog.selfup.cn/657.html
相关文章推荐
- Jedis 是 Redis 官方首选的 Java 客户端开发包
- redis 五种数据类型的使用场景
- redis-cluster研究和使用
- 基于Jedis实现Redis分片的理解
- 一个小时内学习 SQLite 数据库
- java.lang.NocalssDefFoundError:oracle.jdbc.driver.T4CpreparedStatement
- com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in
- java 程序调用Oracle Function
- Redis分区
- sqlite database browser的使用
- windows下安装redis
- Redis实战系列(1) 运行多个实例以充分发挥多核处理器的能力
- Redis常用命令
- C#使用System.Data.SQLite操作SQLite
- JAVA客户端调用memcached
- redis 用setbit(bitmap)统计活跃用户
- sql 解析器
- 关于redis、memcache、mongoDB 的对比
- 阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&amp;消费
- memcached安装配置