您的位置:首页 > Web前端 > JavaScript

[2.1]Spark DataFrame操作(一)之读取并过滤json文件

2016-05-25 16:00 711 查看

参考

DT大数据梦工厂

spark官网

场景

DataFrame总论

IT界数据存储与操作发展的四大阶段

1、代码+文件系统

2、J2EE+DB (存在的问题:数据库不能进行分布式计算)

3、Hive

4、SparkSQL+Hive => SparkSQL+Hive+DataFrame -> SparkSQL+Hive +DataFrame+DataSet(DataSet目前处于实验阶段)

Hive+SparkSQL+DataFrame 黄金组合

1、Hive:负责廉价的数据仓库存储

2、SparkSQL:负责高速的计算

3、DataFrame:负责复杂的数据挖掘

DataFrame基本操作

读取Spark自带的people.json文件,并对其进行一些基本的sql操作,文件原始内容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}


实验

java版

package cool.pengych.spark.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
public class DataFrameOps {
public static void main(String[] args) {
/*
*  1、创建SQLContext
*/
SparkConf conf = new SparkConf().setAppName("My Frame Ops").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(jsc);
/*
* 2、创建DataFrame
*/
DataFrame df = sqlc.read().json("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json");

/*
* 3、SQL
*/
// select * from table
df.show();
//desc table
df.printSchema();
// select name from table
df.select("name").show();
// select name,age+10 from table
df.select(df.col("name"),df.col("age").plus(10)).show();
// select * from table where age >10
df.filter(df.col("age").gt(10)).show();
df.groupBy(df.col("age")).count().show();
}
}


scala版

package main.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
object DataFrameOps {

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("DataFram Ops")
val sqlContext = new SQLContext(new SparkContext(conf))

val df = sqlContext.read.json("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")

df.show
df.printSchema
df.select("name").show
df.select("name", "age").show
df.select(df("name"),df("age")+10).show
df.filter(df("age")>10).show
}
}


执行结果

语句
df.filter(df("age")>10).show
的执行结果:

16/05/25 22:27:44 INFO DAGScheduler: Job 10 finished: show at DataFrameOps.scala:18, took 0.037776 s
+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+

16/05/25 22:27:44 INFO SparkContext: Invoking stop() from shutdown hook


总结

“A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood”

1、DataFrame是基于列的,包含了每条记录的Metadata信息-也就是说DataFrame的优化是基于列内部的优化,而不像RDD一样基于行进行优化。

2、DataFrame从形式上看最大的不同点是其天生是分布式的;你可以简单的认为spark中的DataFrame是一个分布式的Table。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: