您的位置:首页 > 其它

第102讲: 动手实战Spark Streaming自定义Receiver并进行调试和测试

2016-05-22 13:41 686 查看
有兴趣想学习国内整套Spark+Spark Streaming+Machine learning顶级课程的,可加我qq
471186150。共享视频,性价比超高!

1:SparkStreaming虽然说已经支持了很多不同类型的数据来源。但是有时候可能我们的一些数据来源非常特殊 ,不是它天然默认支持的,这时候就要自定义Receiver。而自定义Receiver,一般都是基于网络的方式。因为你传数据的话,一般是从另外一个网络端口传过来,至于传的协议是另外一码事。

2:从本质上来说,SparkStreaming中的所有Receiver,都是自定义的Receiver。所以你要想自定义一个Receiver,最最简单的方式,你就是看下已有的Receiver怎么去实现。

具体步骤:http://spark.apache.org/docs/latest/streaming-custom-receivers.html

class CustomReceiver(host: String, port: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
override def run() { receive() }//开启线程调receiver()方法
}.start()
}

def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}

/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port,Receiver的时候就连上Socket
socket = new Socket(host, port)

// Until stopped or connection broken continue reading
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()//每读一行,存一次,一直循环
}
reader.close()
socket.close()

// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")//先stop,然后再start()
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}

上面就已经自定义完了一个Receiver,下面就new出它的对象,传进去。因为返回的是Dstream,以前对Dstream怎么操作,继续怎么操作,这里先从flatMap开始。

// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = lines.flatMap(_.split(" "))
...


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: