spark sql
2015-11-04 16:46
344 查看
package com.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row

/**
 * Builds a DataFrame by programmatically specifying its schema (Spark 1.x style):
 * read raw text lines, construct a StructType of string columns, map each CSV
 * line to a Row, register the result as a temp table, and query it with SQL.
 */
object SpecifySchema {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("ReflectionSchema")
      .setMaster("local")
    val sparkContext = new SparkContext(sparkConf)
    val sqlCtx = new SQLContext(sparkContext)

    // Load the raw text file into an RDD of lines.
    val lines = sparkContext.textFile("C:\\Users\\xiaoming.liang\\Desktop\\people.txt")

    // Derive the schema from a space-separated list of column names;
    // every column is modelled as a nullable string.
    val columnNames = "name age"
    val tableSchema = StructType(
      columnNames.split(" ").map(col => StructField(col, StringType, true))
    )

    // Turn each comma-separated line into a Row matching the schema above.
    val rows = lines
      .map(_.split(","))
      .map(fields => Row(fields(0), fields(1).trim))

    // Apply the schema to the row RDD, then register a temp table for SQL.
    val peopleDF = sqlCtx.createDataFrame(rows, tableSchema)
    peopleDF.registerTempTable("people")

    // Run a SQL query; the resulting DataFrame supports normal RDD operations,
    // and row fields can be accessed by index.
    val nameRows = sqlCtx.sql("SELECT name FROM people")
    nameRows.map(row => "Name: " + row(0)).collect().foreach(println)
  }
}
people.txt
xiaoming,23
xiaohong,21
zhangsan,20
lisi,10
wangwu,30
相关文章推荐
- PostgreSQL表的行数统计
- 【转】iOS开发—SQLite的简单使用
- MongoDB 查询非空数组
- Ibatis 动态条件SQL语句
- 不同数据库自增字段的创建实现举例
- 另一套Oracle SQL练习题,更新参考答案
- SQLite数据库的使用总结
- MyBatis SQL映射
- oracle向已有表增加字段、字段设置默认值、修改表字varchar类型的长度
- oracle谓词推进测试
- MySQL无法启动,报错2002,无法连接
- redis集群实战
- mysql表字符编码问题
- MySQL编译安装(单实例)
- linux安装cx_Oracle并连接oracle
- 打开外部数据库
- Mysql 5.7.9源代码安装
- 关于数据库的一些函数的使用
- Redis 缓存集群 (一)Redis3.0.5 集群搭建
- Redis 缓存集群 (一)Redis3.0.5 集群搭建