
IMF Spark Streaming: A Custom EventLoop, from Getting Started to Giving Up

2016-05-15 21:31
1. Define a custom IMFEventLoop

2. Define a test class, IMFEventLoopTest

Run output

com.dt.spark.sparkstreaming.IMFEventLoopTest

=====IMF EventLoop from learn to give up !=========

IMFEventLoop的start调用onstart,unit

IMFEventLoop的构造函数启用了一个线程,eventThread.start()开始运行

IMF EventLoop :1

IMFEventLoop取得了event1

IMF EventLoop :2

IMFEventLoop的onReceive(event)1

IMF EventLoop :3

IMF EventLoop :4

IMF 处理启动的事件 1,例如spark.streaming.scheduler的JobScheduler.scala的JobStarted(job, startTime) => handleJobStart(job, startTime)

IMF EventLoop :5

IMF onReceive:1

IMF EventLoop :6

IMFEventLoop取得了event2

IMF EventLoop :7

IMFEventLoop的onReceive(event)2

IMF EventLoop :8

IMF 处理完成的事件 2 spark.streaming.scheduler的JobScheduler.scala的case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)

IMF onReceive:2

IMFEventLoop取得了event3

IMFEventLoop的onReceive(event)3

IMF EventLoop :9

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :10

IMF onReceive:3

IMF EventLoop :11

IMFEventLoop取得了event4

IMF EventLoop :12

IMFEventLoop的onReceive(event)4

IMF EventLoop :13

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :14

IMF onReceive:4

IMF EventLoop :15

IMFEventLoop取得了event5

IMF EventLoop :16

IMFEventLoop的onReceive(event)5

IMF EventLoop :17

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :18

IMF onReceive:5

IMF EventLoop :19

IMF EventLoop :20

IMF EventLoop :21

IMFEventLoop取得了event6

IMF EventLoop :22

IMFEventLoop的onReceive(event)6

IMF EventLoop :23

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :24

IMF onReceive:6

IMF EventLoop :25

IMFEventLoop取得了event7

IMF EventLoop :26

IMFEventLoop的onReceive(event)7

IMF EventLoop :27

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :28

IMF onReceive:7

IMF EventLoop :29

IMFEventLoop取得了event8

IMF EventLoop :30

IMFEventLoop的onReceive(event)8

IMF EventLoop :31

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :32

IMF onReceive:8

IMF EventLoop :33

IMFEventLoop取得了event9

IMF EventLoop :34

IMFEventLoop的onReceive(event)9

IMF EventLoop :35

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF onReceive:9

IMFEventLoop取得了event10

IMFEventLoop的onReceive(event)10

IMF EventLoop :36

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :37

IMF onReceive:10

IMF EventLoop :38

IMFEventLoop取得了event11

IMF EventLoop :39

IMFEventLoop的onReceive(event)11

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :40

IMF onReceive:11

IMF EventLoop :41

IMFEventLoop取得了event12

IMF EventLoop :42

IMFEventLoop的onReceive(event)12

IMF EventLoop :43

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :44

IMF onReceive:12

IMF EventLoop :45

IMFEventLoop取得了event13

IMF EventLoop :46

IMFEventLoop的onReceive(event)13

IMF EventLoop :47

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :48

IMF onReceive:13

IMF EventLoop :49

IMF EventLoop :50

IMFEventLoop取得了event14

IMF EventLoop :51

IMFEventLoop的onReceive(event)14

IMF EventLoop :52

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :53

IMF onReceive:14

IMF EventLoop :54

IMFEventLoop取得了event15

IMF EventLoop :55

IMFEventLoop的onReceive(event)15

IMF EventLoop :56

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :57

IMF onReceive:15

IMF EventLoop :58

IMFEventLoop取得了event16

IMF EventLoop :59

IMFEventLoop的onReceive(event)16

IMF EventLoop :60

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :61

IMF onReceive:16

IMF EventLoop :62

IMFEventLoop取得了event17

IMF EventLoop :63

IMFEventLoop的onReceive(event)17

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF onReceive:17

IMFEventLoop取得了event18

IMFEventLoop的onReceive(event)18

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :64

IMF onReceive:18

IMF EventLoop :65

IMFEventLoop取得了event19

IMF EventLoop :66

IMFEventLoop的onReceive(event)19

IMF EventLoop :67

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :68

IMF EventLoop :69

IMF EventLoop :70

IMF EventLoop :71

IMF onReceive:19

IMF EventLoop :72

IMFEventLoop取得了event20

IMF EventLoop :73

IMFEventLoop的onReceive(event)20

IMF EventLoop :74

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :75

IMF onReceive:20

IMFEventLoop取得了event21

IMFEventLoop的onReceive(event)21

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF onReceive:21

IMFEventLoop取得了event22

IMF EventLoop :76

IMFEventLoop的onReceive(event)22

IMF EventLoop :77

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :78

IMF onReceive:22

IMF EventLoop :79

IMFEventLoop取得了event23

IMF EventLoop :80

IMFEventLoop的onReceive(event)23

IMF EventLoop :81

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :82

IMF onReceive:23

IMF EventLoop :83

IMFEventLoop取得了event24

IMF EventLoop :84

IMFEventLoop的onReceive(event)24

IMF EventLoop :85

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :86

IMF onReceive:24

IMF EventLoop :87

IMFEventLoop取得了event25

IMF EventLoop :88

IMFEventLoop的onReceive(event)25

IMF EventLoop :89

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :90

IMF onReceive:25

IMF EventLoop :91

IMFEventLoop取得了event26

IMF EventLoop :92

IMFEventLoop的onReceive(event)26

IMF EventLoop :93

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF EventLoop :94

IMF onReceive:26

IMF EventLoop :95

IMFEventLoop取得了event27

IMF EventLoop :96

IMFEventLoop的onReceive(event)27

IMF EventLoop :97

IMF EventLoop :98

IMF EventLoop :99

IMF EventLoop :100

IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e)

IMF onReceive:27

Process finished with exit code 0



package com.dt.spark.sparkstreaming

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}

import org.apache.spark.Logging

import scala.util.control.NonFatal

/**
 * An event loop to receive events from the caller and process all events in the event thread. It
 * will start an exclusive event thread to process all events.
 *
 * Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can
 * handle events in time to avoid the potential OOM.
 */
private[com] abstract class IMFEventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          // Prints "IMFEventLoop took event <event>".
          println("IMFEventLoop取得了event" + event)
          try {
            // Prints "IMFEventLoop's onReceive(event) <event>".
            println("IMFEventLoop的onReceive(event)" + event)
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    //println("IMFEventLoop的start开始调用onstart")
    onStart()
    // Prints "The IMFEventLoop constructor created a thread; eventThread.start() begins running".
    println("IMFEventLoop的构造函数启用了一个线程,eventThread.start()开始运行")
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  /**
   * Put the event into the event queue. The event thread will process it later.
   */
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  /**
   * Return if the event thread has already been started but not yet stopped.
   */
  def isActive: Boolean = eventThread.isAlive

  /**
   * Invoked when `start()` is called but before the event thread starts.
   */
  protected def onStart(): Unit = {
    // Prints "IMFEventLoop's start calls onStart, Unit".
    println("IMFEventLoop的start调用onstart,unit")
  }

  /**
   * Invoked when `stop()` is called and the event thread exits.
   */
  protected def onStop(): Unit = {}

  /**
   * Invoked in the event thread when polling events from the event queue.
   *
   * Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked
   * and cannot process events in time. If you want to call some blocking actions, run them in
   * another thread.
   */
  protected def onReceive(event: E): Unit

  /**
   * Invoked if `onReceive` throws any non fatal error. Any non fatal error thrown from `onError`
   * will be ignored.
   */
  protected def onError(e: Throwable): Unit

}
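The test class stands in for real scheduler events with plain integers (1 for a started job, 2 for a completed job, anything else for an error). As a rough sketch of what typed events might look like, the snippet below dispatches on a small sealed hierarchy instead. The case names mirror the `JobStarted`, `JobCompleted` and `ErrorReported` cases quoted in the test's messages, but `SchedulerEvent`, the `String` job ids and `describe` are hypothetical simplifications, not Spark's actual definitions.

```scala
// Hypothetical, simplified stand-ins for the scheduler events named in the test messages.
sealed trait SchedulerEvent
final case class JobStarted(jobId: String, startTime: Long) extends SchedulerEvent
final case class JobCompleted(jobId: String, completedTime: Long) extends SchedulerEvent
final case class ErrorReported(msg: String, e: Throwable) extends SchedulerEvent

object TypedEventSketch {
  // Plays the role of onReceive: one case per event type.
  // Because the trait is sealed, the compiler can warn about a missing case.
  def describe(event: SchedulerEvent): String = event match {
    case JobStarted(id, t)   => s"started $id at $t"
    case JobCompleted(id, t) => s"completed $id at $t"
    case ErrorReported(m, e) => s"error: $m (${e.getMessage})"
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      JobStarted("job-1", 100L),
      JobCompleted("job-1", 250L),
      ErrorReported("boom", new RuntimeException("cause")))
    events.foreach(e => println(describe(e)))
  }
}
```

With an event loop typed as `IMFEventLoop[SchedulerEvent]`, the body of `describe` would presumably move into `onReceive`.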




package com.dt.spark.sparkstreaming

import java.util.concurrent.ConcurrentLinkedQueue

/**
 * Created by admin on 2016/5/15.
 */
object IMFEventLoopTest {

  // Maps an integer event to a message naming the JobScheduler case it stands for.
  def IMFprocessEvent(eventint: Int): Unit = {
    eventint match {
      // 1: "handle the started event", cf. JobStarted in JobScheduler.scala
      case 1 => println("IMF 处理启动的事件 1,例如spark.streaming.scheduler的JobScheduler.scala的JobStarted(job, startTime) => handleJobStart(job, startTime) ")
      // 2: "handle the completed event", cf. JobCompleted in JobScheduler.scala
      case 2 => println("IMF 处理完成的事件 2 spark.streaming.scheduler的JobScheduler.scala的case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)")
      // anything else: "handle other events", cf. ErrorReported
      case _ => println("IMF 处理其他的事件 _spark.streaming.scheduler的ErrorReported(m, e) => handleError(m, e) ")
    }
  }

  def main(args: Array[String]): Unit = {
    println("=====IMF EventLoop from learn to give up !=========")

    //test("IMFEventLoop")
    val buffer = new ConcurrentLinkedQueue[Int]
    val eventLoop = new IMFEventLoop[Int]("IMFtest") {

      override def onReceive(event: Int): Unit = {
        buffer.add(event)
        IMFprocessEvent(event)
        println("IMF onReceive:" + event)
      }

      override def onError(e: Throwable): Unit = {}
    }
    eventLoop.start()
    // Post 100 events; the event thread consumes them concurrently.
    for (i <- 1 to 100) {
      eventLoop.post(i)
      println("IMF EventLoop :" + i)
    }
    // stop() interrupts the event thread, so events still in the queue are dropped.
    // In the run output above, only events 1 to 27 were handled.
    eventLoop.stop()
  }

}
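In the run output, all 100 events are posted but the last one handled is 27. `stop()` is called right after the posting loop and interrupts the event thread, which exits even if the queue is not empty. One possible way to let every event be handled first is to wait on a `CountDownLatch` before stopping. The sketch below is a minimal, Spark-free illustration of that idea; `DrainBeforeStopSketch`, `postAndDrain` and the 10-second timeout are assumptions made for the example, not part of the original code.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object DrainBeforeStopSketch {
  // Posts `total` events to one consumer thread, waits for them to be handled,
  // then stops the thread. Returns the number of events that were handled.
  def postAndDrain(total: Int): Int = {
    val queue = new LinkedBlockingQueue[Int]()
    val handled = new AtomicInteger(0)
    val allHandled = new CountDownLatch(total)

    val worker = new Thread("sketch-event-loop") {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (true) {
            queue.take()              // blocks until an event arrives
            handled.incrementAndGet() // stands in for onReceive(event)
            allHandled.countDown()
          }
        } catch {
          case _: InterruptedException => // stop was requested
        }
      }
    }

    worker.start()
    (1 to total).foreach(i => queue.put(i))
    // Bounded wait until every posted event has been handled, then stop.
    allHandled.await(10, TimeUnit.SECONDS)
    worker.interrupt()
    worker.join()
    handled.get()
  }

  def main(args: Array[String]): Unit =
    println("handled " + postAndDrain(100) + " of 100 events")
}
```

Applied to `IMFEventLoopTest`, the same idea would mean counting down a latch inside `onReceive` and awaiting it before `eventLoop.stop()`.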