您的位置:首页 > 数据库

Spark SQL小结

2016-03-10 16:05 295 查看
在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。Databricks表示,Spark SQL将涵盖Shark的所有特性,用户可以从Shark 0.9进行无缝的升级。现在Databricks推广的Shark相关项目一共有两个,分别是Spark SQL和新的Hive on Spark(HIVE-7292)。如下图所示:




Spark SQL运行以SQL的方式来操作数据,类似Hive和Pig。其核心组件为一种新类型的RDD——JavaSchemaRDD,一个JavaSchemaRDD就好比传统关系型数据库中的一张表。JavaSchemaRDD可以从已有的RDD创建,还可以从Parquet文件、JSON数据集、HIVE、普通数据文件中创建。但现阶段(1.0.2版本)的Spark SQL还是alpha版,日后的API难免会发生变化,所以是否要使用该功能,现阶段还值得商榷。

程序示例

Bean,必须要有get方法,底层采用反射来获取各属性。

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

publicstaticclassPersonimplementsSerializable{

    privateStringname;

    privateintage;

 

    publicStringgetName(){

        returnname;

    }

 

    publicintgetAge(){

        returnage;

    }

 

    publicPerson(Stringname,intage){

        this.name=name;

        this.age=age;

    }

}

 

 

Spark SQL示例

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

publicstaticvoidmain(String[]args){

        SparkConf sparkConf=newSparkConf()

              .setAppName("JavaSparkSQL")

              .setMaster("local[2]");

        JavaSparkContext ctx=newJavaSparkContext(sparkConf);

        JavaSQLContext sqlCtx=newJavaSQLContext(ctx);

 

        JavaRDD<Person>people=ctx.textFile("/home/yurnom/people.txt")//文档内容见下文

              .map(line->{

            String[]parts=line.split(",");

            returnnewPerson(parts[0],Integer.parseInt(parts[1].trim()));//创建一个bean

        });

 

        JavaSchemaRDD schemaPeople=sqlCtx.applySchema(people,Person.class);

        schemaPeople.registerAsTable("people");//注册为一张table

        JavaSchemaRDD teenagers=sqlCtx.sql(//执行sql语句,属性名同bean的属性名

              "SELECT name FROM people WHERE age >= 13 AND age <= 19");

 

        List<String>teenagerNames=teenagers

              .map(row->"Name: "+row.getString(0)).collect();

 

        for(Strings:teenagerNames){

            System.out.println(s);

        }

    }

 

运行结果

 

1

Name:Justin

 

people.txt文件内容

 

 

1
2
3

Michael,29

Andy,30

Justin,19

 

使用Parquet Files

Parquet文件允许将schema信息和数据信息固化在磁盘上,以供下一次的读取。

 

1
2
3
4
5
6
7
8
9
10
11

    //存为Parquet文件

    schemaPeople.saveAsParquetFile("people.parquet");

    //从Parquet文件中创建JavaSchemaRDD

    JavaSchemaRDD parquetFile=sqlCtx.parquetFile("people.parquet");

    //注册为一张table

    parquetFile.registerAsTable("parquetFile");

    JavaSchemaRDD teenagers2=sqlCtx.sql("SELECT * FROM parquetFile WHERE age >= 25");

    for(Rowr:teenagers2.collect()){

        System.out.println(r.get(0));

        System.out.println(r.get(1));

    }

 

运行结果

 

 

1
2
3
4

29
Michael
30
Andy

 

可以看到输出属性的顺序和Bean中的不一样,此处猜测可能采用的字典序,但未经过测试证实。

JSON数据集

Spark SQL还可以采用JSON格式的文件作为输入源。people.json文件内容如下:

 

 

1
2
3

{"name":"Michael","age":29}

{"name":"Andy","age":30}

{"name":"Justin","age":19}

 

将上方程序示例中代码行8-14行替换为下方代码即可:

 

1

JavaSchemaRDD schemaPeople=sqlCtx.jsonFile("/home/yurnom/people.json");

 

运行结果与上文相同。此外还可以用如下方式加载JSON数据:

 

 

1
2
3
4

List<String>jsonData=Arrays.asList(

  "{\"name\":\"Yurnom\",\"age\":26}");

JavaRDD<String>anotherPeopleRDD=sc.parallelize(jsonData);

JavaSchemaRDD anotherPeople=sqlContext.jsonRDD(anotherPeopleRDD);

 

连接Hive

Spark SQL运行使用SQL语句来读写Hive的数据,但由于Hive的依赖包过多,默认情况下要连接Hive需要自行添加相关的依赖包。可以使用以下命令来生成一个含有Hive依赖的Jar,而此Jar必须分发到Spark集群中的每一台机器上去。

 

1

SPARK_HIVE=truesbt/sbt assembly/assembly

 

最后将Hive的配置文件拷贝至conf文件夹下即可。官方Hive使用示例:

 

 

1
2
3
4
5
6
7
8

// sc is an existing JavaSparkContext.

JavaHiveContext hiveContext=neworg.apache.spark.sql.hive.api.java.HiveContext(sc);

 

hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");

hiveContext.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

 
// Queries are expressed in HiveQL.

Row[]results=hiveContext.hql("FROM src SELECT key, value").collect();

 

总结

Spark SQL将原本就已经封装的很好的Spark原语的使用再简化了一次,使得懂SQL语句的运维人员都可以通过Spark SQL来进行大数据分析。目前来说Spark SQL还处于alpha版本,对于开发人员的意义不大,静观后续的变化。

 
http://blog.selfup.cn/657.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: