Spark SQL程序实现RDD转换DataFrame(一)
2018-03-11 13:09
531 查看
通过反射推断Schema
在Spark SQL中有两种方式可以在DataFrame和RDD进行转换利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD的schema。
通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。
1、创建maven工程添加依赖
<properties> <scala.version>2.11.8</scala.version> <hadoop.version>2.7.4</hadoop.version> <spark.version>2.0.2</spark.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.0.2</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> & 4000 lt;arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
2、代码实现
Scala支持使用case class类型导入RDD转换为DataFrame,通过case class创建schema,case class的参数名称会被反射读取并成为表的列名。这种RDD可以高效的转换为DataFrame并注册为表。package cn.cheng.sql import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} /** * RDD转化成DataFrame:利用反射机制 */ //todo:定义一个样例类Person case class Person(id:Int,name:String,age:Int) extends Serializable object InferringSchema { def main(args: Array[String]): Unit = { //todo:1、构建sparkSession 指定appName和master的地址 val spark: SparkSession = SparkSession.builder() .appName("InferringSchema") .master("local[2]").getOrCreate() //todo:2、从sparkSession获取sparkContext对象 val sc: SparkContext = spark.sparkContext sc.setLogLevel("WARN")//设置日志输出级别 //todo:3、加载数据 val dataRDD: RDD[String] = sc.textFile("D:\\person.txt") //todo:4、切分每一行记录 val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" ")) //todo:5、将RDD与Person类关联 val personRDD: RDD[Person] = lineArrayRDD.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) //todo:6、创建dataFrame,需要导入隐式转换 import spark.implicits._ val personDF: DataFrame = personRDD.toDF() //todo-------------------DSL语法操作 start-------------- //1、显示DataFrame的数据,默认显示20行 personDF.show() //2、显示DataFrame的schema信息 personDF.printSchema() //3、显示DataFrame记录数 println(personDF.count()) //4、显示DataFrame的所有字段 personDF.columns.foreach(println) //5、取出DataFrame的第一行记录 println(personDF.head()) //6、显示DataFrame中name字段的所有值 personDF.select("name").show() //7、过滤出DataFrame中年龄大于30的记录 personDF.filter($"age" > 30).show() //8、统计DataFrame中年龄大于30的人数 println(personDF.filter($"age">30).count()) //9、统计DataFrame中按照年龄进行分组,求每个组的人数 personDF.groupBy("age").count().show() //todo-------------------DSL语法操作 end------------- //todo--------------------SQL操作风格 start----------- //todo:将DataFrame注册成表 personDF.createOrReplaceTempView("t_person") //todo:传入sql语句,进行操作 spark.sql("select * from t_person").show() spark.sql("select * from t_person where name='zhangsan'").show() spark.sql("select * from t_person order by age desc").show() //todo--------------------SQL操作风格 end------------- sc.stop() } }
喜欢就点赞评论+关注吧
感谢阅读,希望能帮助到大家,谢谢大家的支持!
相关文章推荐
- Spark SQL程序实现RDD转换DataFrame(二)
- 写一个在程序中创建dts包,实现数据用dts导入到sql的类。
- 利用SqlHelper.cs实现Web程序与数据库的连接
- 利用SqlHelper.cs实现Web程序与数据库的连接
- SparkSQL ThriftServer服务的使用和程序中JDBC的连接
- sparkSQL的整体实现框架
- sparkSQL里 sql语句,dataframe,Thrift Server JDBC都可以实现对数据的查询,过滤等操作, 哪这3种情况分别是什么情况下使用
- sparkSQL结合hive的入门程序
- spark-sql部署实现与Hive交互
- 利用SqlHelper.cs实现Web程序与数据库的连接
- 第七篇:Spark SQL 源码分析之Physical Plan 到 RDD的具体实现
- Spark 程序 WordCount实现 Scala、Python
- 《Spark商业案例与性能调优实战100课》第9课:商业案例之通过Spark SQL 下两种不同方式实现口碑最佳和最热门电影比较
- 实现CDH支持Spark SQL功能
- SparkSQL ThriftServer服务的使用和程序中JDBC的连接
- Spark SQL中Join常用的几种实现
- 利用SqlHelper.cs实现Web程序对数据库的增、删、改等操作
- scala程序写Spark程序实现黑名单过滤
- 用程序实现SQL的表导出 推荐
- 利用SqlHelper.cs实现Web程序与数据库的连接