您的位置:首页 > 大数据

大数据学习笔记:YARN框架分析之事件的分发与状态机

2017-08-10 19:58 761 查看
(这部分有点难,因为版本问题还是其他什么问题,反正我这边eclipse的Hadoop插件一直没法用,所以没法通过断点调试的方法来一步步具体分析其中的步骤。看源码看的头痛死了,好多方法还有类名根本记不住。)

之前说过,yarn框架下的MRAppMaster是一个状态机,它的状态改变是由事件来驱动的。

AsyncDispatcher

Hadoop中有一个核心类:AsyncDispatcher类,一个异步事件分发器,底层总管道

在eclipse中查看一下它的源码,在大纲视图中可以看到它有一个(阻塞的)事件队列和处理事件的线程:

public class AsyncDispatcher extends AbstractService implements Dispatcher


private final BlockingQueue<Event> eventQueue;
private Thread eventHandlingThread;


从队列中取出事件

处理线程是被createThread()方法创建的:

此方法主体是一个while循环不断地从事件队列中获取事件然后通过调用dispatch()方法把事件分发出去,其部分代码如下:

Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
dispatch(event);
}
而dispatch()方法在获得事件之后,首先确定事件的类型,然后根据事件类型把事件交给相应的Handler,部分代码如下:

EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
}
eventDispatchers的定义:

protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;


往队列中添加事件

既然有取出事件的方法,肯定也有增添事件的方法。类GenericEventHandler就实现了EventHandler<Event>接口,其下有一个handle()方法就是负责往事件队列中添加事件的,部分代码如下:

eventQueue.put(event);


ResourceManager中的Dispatcher

而在ResourceManager类中也有一个名为Dispatcher的类,但它实现的却是EventHandler型接口:

public static final class ApplicationEventDispatcher implements EventHandler<RMAppEvent>
并且它对事件的处理是交给RMApp下的handle()方法处理的:

ApplicationId appID = event.getApplicationId();
      RMApp rmApp = this.rmContext.getRMApps().get(appID);
      if (rmApp != null) {
        try {
          rmApp.handle(event);
        }
可见ResourceManager类中的Dispatcher并不是正真的分发器,它只是一个二次分发,正真的事件分发器只有AsyncDispatcher类。

但是又有一个问题,为什么需要二次分发?而不直接交给AsyncDispatcher处理?

因为ApplicationEventDispatcher处理的事件是无类型的(不判断类型),而RMApp处理的事件是有类型的,具体的事件分发到哪由ID决定(由代码看出)。好吧,说实话我还是没搞懂这里为什么要多搞一层(存疑)。

RMApp中事件的状态

EventHandler处理的事件类型为<RMAppEvent>,而RMAppEvent又继承于抽象事件:

public class RMAppEvent extends AbstractEvent<RMAppEventType>
可以发现RMAppEventType是一个枚举变量,列举了事件所有的状态:



Application的状态

进入到ResourceManager的WEB页面,在页面左边有application的状态:



进入到RMAppImpl类的handle()方法中,查看状态的具体变换过程:



注意以下三块代码:

this.writeLock.lock();
try
{
/* keep the master in sync with the state machine */
this.stateMachine.doTransition(event.getType(), event);
}
finally
{
this.writeLock.unlock();
}
状态机在做状态变换时是一个互斥操作,有一个加锁解锁的过程。任务的状态显示(web页面显示)只有在解锁之后才会有变化。

App状态的变量类型为RMAppState,其下包括了这几种状态:

NEW,NEW_SAVING,SUBMITTED,ACCEPTED,RUNNING,FINAL_SAVING,FINISHING,FINISHED,FAILED,KILLING,KILLED

可以看到有几种状态是不在WEB界面显示的。

ResourceManager中还有一些其他的事件分发器:



过程

RM收到submitApp()后,将请求转给YarnScheduler,由scheduler分配container。YarnScheduler在源码中是一个接口,它的实现类为CapacityScheduler:



RM在新建的container中启动MRAppMaster,MRAppMaster负责跟踪作业进度,其管理权由RM交给NM。MRAppMaster需要去检索数据的分割信息(input split),为每个split创建map任务与一定数量的reduce任务,并分配jobid。MRAppMaster会判断job的规模,决定任务是本地运行(uber)还是集群运行。最后MRAppMaster调用setupJob()方法,创建输出目录与临时目录。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: