您的位置:首页 > 其它

01_spark1.3_RDD的开发

2015-04-15 11:51 155 查看
原创博客:http://blog.csdn.net/codemosi。辛辛苦苦记录,希望转发不要删这行。

上个spark系列博客是0.9版本,也没带录视频,开始录制一个1.3的spark系列,以备忘,好记性不如烂笔头。本系列还是集中在开发部分。概念和集群运维不会涉及到。主要涉及spark RDD,spark SQL,spark Streaming,graphX,mllib.的开发,和API接口说明。

需求:

SELECT tel,SUM(money) FROM u GROUP BY tel WHERE tel = ?

使用spark 的RDD 的api实现这个数据分析的程序如何开发,作为第一篇。尽可能的的把程序分开。map等高阶函数的函数值会先分离出来,程序略显傻逼。但是 较为通熟易懂。

涉及到的API

1:RDD spark编程的对象,开发的时候看成Map或者List就好。实质上是丰富的子类和隐式转换的抽象类。

2:SparkContext spark核心,带了丰富的RDD的隐式转换。通过他实现方法链的连续调用,rdd促发隐式调用

3:sparkConf 配置对象

4:map 高阶函数1:传入一个函数,对rdd内每个值调用,并返回调用该函数的新的rdd

5:filter 高阶函数2:传入一个函数,对rdd内每个值调用,并返回调用该函数后,返回结果为true,的新的rdd

6:reduceByKey 高阶函数3:传入一个(key,value)对函数,对相同key的rdd内连续的2个值调用,并返回调用该函数后的rdd

7:take 取出Rdd的前N个对象,是action级别的RDD操作,能真正促发spark开始计算。

-----------------配套视频----------------------------------------

录制中。

-----------------配套代码----------------------------------------

package spark.liguozhong._01

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

object _01SparkContext {

def main(args: Array[String]) {

//01准备sparkContext上下文

val conf = new SparkConf

conf.setMaster("local").setAppName("_01SparkContext ").set("", "")

val sc = new SparkContext(conf)

//02 准备数据

val data = Array(

"13559 5",

"15888 9",

"13355 1",

"13559 2",

"15888 7",

"18988 117")

//03scala数据类型转换成 RDD

val rdd = sc.parallelize(data, 1);

//04开始spark transform操作

rdd.map(mapFunction)

.filter(filterFunction)

.reduceByKey(reduceFunction)

.take(10)

.foreach(println)

}

//--------------------------------------------

val mapFunction = (x: String) => {

val megs = x.split(" ")

(megs(0).toInt, megs(1).toInt)

}

val filterFunction = (meg: (Int, Int)) => {

meg._1 != 18988

}

val reduceFunction = (x: Int, y: Int) => {

x + y

}

}

原创博客:http://blog.csdn.net/codemosi。辛辛苦苦记录,希望转发不要删这行。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: