您的位置:首页 > 大数据

大数据---spark系列--DateFrame

2015-09-25 10:42 411 查看
做了几年的大数据开发,现在有了写博客的想法。目前对spark的研究是最多的,来个开篇作吧。

自从spark1.3开始有了Dataframe后,自此数据分析的领域有多了一个神器。开始的时候我也好奇它是什么东东。于此我们先从案例看起

--

很多时候我们是看官方案例开始的

给个链接吧
http://spark.apache.org/docs/latest/sql-programming-guide.html#creating-dataframes
创建dataframe

1、直接转化

spark 把一个文本文件转换成dataframe目前支持的格式有json,parqurt等几个如果想支持特定的文本文档,可以自己写,spark package 里面有比较多的资料

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Show the content of the DataFrame
df.show()


2、通过类构造
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()


3、通过schema和Row

val schema =

StructType(

schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.

val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

这是三种方式

很多人好奇为什么要把rdd转化成dataframe来操作,到spark1.5版本我们发现里面的api操作越来越多,之前我看多python的pandas里面也是dataframe的概念,推荐一本书《利用python进行数据分析》,转化成df后我们可以更加方便的操作api(后面我会专门做些spark1.5新api介绍),当然我们也可以registerTempTable后直接进行sql操作,而且性能是比rdd更加快的。见下图



不仅仅在数据分析,我们可以在最近的ML里面看到越来越多的算法加入了。Naive Bayes, K-means,Regression等里面的算法基本看不到rdd的身形。都是dataframe来做的(后面我会把自己写的一些ML案例加进来)期待吧。。。

给个demo大家自己联系吧。。支持后面写的什么样的东东大家留言,评论吧。。

val custs = Seq(
Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
)
//UDF操作
val myFunc = udf { (x: Double) => x + 1 }
val customerDF = sc.parallelize(custs, 4).toDF()
val colNames = customerDF.columns
val cols = colNames.map(cName => customerDF.col(cName))
val theColumn = customerDF("discount")
val mappedCols = cols.map(c => if (c.toString() == theColumn.toString()) myFunc(c).as("transformed") else c)
//聚合操作
customerDF.groupBy("state").agg($"state", stddevFunc($"discount")).show()
val datas= customerDF.groupBy("state").count()
}
//自定义函数
def stddevFunc(c: Column): Column =
sqrt(avg(c * c) - (avg(c) * avg(c)))
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: