SparkSQL(Spark-1.4.0)实战系列(二)——DataFrames进阶
2015-07-17 19:21
691 查看
本节主要内容如下
DataFrame与RDD的互操作实战不同数据源构建DataFrame实战
DataFrame与RDD的互操作实战
1 采用反映机制进行Schema类型推导(RDD到DataFrame的转换)SparkSQL支持RDD到DataFrame的自动转换,实现方法是通过Case类定义表的Schema,Spark会通过反射机制读取case class的参数名并将其配置成表的列名。
//导入该语句后,RDD将会被隐式转换成DataFrame import sqlContext.implicits._ //定义一个类为Person的Case Class作为Schema case class Person(name: String, age: Int) //读取文件并将数据Map成Person实例,然后转换为DataFrame,采用toDF()方法,本实例从HDFS上进行数据读取 val people = sc.textFile("/data/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() //将实例为peopler的DataFrame注册成表 people.registerTempTable("people") //采用SQLContext中的sql方法执行SQL语句 val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") //输出返回结果 teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
2 利用程序动态指定Schema
在某些应用场景下,我们可能并不能提前确定对应列的个数,因而case class无法进行定义,此时可以通过传入一个字符串来设置Schema信息。具体过程如下:
// 创建RDD val people = sc.textFile("/data/people.txt") //Schema字符串 val schemaString = "name age" // 导入Row import org.apache.spark.sql.Row; //导入Spark SQL数据类型 import org.apache.spark.sql.types.{StructType,StructField,StringType}; //利用schemaString动态生成Schema val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // 将people RDD转换成Rows val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // 创建DataFrame val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) //注册成表 peopleDataFrame.registerTempTable("people") //执行SQL语句. val results = sqlContext.sql("SELECT name FROM people") //打印输出 results.map(t => "Name: " + t(0)).collect().foreach(println)
通过不同数据源创建DataFrame
前面我们创建DataFrame时,读取的是HDFS中的txt类型数据,在SparkSQL中,它支持多种数据源,主要包括JSON、Parquet等。//读取json格式数据 val jsonFile= sqlContext.read.json("/data/people.json") //jsonFile注册成表 jsonFile.registerTempTable("peopleJson") val teenagers = sqlContext.sql("SELECT name FROM peopleJson WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
//保存为parquet格式数据 jsonFile.select("name", "age").write.format("parquet").save("/data/namesAndAges.parquet")
parquet文件目录结构如下图
//读取parquet格式数据
val parquetFile = sqlContext.read.parquet(“/data/namesAndAges.parquet”)
//parquetFile注册成表
parquetFile.registerTempTable(“parquetPerson”)
val teenagers = sqlContext.sql(“SELECT name FROM parquetPerson WHERE age >= 13 AND age <= 19”)
teenagers.map(t => “Name: ” + t(0)).collect().foreach(println)
添加公众微信号,可以了解更多最新技术资讯
相关文章推荐
- 数据库建立索引前后的性能分析
- asp.net mvc 用Redis实现分布式集群共享Session。
- spark sql json处理代码修改记录
- Oracle分析函数
- mysql的MyISAM 和 InnoDB 的区别?优化MYSQL数据库的方法?
- MySQL 性能优化的最佳20多条经验分享
- 关于数据库性能优化小经验
- mysql 二级索引
- mysql大表设计
- mysql事务和锁InnoDB
- 关于NoSQL与SQL的区别
- database - 数据库设计/使用容易忽略的细节
- 理解MySQL--索引与优化(转载)
- C#运用实例.读取csv里面的词条,对每一个词条抓取百度百科相关资料,然后存取到数据库
- SQL语句优化原则
- jedis访问redis学习笔记
- jedis访问redis学习笔记
- Mongodb分片搭建(单实例)
- SQL查询语言练习
- 配置MySQL免安装版(zip)