
【Data Algorithms: Recipes for Scaling up with Hadoop and Spark】Chapter 6: Moving Average

The Scala version of the algorithm:
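Before the full Spark listing, here is a minimal, Spark-free sketch of the same windowing rule (this sketch is not part of the original post; the object name and the sample values are only illustrative): with a window of 3, the first average covers 1 point, the second covers 2, and every later one covers the previous 3 points.

import scala.math.max

object WindowRuleSketch extends App {
  val window = 3
  // the three GOOG rows from 2004 in sample.txt, already sorted by date
  val prices = List(194.87f, 191.67f, 184.70f)
  val averages = prices.indices.map { i =>
    // take at most `window` points ending at index i
    val slice = prices.slice(max(0, i - window + 1), i + 1)
    slice.sum / slice.size
  }
  println(averages) // approximately Vector(194.87, 193.27, 190.41)
}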

package com.bbw5.dataalgorithms.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

/**
* Contents of sample.txt:
* GOOG,2004-11-04,184.70
* GOOG,2004-11-03,191.67
* GOOG,2004-11-02,194.87
* AAPL,2013-10-09,486.59
* AAPL,2013-10-08,480.94
* AAPL,2013-10-07,487.75
* AAPL,2013-10-04,483.03
* AAPL,2013-10-03,483.41
* IBM,2013-09-30,185.18
* IBM,2013-09-27,186.92
* IBM,2013-09-26,190.22
* IBM,2013-09-25,189.47
* GOOG,2013-07-19,896.60
* GOOG,2013-07-18,910.68
* GOOG,2013-07-17,918.55
* GOOG1,2013-07-19,896.60 (added to check that partitioning takes effect: GOOG%2=0, GOOG1%2=1)
* GOOG1,2013-07-18,910.68
* GOOG1,2013-07-17,918.55
*/
object SparkMovingAverage {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("SparkMovingAverage")
    val sc = new SparkContext(sparkConf)
    val texts = sc.textFile("G:/temp/data/sample.txt", 2)
    // broadcast the moving-average window size (3 trading days) to the executors
    val bWin = sc.broadcast(3)

    val items = texts.map { line =>
      // each CSV line is: symbol,date,closing price
      val a = line.split(",")
      a(0) -> (a(1) -> a(2).toFloat)
    }.groupByKey().mapValues { b =>
      val items = new ArrayBuffer[(String, Float)]
      // sort one symbol's quotes by date; ISO yyyy-MM-dd dates sort correctly as strings
      val sort = b.toArray.sortBy(c => c._1)
      for (i <- 0 until sort.length) {
        // window start index for a window of 3: 0->0, 1->0, 2->0, 3->1, 4->2
        val from = if (i - bWin.value < 0) 0 else i - bWin.value + 1
        // number of points actually averaged (shorter at the head of the series)
        val count = if (i - bWin.value < 0) i + 1 else bWin.value
        items += sort(i)._1 -> sort.slice(from, i + 1).map(a => a._2).sum / count.toFloat
      }
      items.toList
    }

    // note: println runs on the executors; use items.collect().foreach(println) to print on the driver
    items.foreach(println)
    sc.stop()
  }
}
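The groupByKey version above materializes every quote of a symbol in memory before sorting it. Since org.apache.spark.Partitioner is already imported, a more scalable variant is to partition by symbol, let the shuffle sort by (symbol, date) via repartitionAndSortWithinPartitions, and then build the windows in one streaming pass per partition. The sketch below is not from the book or the original post, and the names SymbolPartitioner and SparkMovingAverageSorted are hypothetical:

import org.apache.spark.{ Partitioner, SparkConf, SparkContext }

// routes a (symbol, date) key by its symbol only, so one symbol never spans two partitions
class SymbolPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (symbol: String, _) => ((symbol.hashCode % partitions) + partitions) % partitions
    case _                   => 0
  }
}

object SparkMovingAverageSorted {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("SparkMovingAverageSorted"))
    val window = 3

    val pairs = sc.textFile("G:/temp/data/sample.txt", 2).map { line =>
      val a = line.split(",")
      (a(0) -> a(1)) -> a(2).toFloat // key = (symbol, date), value = closing price
    }

    val averages = pairs
      // after the shuffle, keys inside each partition arrive sorted by (symbol, date)
      .repartitionAndSortWithinPartitions(new SymbolPartitioner(2))
      .mapPartitions { iter =>
        var current = ""
        var recent = Vector.empty[Float] // the last `window` prices of the current symbol
        iter.map { case ((symbol, date), price) =>
          if (symbol != current) { current = symbol; recent = Vector.empty }
          recent = (recent :+ price).takeRight(window)
          (symbol, date, recent.sum / recent.size)
        }
      }

    averages.foreach(println)
    sc.stop()
  }
}

With this layout, per-task memory is bounded by the window size rather than by the number of quotes per symbol.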