sparkstreaming版本的单词统计
2016-06-04 18:18
253 查看
直接上代码、注释:
package com.scala.my
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Durations
/**
* spark streaming 版本的单词统计(通过监听端口)
* 1\在 hh15上执行 :yum install nc 安装工具nc
* 第一种本地测试的步骤:
* 2\在hh15上启动端口:#nc -lk 8888
* 3\在本地eclipse上run程序
* 4\在hh15上的dos界面中输入单词,输入的时间要在Durations.seconds(10)的范围内
* 5\在eclipse界面查看是否成功
* 第二种服务器上测试的步骤:
* 2\开启spark集群:(1)启动zookeeeper (2)在hh15上的spark的sbin下启动spark集群---》#sh start-all.sh
* 3\在hh15上开启端口:#nc -lk 8888
* 4\将WordCount.java类打成jar包(sparkStreamWordCount.jar),并且放到hh15上
* 5\使用standalone模式在hh15上执行:(一般是后台执行)
* # nohup sh
* 6\在刚才开启端口的窗口输入单词
*
*
* 测试结果:第一种方式测试通过,第二种方式未测试
*/
object StreamWordCount {
def main(args: Array[String]): Unit = {
//获取context 至少并行度是2
val sc=new StreamingContext(new SparkConf().setAppName("StreamCount").setMaster("local[2]"),Durations.seconds(8))
//获取端口输入的文本信息
val lines=sc.socketTextStream("192.168.142.115",
8888)
//压扁
val paris=lines.flatMap(x=>x.split(" "))
//map
val words=paris.map((_,1))
//reduce
val result=words.reduceByKey((x,y)=>x+y)
//打印前10个数
result.print()
//开启start
sc.start()
//等待
sc.awaitTermination()
//关闭资源
sc.stop()
}
}
package com.scala.my
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Durations
/**
* spark streaming 版本的单词统计(通过监听端口)
* 1\在 hh15上执行 :yum install nc 安装工具nc
* 第一种本地测试的步骤:
* 2\在hh15上启动端口:#nc -lk 8888
* 3\在本地eclipse上run程序
* 4\在hh15上的dos界面中输入单词,输入的时间要在Durations.seconds(10)的范围内
* 5\在eclipse界面查看是否成功
* 第二种服务器上测试的步骤:
* 2\开启spark集群:(1)启动zookeeeper (2)在hh15上的spark的sbin下启动spark集群---》#sh start-all.sh
* 3\在hh15上开启端口:#nc -lk 8888
* 4\将WordCount.java类打成jar包(sparkStreamWordCount.jar),并且放到hh15上
* 5\使用standalone模式在hh15上执行:(一般是后台执行)
* # nohup sh
* 6\在刚才开启端口的窗口输入单词
*
*
* 测试结果:第一种方式测试通过,第二种方式未测试
*/
object StreamWordCount {
def main(args: Array[String]): Unit = {
//获取context 至少并行度是2
val sc=new StreamingContext(new SparkConf().setAppName("StreamCount").setMaster("local[2]"),Durations.seconds(8))
//获取端口输入的文本信息
val lines=sc.socketTextStream("192.168.142.115",
8888)
//压扁
val paris=lines.flatMap(x=>x.split(" "))
//map
val words=paris.map((_,1))
//reduce
val result=words.reduceByKey((x,y)=>x+y)
//打印前10个数
result.print()
//开启start
sc.start()
//等待
sc.awaitTermination()
//关闭资源
sc.stop()
}
}
相关文章推荐
- 给裸接口加一道防护,避免恶意盗刷和爬取
- Unity3D 之武器系统冷却功能的实现方式
- 第二阶段冲刺——个人总结08
- MySQL修改编码设置及乱码问题
- 花生壳添加映射失败怎么办
- encodeURI(encodeURI(name)) ;文件上传
- In Gradle projects, always use http://schemas.android.com/apk/res-auto for custom attributes
- Linux 块设备驱动 (1)
- Android 动画中的Interpolator
- 自然语言处理相关工具(更新中)
- echarts.js:1136 Uncaught Error: Initialize failed: invalid dom.
- 简单的GRAM矩阵运算定义
- C++作业6
- WebApi接口安全认证——HTTP之摘要认证
- 每天一点数据结构之插入排序
- Sprint第二个冲刺(第三天)
- SICP 习题1. 42 compose
- kafka集群启动遇到LeaderNotAvailableException错误
- android:getSlotFromBufferLocked: unknown buffer: 0xf3d544c0
- android:getSlotFromBufferLocked: unknown buffer: 0xf3d544c0