您的位置:首页 > 数据库

Spark SQL程序实现RDD转换DataFrame(二)

2018-03-11 13:15 302 查看

通过StructType直接指定Schema

1、当case class不能提前定义时,可以通过以下三步创建DataFrame

1、将RDD转为包含row对象的RDD

1、基于structType类型创建schema,与第一步创建的RDD相匹配

2、通过sparkSession的createDataFrame方法对第一步的RDD应用

schema创建DataFrame

2、代码实现

maven依赖同Spark SQL程序实现RDD转换DataFrame(一)

package cn.cheng.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
* RDD转换成DataFrame:通过指定schema构建DataFrame
*/
object SparkSqlSchema {
def main(args: Array[String]): Unit = {
//todo:1、创建SparkSession,指定appName和master
val spark: SparkSession = SparkSession.builder()
.appName("SparkSqlSchema")
.master("local[2]")
.getOrCreate()
//todo:2、获取sparkContext对象
val sc: SparkContext = spark.sparkContext
//todo:3、加载数据
val dataRDD: RDD[String] = sc.textFile("d:\\person.txt")
//todo:4、切分每一行
val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
//todo:5、加载数据到Row对象中
val personRDD: RDD[Row] = dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
//todo:6、创建schema
val schema:StructType= StructType(Seq(
StructField("id", IntegerType, false),
StructField("name", StringType, false),
StructField("age", IntegerType, false)
))

//todo:7、利用personRDD与schema创建DataFrame
val personDF: DataFrame = spark.createDataFrame(personRDD,schema)

//todo:8、DSL操作显示DataFrame的数据结果
personDF.show()

//todo:9、将DataFrame注册成表
personDF.createOrReplaceTempView("t_person")

//todo:10、sql语句操作
spark.sql("select * from t_person").show()

spark.sql("select count(*) from t_person").show()

sc.stop()
}
}


喜欢就点赞评论+关注吧



感谢阅读,希望能帮助到大家,谢谢大家的支持!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark sql