您的位置:首页 > 大数据

RDD转换成DataFrame的两种方法

2017-11-14 15:40 471 查看

1.根据反射推断schema

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

object createDF {
// 方法1 根据包括case class数据的RDD转换成DataFrame
// case class定义表的schema,case class的属性会被读取并且成为列的名字
case class Person(name: String, age: Int)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("sparkSQLTest").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//先将RDD转化成case class 数据类型,然后再通过toDF()方法隐式转换成DataFrame
import sqlContext.implicits._
val people = sc.textFile("your file path").map(_.split(",")).map(p =>Person(p(0), p(1).trim.toInt)).toDF()
//注册成一个表
people.registerTempTable("peopleTable")
//然后可以对表进行各种操作,比如打印出13到19岁青少年的姓名
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
}
}


2.使用Programmatically的方式指定Schema

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
import org.apache.spark.sql.Row

object createDF {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("sparkSQLTest").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

//step1: 从原来的 RDD 创建一个行的 RDD
val peopleRow = sc.textFile("your file path").map(_.split(",")).map(p => Row(p(0), p(1)))

//step2: 创建由一个 StructType 表示的模式, 并且与第一步创建的 RDD 的行结构相匹配
//构造schema用到了两个类StructType和StructFile,其中StructFile类的三个参数分别是(字段名称,类型,数据是否可以用null填充)
val schema = StructType(Array(StructField("name", StringType, true), StructField("age", IntegerType, true)))

//step3.在行 RDD 上通过 createDataFrame 方法应用模式
val people = sqlContext.createDataFrame(peopleRow, schema)
people.registerTempTable("peopleTable")
//然后可以对表进行各种操作,比如打印出13到19岁青少年的姓名
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 大数据 sparkSQL