您的位置:首页 > 其它

SparkStreaming的一个入门例子程序

2018-01-23 16:48 513 查看
package com.lyzx.day31

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf}

class T1 {

/**
* 关于SparkStreaming的一个demo
* @param ssc
*/
def f1(ssc:StreamingContext): Unit ={
//返回值类型是ReceiverInputDStream
//192.168.29.160是虚拟机(linux)的IP地址
//使用nc -lk 9999命令输入数据,当输入nc -lk 9999时就表示在192.168.29.160这台机器上的9999端口启动了一个socket server

//监听192.168.29.160的9999端口
val line = ssc.socketTextStream("192.168.29.160",9999)

//对接受到的数据做worldCount并打印
line.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.print()

//开启流式处理
ssc.start()
//等待
ssc.awaitTermination()
}
}

object T1{
def main(args: Array[String]): Unit = {
//这儿的setMaster方法里要写local
N>=2,原因是至少启动2个线程,一个线程是receiver Task即从外部的
//数据源接受数据,剩下的至少一个线程用来处理receiver Task接受的数据
val conf = new SparkConf().setAppName("day31").setMaster("local[2]")

//创建Streaming上下文
val ssc = new StreamingContext(conf,Seconds(5))
val t = new T1
t.f1(ssc)
ssc.stop()
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: