Spark Components: Spark Streaming Learning 5 -- WindowsWordCount
2016-04-26 22:25
For more code, see: https://github.com/xubo245/SparkLearning
1. Overview
WindowsWordCount is an application of the sliding-window technique: instead of reporting only the current batch, it aggregates word counts over a window of recent batches, and that window slides forward over the stream. It can be used, for example, to report statistics over the last 30 seconds or the last hour while keeping the batch interval as small as 1 second, recomputing the result every 10 seconds or every half hour as the window slides.
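As a minimal sketch of those parameters (my own illustration, not the code from this post; the localhost/9999 source is a placeholder), the snippet below counts words over a 30-second window that slides every 10 seconds, on top of a 5-second batch interval. Both the window length and the slide interval must be multiples of the batch interval.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._

object SlidingWindowSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SlidingWindowSketch").setMaster("local[2]")
    // Batch interval: 5 seconds
    val ssc = new StreamingContext(conf, Seconds(5))
    // Placeholder socket source, e.g. fed by `nc -lk 9999`
    val lines = ssc.socketTextStream("localhost", 9999)
    val pairs = lines.flatMap(_.split(" ")).map(w => (w, 1))
    // Count words seen in the last 30 seconds, recomputed every 10 seconds;
    // both durations are multiples of the 5-second batch interval
    val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    windowedCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}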
2. Running
Input:
hadoop@Master:~/cloud/testByXubo/spark/Streaming$ nc -lk 9999
word
a
a
a
a
a
world
a
word
word
world
hello
a
a
a
a
a
hello
xubo
a
a
a
a
a
ab
b
b
b
bb
b
^C
Output (note that the batch timestamps are 10 seconds apart, i.e. this run used a 10-second slide interval):
hadoop@Master:~/cloud/testByXubo/spark/Streaming/windowsWordCount$ ./submitJob.sh
-------------------------------------------
Time: 1461680330000 ms
-------------------------------------------
-------------------------------------------
Time: 1461680340000 ms
-------------------------------------------
(word,1)
(world,1)
(a,5)
-------------------------------------------
Time: 1461680350000 ms
-------------------------------------------
(word,3)
(hello,1)
(world,2)
(a,6)
-------------------------------------------
Time: 1461680360000 ms
-------------------------------------------
(word,3)
(hello,2)
(world,2)
(a,11)
-------------------------------------------
Time: 1461680370000 ms
-------------------------------------------
(xubo,1)
(word,2)
(hello,2)
(world,1)
(a,6)
-------------------------------------------
Time: 1461680380000 ms
-------------------------------------------
(xubo,1)
(hello,1)
(a,5)
-------------------------------------------
Time: 1461680390000 ms
-------------------------------------------
(b,4)
(xubo,1)
(bb,1)
(a,5)
(ab,1)
3. Source code:
package org.apache.spark.Streaming.learning

import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowsWordCount {
  // args(0): hostname of the socket source, args(1): port,
  // args(2): window length in seconds, args(3): slide interval in seconds
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint(".")
    // Receive data from the socket source
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(","))
    // Window operation: count words over the last args(2) seconds, sliding every args(3) seconds
    val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt))
    //val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(args(2).toInt), Seconds(args(3).toInt))
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
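The commented-out line above points to the incremental form of reduceByKeyAndWindow, which additionally takes an inverse reduce function so that, as the window slides, Spark can subtract the batches that leave the window instead of re-reducing the whole window; this form requires checkpointing, which is what ssc.checkpoint(".") enables. A rough sketch of that variant, reusing the words DStream and args from the code above, might look like this (my own illustration, not part of the original job):

// Sketch only: assumes `words`, args(2) and args(3) from the job above
val incrementalCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,    // add counts from batches entering the window
  (a: Int, b: Int) => a - b,    // subtract counts from batches leaving the window
  Seconds(args(2).toInt),       // window length
  Seconds(args(3).toInt))       // slide interval
// Optionally drop keys whose windowed count has fallen back to zero
val positiveCounts = incrementalCounts.filter(_._2 > 0)
positiveCounts.print()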