
sparkCookbook4-SparkSQL



The Catalyst optimizer has two primary goals:

Make it easy to add new optimization techniques

Allow external developers to extend the optimizer

Spark SQL uses the Catalyst transformation framework in four phases (a sketch of how to inspect the resulting plans follows the list):

1. Analyzing a logical plan to resolve references

2. Logical plan optimization

3. Physical planning

4. Code generation to compile parts of the query to Java bytecode
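
A quick way to see these phases in action (a sketch that assumes the HiveContext hc and the person table created later in this recipe) is to call explain(true) on a DataFrame, which prints the parsed, analyzed, and optimized logical plans followed by the physical plan:

scala> hc.sql("select first_name from person where age > 60").explain(true)
// prints == Parsed Logical Plan ==, == Analyzed Logical Plan ==,
// == Optimized Logical Plan == and == Physical Plan ==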

Analysis



Logical plan optimization



The query is pushed down to where the data is stored, so that unneeded data is filtered out early (predicate pushdown).

Physical planning



Code generation

The final phase generates the Java bytecode that runs on each machine; this is implemented using Scala's quasiquotes feature.
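
The quasiquote mechanism itself can be tried in a plain Scala 2.11 REPL; this is only an illustration of the language feature, not the code Catalyst actually generates:

scala> import scala.reflect.runtime.universe._
scala> val add = q"(a: Int, b: Int) => a + b"   // builds an abstract syntax tree, not a String
scala> showCode(add)                            // renders the tree back as source code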

Hands-on

$ spark-shell --driver-memory 1G
Running HiveQL failed with:
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.

Simply copying mysql-connector-java-5.1.18-bin.jar into spark/lib did not solve the problem, and neither did adding it to the CLASSPATH environment variable:
export CLASSPATH=$CLASSPATH:/usr/local/hive/lib/mysql-connector-java-5.1.18-bin.jar

Either of the following two invocations works:
./bin/spark-sql --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.18-bin.jar

spark-shell --driver-memory 1G --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.18-bin.jar
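
Alternatively (an untested sketch; spark.driver.extraClassPath is the property that the --driver-class-path flag sets), the JAR can be configured once in conf/spark-defaults.conf so it does not have to be passed on every invocation:

# in conf/spark-defaults.conf
spark.driver.extraClassPath /usr/local/hive/lib/mysql-connector-java-5.1.18-bin.jar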


scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)


hc.sql("create table if not exists person(first_name
string, last_name string, age int) row format delimited fields
terminated by ','")


In another terminal:

$ cd /usr/local/spark/spark-data/
$ mkdir person
$ echo "Barack,Obama,53" >> person/person.txt
$ echo "George,Bush,68" >> person/person.txt
$ echo "Bill,Clinton,68" >> person/person.txt


Load the data into the person table:

scala> hc.sql("load data local inpath \"/usr/local/spark/spark-data/person\" into table person")


scala> hc.sql("select * from person").collect.foreach(println)
[Barack,Obama,53]
[George,Bush,68]
[Bill,Clinton,68]


Alternatively, load the data from HDFS; this moves the data from HDFS into Hive's warehouse directory. An absolute path such as hdfs://localhost:9000/user/hduser/person also works:

scala> hc.sql("load data inpath \"/user/hduser/person\" into table
person")


Query the data with HiveQL:

scala> val persons = hc.sql("from person select first_name, last_name, age")
scala> persons.collect.foreach(println)


Create a new table from the query output:

scala> hc.sql("create table person2 as select first_name, last_
name from person")


Or copy the definition directly from another table:

scala> hc.sql("create table person2 like person location '/user/
hive/warehouse/person'")


Create two new tables to hold the counts:

scala> hc.sql("create table people_by_last_name(last_name
string,count int)")
scala> hc.sql("create table people_by_age(age int,count int)")


Insert records into multiple tables with a single HiveQL statement (multi-table insert):

scala> hc.sql("""from person
insert overwrite table people_by_last_name
select last_name, count(distinct first_name)
group by last_name
insert overwrite table people_by_age
select age, count(distinct first_name)
group by age """)


scala> hc.sql("show tables").collect.foreach(println)
[people_by_age,false]
[people_by_last_name,false]
[person,false]
[person2,false]
[sogouq1,false]
[sogouq2,false]
[src,false]
[t_hadoop,false]
[t_hive2,false]
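
To check the multi-table insert, query the two count tables; with the three sample rows above, people_by_age should show one distinct first name for age 53 and two for age 68 (row order may vary):

scala> hc.sql("select * from people_by_last_name").collect.foreach(println)
scala> hc.sql("select * from people_by_age").collect.foreach(println)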


Inferring the schema using case classes

$ spark-shell --driver-memory 1G

scala> import sqlContext.implicits._

scala> case class Person(first_name:String, last_name:String, age:Int)
defined class Person


Put the person directory on HDFS:

$ hadoop fs -put person /user/hadoop/person


This puts person/person.txt onto HDFS.

scala> val p = sc.textFile("hdfs://localhost:9000/user/hadoop/person")

scala> val pmap = p.map( line => line.split(","))
// Convert the RDD of Array[String] into an RDD of Person case objects:
scala> val personRDD = pmap.map( p => Person(p(0),p(1),p(2).toInt))
// Convert to a DataFrame:
scala> val personDF = personRDD.toDF
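
Optionally (not part of the original recipe), print the schema that was inferred from the Person case class fields:

scala> personDF.printSchema
// prints first_name, last_name and age with the types taken from the case class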

scala> personDF.registerTempTable("person")

scala> val people = sqlContext.sql("select * from person")

scala> people.collect.foreach(println)
[Barack,Obama,53]
[George,Bush,68]
[Bill,Clinton,68]


Programmatically specifying the schema

Case classes cannot be used in some situations, for example, when a table has more than 22 fields, or when the schema is not known in advance. In those cases the data is loaded as an RDD of Row objects, and the schema is created with StructType and StructField objects, which represent a table and a field, respectively. The schema is then applied to the Row RDD to create a DataFrame.

1.   Start the Spark shell and give it some extra memory:
$ spark-shell --driver-memory 1G
2.   Import for the implicit conversions:
scala> import sqlContext.implicits._
3.   Import the Spark SQL datatypes and Row objects:
scala> import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
4.   In another shell, create some sample data to be put in HDFS:
$ mkdir person
$ echo "Barack,Obama,53" >> person/person.txt
$ echo "George,Bush,68" >> person/person.txt
$ echo "Bill,Clinton,68" >> person/person.txt
$ hdfs dfs -put person person
5.   Load the person data in an RDD:
scala> val p = sc.textFile("hdfs://localhost:9000/user/hadoop/person")
6.   Split each line into an array of strings, using a comma as the delimiter:
scala> val pmap = p.map( line => line.split(","))
7.   Convert the RDD of Array[String] into an RDD of Row objects:
scala> val personData = pmap.map( p => Row(p(0),p(1),p(2).toInt))

8.   Create the schema; each StructField takes the field name, the data type, and whether it is nullable:
scala> val schema = StructType(
Array(StructField("first_name",StringType,true),
StructField("last_name",StringType,true),
StructField("age",IntegerType,true)
))

9.   Apply the schema to the Row RDD to get a DataFrame:
scala> val personDF = sqlContext.createDataFrame(personData,schema)

scala> personDF.registerTempTable("person")
scala> val persons = sqlContext.sql("select * from person")
scala> persons.collect.foreach(println)