您的位置:首页 > 其它

Spark组件之Spark Streaming学习5--WindowsWordCount学习

2016-04-26 22:25 459 查看
更多代码请见:https://github.com/xubo245/SparkLearning

1.理解

WindowsWordCount是滑动窗口技术的应用,是统计多个窗口,在滑动。可以用于统计最近30秒或者最近一个小时的信息,单个batch还可以保留为1秒,然后每隔10秒或者半个小时的滑动进行统计



2.运行:

输入:

hadoop@Master:~/cloud/testByXubo/spark/Streaming$ nc -lk 9999
word
a
a
a
a
a
world
a
word
word
world
hello
a
a
a
a
a
hello
xubo
a
a
a
a
a
ab
b
b
b
bb
b
^C


输出:
hadoop@Master:~/cloud/testByXubo/spark/Streaming/windowsWordCount$ ./submitJob.sh
-------------------------------------------
Time: 1461680330000 ms
-------------------------------------------

-------------------------------------------
Time: 1461680340000 ms
-------------------------------------------
(word,1)
(world,1)
(a,5)

-------------------------------------------
Time: 1461680350000 ms
-------------------------------------------
(word,3)
(hello,1)
(world,2)
(a,6)

-------------------------------------------
Time: 1461680360000 ms
-------------------------------------------
(word,3)
(hello,2)
(world,2)
(a,11)

-------------------------------------------
Time: 1461680370000 ms
-------------------------------------------
(xubo,1)
(word,2)
(hello,2)
(world,1)
(a,6)

-------------------------------------------
Time: 1461680380000 ms
-------------------------------------------
(xubo,1)
(hello,1)
(a,5)

-------------------------------------------
Time: 1461680390000 ms
-------------------------------------------
(b,4)
(xubo,1)
(bb,1)
(a,5)
(ab,1)


4.源码:
package org.apache.spark.Streaming.learning

import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowsWordCount {
def main(args: Array[String]) {

val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)

//创建StreamingContext
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint(".")

// //获取数据
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
val words = lines.flatMap(_.split(","))

//windows操作
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))
//val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow(_+_, _-_,Seconds(args(2).toInt), Seconds(args(3).toInt))

wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息