您的位置:首页 > 数据库

spark sql

2015-11-04 16:46 344 查看
package com.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row

/**
 * Builds a DataFrame from a comma-separated text file by specifying the
 * schema programmatically (a `StructType` of string columns), registers it
 * as the temporary table `people`, and prints every value of the `name` column.
 *
 * Each non-blank input line is expected to look like `name,age`.
 */
object SpecifySchema {

  /** Input file used when no path is passed on the command line. */
  private val DefaultPeoplePath = "C:\\Users\\xiaoming.liang\\Desktop\\people.txt"

  /**
   * Entry point.
   *
   * @param args optional; `args(0)` overrides the input file path. When
   *             absent, the original hard-coded path is used.
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the app name does not match this object's name —
    // presumably copied from a reflection-based example; confirm before renaming.
    val conf = new SparkConf()
      .setAppName("ReflectionSchema")
      .setMaster("local")
    val sc = new SparkContext(conf)

    // Stop the context even when the job fails, so the local Spark runtime is
    // always released.
    try {
      val sqlContext = new SQLContext(sc)
      val inputPath = args.headOption.getOrElse(DefaultPeoplePath)

      // Create an RDD with one element per line of the input file.
      val people = sc.textFile(inputPath)

      // The schema is encoded as a space-separated list of column names;
      // every column is a nullable string.
      val schemaString = "name age"
      val schema = StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
      )

      // Convert each record to a Row. Blank lines are skipped: splitting an
      // empty line yields a single element, so indexing p(1) would throw.
      val rowRDD = people
        .filter(_.trim.nonEmpty)
        .map(_.split(","))
        .map(p => Row(p(0), p(1).trim))

      // Apply the schema to the RDD.
      val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

      // Register the DataFrame as a table so it can be queried with SQL.
      peopleDataFrame.registerTempTable("people")

      // SQL statements can be run by using the sql method provided by sqlContext.
      val results = sqlContext.sql("SELECT name FROM people")

      // The columns of a row in the result can be accessed by field index.
      // Going through `.rdd` avoids depending on an implicit Encoder.
      results.rdd.map(t => s"Name: ${t(0)}").collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}


people.txt (sample input file read by the program; one `name,age` record per line):

xiaoming,23

xiaohong,21

zhangsan,20

lisi,10

wangwu,30
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: