sparkCookbook4-SparkSQL
2015-09-02 16:35
The Catalyst optimizer has two main goals:
Make it easy to add new optimization techniques
Allow external developers to extend the optimizer
Spark SQL uses the Catalyst transformation framework in four phases:
1. Analyzing a logical plan to resolve references
2. Logical plan optimization
3. Physical planning
4. Code generation to compile parts of the query to Java bytecode
Analysis
Resolves references in the logical plan.
Logical plan optimization
Pushes the query down to where the data is stored and filters out unneeded data (predicate pushdown).
Physical planning
Code generation
The final phase generates the Java bytecode that runs on every machine, implemented using Scala's quasiquotes feature.
Hands-on
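To make the rule-based approach concrete, here is a toy sketch in plain Scala of how an optimization rule rewrites an expression tree. The `Expr` classes and `fold` function below are illustrative stand-ins, not Spark's actual Catalyst Expression/Rule API:

```scala
// Toy sketch of a Catalyst-style optimization rule (illustration only,
// NOT Spark's actual API).
sealed trait Expr
case class Lit(v: Int) extends Expr
case class Col(name: String) extends Expr
case class Add(l: Expr, r: Expr) extends Expr

// A rule walks the tree bottom-up; constant folding is a classic example
// of the kind of rewrite the logical optimization phase applies.
def fold(e: Expr): Expr = e match {
  case Add(l, r) =>
    (fold(l), fold(r)) match {
      case (Lit(a), Lit(b)) => Lit(a + b) // both children constant: fold them
      case (fl, fr)         => Add(fl, fr)
    }
  case leaf => leaf
}

// age + (1 + 2) is rewritten to age + 3
val plan      = Add(Col("age"), Add(Lit(1), Lit(2)))
val optimized = fold(plan)
println(optimized) // Add(Col(age),Lit(3))
```

Catalyst applies many such rules repeatedly until the plan stops changing; this sketch shows a single rule applied once.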
$ spark-shell --driver-memory 1G
Running HiveQL then fails with:
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.
In my tests, simply copying mysql-connector-java-5.1.18-bin.jar into spark/lib does not fix this, and neither does adding it to the environment:
export CLASSPATH=$CLASSPATH:/usr/local/hive/lib/mysql-connector-java-5.1.18-bin.jar
Either of the following does work:
./bin/spark-sql --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.18-bin.jar
spark-shell --driver-memory 1G --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.18-bin.jar
scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hc.sql("create table if not exists person(first_name string, last_name string, age int) row format delimited fields terminated by ','")
In another terminal:
$ cd /usr/local/spark/spark-data/
$ mkdir person
$ echo "Barack,Obama,53" >> person/person.txt
$ echo "George,Bush,68" >> person/person.txt
$ echo "Bill,Clinton,68" >> person/person.txt
Load the data into the person table:
scala> hc.sql("load data local inpath \"/usr/local/spark/spark-data/person\" into table person ")
scala> hc.sql("select * from person").collect.foreach(println)
[Barack,Obama,53]
[George,Bush,68]
[Bill,Clinton,68]
Alternatively, load the data from HDFS; this moves the data from HDFS into Hive's warehouse directory. An absolute path such as hdfs://localhost:9000/user/hduser/person also works:
scala> hc.sql("load data inpath \"/user/hduser/person\" into table person")
Query the data with HiveQL:
scala> val persons = hc.sql("from person select first_name,last_name,age")
scala> persons.collect.foreach(println)
Create a new table from a query result:
scala> hc.sql("create table person2 as select first_name, last_name from person")
Or copy the definition directly from another table:
scala> hc.sql("create table person2 like person location '/user/hive/warehouse/person'")
Create two new tables to hold the counts:
scala> hc.sql("create table people_by_last_name(last_name string,count int)")
scala> hc.sql("create table people_by_age(age int,count int)")
Use HiveQL's multi-table insert to populate both tables in a single pass over person:
scala> hc.sql("""from person
    insert overwrite table people_by_last_name
      select last_name, count(distinct first_name)
      group by last_name
    insert overwrite table people_by_age
      select age, count(distinct first_name)
      group by age""")
scala> hc.sql("show tables").collect.foreach(println)
[people_by_age,false]
[people_by_last_name,false]
[person,false]
[person2,false]
[sogouq1,false]
[sogouq2,false]
[src,false]
[t_hadoop,false]
[t_hive2,false]
Inferring the schema using case classes
$ spark-shell --driver-memory 1G
scala> import sqlContext.implicits._
scala> case class Person(first_name: String, last_name: String, age: Int)
defined class Person
Put the person directory on HDFS:
$ hadoop fs -put person /user/hadoop/person
This step puts person/person.txt onto HDFS.
scala> val p = sc.textFile("hdfs://localhost:9000/user/hadoop/person")
scala> val pmap = p.map(line => line.split(","))
// Convert the RDD of Array[String] into an RDD of Person case objects:
scala> val personRDD = pmap.map(p => Person(p(0), p(1), p(2).toInt))
// Convert to a DataFrame:
scala> val personDF = personRDD.toDF
scala> personDF.registerTempTable("person")
scala> val people = sqlContext.sql("select * from person")
scala> people.collect.foreach(println)
[Barack,Obama,53]
[George,Bush,68]
[Bill,Clinton,68]
Specifying the schema programmatically
Case classes cannot always be used; for example, a case class cannot have more than 22 fields. Another situation is when the schema is not known in advance. In that case, the data is loaded as an RDD of Row objects, and the schema is built separately from StructType and StructField objects, which represent a table and a field respectively. The schema is then applied to the row RDD to create a DataFrame.
1. Start the Spark shell and give it some extra memory:
$ spark-shell --driver-memory 1G
2. Import for the implicit conversion:
scala> import sqlContext.implicits._
3. Import the Spark SQL datatypes and Row objects:
scala> import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
4. In another shell, create some sample data to be put in HDFS:
$ mkdir person
$ echo "Barack,Obama,53" >> person/person.txt
$ echo "George,Bush,68" >> person/person.txt
$ echo "Bill,Clinton,68" >> person/person.txt
$ hdfs dfs -put person person
5. Load the person data in an RDD:
scala> val p = sc.textFile("hdfs://localhost:9000/user/hadoop/person")
6. Split each line into an array of strings, using the comma as the delimiter:
scala> val pmap = p.map(line => line.split(","))
7. Convert the RDD of Array[String] into an RDD of Row objects:
scala> val personData = pmap.map(p => Row(p(0), p(1), p(2).toInt))
8. Create the schema; each field has a name, a type, and a nullable flag:
scala> val schema = StructType(
    Array(StructField("first_name", StringType, true),
          StructField("last_name", StringType, true),
          StructField("age", IntegerType, true)))
9. Apply the schema to the row RDD to get a DataFrame:
scala> val personDF = sqlContext.createDataFrame(personData, schema)
scala> personDF.registerTempTable("person")
scala> val persons = sqlContext.sql("select * from person")
scala> persons.collect.foreach(println)
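As a plain-Scala illustration of what a schema built from StructType and StructField describes, here is a toy sketch that parses a raw CSV line into typed values driven by (name, type, nullable) triples. The DType, Field, and Schema classes below are hypothetical stand-ins, not Spark's org.apache.spark.sql.types API:

```scala
// Hand-rolled stand-ins for StructType/StructField, for illustration only.
sealed trait DType
case object StringT extends DType
case object IntT    extends DType
case class Field(name: String, dtype: DType, nullable: Boolean)
case class Schema(fields: Array[Field])

// The same (name, type, nullable) triples used in step 8 above.
val schema = Schema(Array(
  Field("first_name", StringT, true),
  Field("last_name",  StringT, true),
  Field("age",        IntT,    true)))

// Parse one raw CSV line into typed values, driven by the schema,
// much like the Row-building step that precedes createDataFrame.
def parse(line: String, s: Schema): Array[Any] =
  line.split(",").zip(s.fields).map {
    case (v, Field(_, IntT, _))    => v.toInt
    case (v, Field(_, StringT, _)) => v
  }

val row = parse("Barack,Obama,53", schema)
println(row.mkString(",")) // Barack,Obama,53
```

The point of the indirection is the same as in Spark: the parsing logic stays generic, and the schema object alone decides how each column is typed.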