
[Java Code] [YARN] How the MRAppMaster Heartbeat Works

2016-03-18 16:04
hackshell

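Recently our cluster ran into a problem. While jobs were running, the AM (ApplicationMaster) would sometimes time out after 10 minutes and get killed, yet rerunning the job succeeded. The failures were random, so we had two initial suspicions. Either the AM's heartbeat reporting was going wrong, or the RM (ResourceManager) was hung because it was busy, and some mechanism made the AM wait 10 minutes without sending a heartbeat. So the first step is to understand how the AM reports heartbeats to the RM.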

In MRAppMaster, ContainerAllocatorRouter is responsible for requesting resources from the RM, which is also how heartbeats are sent.



RMContainerAllocator ultimately inherits from RMCommunicator, which implements the RMHeartbeatHandler interface:

public interface RMHeartbeatHandler {
    long getLastHeartbeatTime(); // time of the last heartbeat
    void runOnNextHeartbeat(Runnable callback); // add a callback to the queue; it runs after the next heartbeat
}
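The following sketch makes the contract concrete by showing one way the interface might be consumed. It assumes a consumer only wants to proceed if the AM has heard from the RM within some time window. `FakeHeartbeatHandler`, `recordHeartbeat` and `isFresh` are hypothetical names for illustration, not Hadoop code. The interface is copied locally so the example compiles without Hadoop.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Local copy of the interface above so this sketch compiles without Hadoop.
interface RMHeartbeatHandler {
    long getLastHeartbeatTime();
    void runOnNextHeartbeat(Runnable callback);
}

// Hypothetical handler: stores the last heartbeat time and queues callbacks.
// Draining the queue (what executeHeartbeatCallbacks() does in RMCommunicator) is left out.
class FakeHeartbeatHandler implements RMHeartbeatHandler {
    private volatile long lastHeartbeatTime = 0L;
    private final Queue<Runnable> heartbeatCallbacks = new ConcurrentLinkedQueue<>();

    @Override
    public long getLastHeartbeatTime() {
        return lastHeartbeatTime;
    }

    @Override
    public void runOnNextHeartbeat(Runnable callback) {
        heartbeatCallbacks.add(callback);
    }

    // Called after a heartbeat RPC returns successfully.
    void recordHeartbeat(long nowMs) {
        lastHeartbeatTime = nowMs;
    }

    int queuedCallbacks() {
        return heartbeatCallbacks.size();
    }
}

class HeartbeatFreshness {
    // Hypothetical check: has the AM heard from the RM within windowMs?
    static boolean isFresh(RMHeartbeatHandler handler, long nowMs, long windowMs) {
        return nowMs - handler.getLastHeartbeatTime() <= windowMs;
    }

    public static void main(String[] args) {
        FakeHeartbeatHandler handler = new FakeHeartbeatHandler();
        handler.recordHeartbeat(1_000L);
        System.out.println(isFresh(handler, 5_000L, 10_000L));  // 4 s since the last heartbeat
        System.out.println(isFresh(handler, 20_000L, 10_000L)); // 19 s since the last heartbeat
    }
}
```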


After each heartbeat returns, the callbacks registered in heartbeatCallbacks are executed:

allocatorThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
            ......
            heartbeat();
            lastHeartbeatTime = context.getClock().getTime(); // record the time of the last heartbeat
            executeHeartbeatCallbacks(); // run the registered callbacks
            ....
});
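The key property of this loop is that heartbeat() is called synchronously. lastHeartbeatTime only advances after heartbeat() returns, so a heartbeat that blocks (for example, on a slow or hung RM) freezes the timestamp for as long as it blocks.

The sketch below reproduces the loop shape under simplifying assumptions:

- `AllocatorLoop`, the fixed poll interval and the no-op `heartbeat()` are hypothetical stand-ins, not RMCommunicator itself.
- Sleeping before each heartbeat is an assumption about the elided part of the loop.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified allocator loop (not RMCommunicator itself).
class AllocatorLoop {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AtomicInteger heartbeats = new AtomicInteger();
    private final long pollIntervalMs;
    private volatile long lastHeartbeatTime = 0L;
    private Thread allocatorThread;

    AllocatorLoop(long pollIntervalMs) {
        this.pollIntervalMs = pollIntervalMs;
    }

    // Stand-in for the allocate RPC; if this blocked, lastHeartbeatTime would stop advancing.
    void heartbeat() {
        heartbeats.incrementAndGet();
    }

    void start() {
        allocatorThread = new Thread(() -> {
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(pollIntervalMs);
                } catch (InterruptedException e) {
                    return; // stop() interrupts the sleep
                }
                heartbeat();
                lastHeartbeatTime = System.currentTimeMillis(); // only after heartbeat() returns
                // executeHeartbeatCallbacks() would run here
            }
        }, "allocator-sketch");
        allocatorThread.start();
    }

    // Spins (yielding) until at least n heartbeats happened or the timeout passes.
    boolean awaitHeartbeats(int n, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (heartbeats.get() < n && System.currentTimeMillis() < deadline) {
            Thread.yield();
        }
        return heartbeats.get() >= n;
    }

    void stop() {
        stopped.set(true);
        allocatorThread.interrupt();
        try {
            allocatorThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    int heartbeatCount() {
        return heartbeats.get();
    }

    long getLastHeartbeatTime() {
        return lastHeartbeatTime;
    }

    public static void main(String[] args) {
        AllocatorLoop loop = new AllocatorLoop(10);
        loop.start();
        loop.awaitHeartbeats(3, 5_000);
        loop.stop();
        System.out.println("heartbeats=" + loop.heartbeatCount()
            + " lastHeartbeatTime=" + loop.getLastHeartbeatTime());
    }
}
```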


In the RMCommunicator class:

private void executeHeartbeatCallbacks() {
    Runnable callback = null;
    while ((callback = heartbeatCallbacks.poll()) != null) {
        callback.run();
    }
}
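executeHeartbeatCallbacks() drains the queue with poll() until it is empty. Each callback is therefore removed before it runs, runs at most once, and runs on the allocator thread after heartbeat() returns. One consequence is worth knowing: if a callback registers another callback while the queue is draining, the new one runs in the same pass, not on the next heartbeat.

The hypothetical `HeartbeatCallbacks` class below copies the poll loop to demonstrate this. Using ConcurrentLinkedQueue for the queue is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical copy of the callback queue and its drain loop.
class HeartbeatCallbacks {
    private final Queue<Runnable> heartbeatCallbacks = new ConcurrentLinkedQueue<>();

    // Equivalent of runOnNextHeartbeat(): just enqueue.
    void runOnNextHeartbeat(Runnable callback) {
        heartbeatCallbacks.add(callback);
    }

    // Same poll-until-empty loop as executeHeartbeatCallbacks(). Each callback is
    // removed before it runs, and callbacks enqueued while draining are picked up
    // by the same loop.
    void executeHeartbeatCallbacks() {
        Runnable callback;
        while ((callback = heartbeatCallbacks.poll()) != null) {
            callback.run();
        }
    }

    int pending() {
        return heartbeatCallbacks.size();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        HeartbeatCallbacks callbacks = new HeartbeatCallbacks();
        callbacks.runOnNextHeartbeat(() -> log.add("a"));
        callbacks.runOnNextHeartbeat(() -> {
            log.add("b");
            callbacks.runOnNextHeartbeat(() -> log.add("c")); // added mid-drain
        });
        callbacks.executeHeartbeatCallbacks(); // one "heartbeat"
        System.out.println(log + " pending=" + callbacks.pending());
    }
}
```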

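When RMCommunicator starts, it first registers with the RM, telling it the AM's host and port. It then starts a thread (startAllocatorThread) that periodically calls the heartbeat method implemented in RMContainerAllocator. Each call requests resources from the RM, reports the AM's status, and tells the RM that the AM is still alive.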

RMCommunicator is initialized and started together with the AM. Its serviceStart method looks like this:

protected void serviceStart() throws Exception {
    scheduler = createSchedulerProxy(); // obtain a proxy to the RM
    register(); // register with the RM
    startAllocatorThread(); // start the heartbeat thread
    ....
}
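The ordering in serviceStart() matters: the allocator thread starts only after register() has returned, so no heartbeat is sent before the AM is registered. The hypothetical skeleton below records the three steps in order. Its explicit precondition check in `startAllocatorThread()` is an assumption added for illustration and does not appear in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical skeleton of serviceStart(): each step only records its name
// so the ordering can be inspected.
class CommunicatorSkeleton {
    final List<String> steps = new ArrayList<>();
    private boolean registered = false;

    void serviceStart() {
        createSchedulerProxy(); // obtain the RM proxy
        register();             // registerApplicationMaster RPC in the real code
        startAllocatorThread(); // periodic heartbeat thread
    }

    void createSchedulerProxy() {
        steps.add("createSchedulerProxy");
    }

    void register() {
        steps.add("register");
        registered = true;
    }

    void startAllocatorThread() {
        // Illustrative precondition: no heartbeats before registration.
        if (!registered) {
            throw new IllegalStateException("allocator thread started before register()");
        }
        steps.add("startAllocatorThread");
    }

    public static void main(String[] args) {
        CommunicatorSkeleton c = new CommunicatorSkeleton();
        c.serviceStart();
        System.out.println(c.steps);
    }
}
```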


The event-handling flow of the AM's ContainerAllocatorRouter is shown in the figure below:



Registration:

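RMCommunicator makes a remote call to the registerApplicationMaster method of ApplicationMasterService, which proceeds as follows:

1. The RM sets up and maintains the responseId.
2. The RM adds the AM to AMLivelinessMonitor, which records a timestamp per AM in a map so it can detect an AM that has gone too long without a heartbeat.
3. If the AM's heartbeat is not refreshed for too long, the RM has the NodeManager remove the AM.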
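On the RM side, the liveness check comes down to comparing "last ping time + expiry interval" with the current time. Below is a minimal sketch of that idea:

- `SimpleLivelinessMonitor`, `register`, `receivedPing` and `expired` are hypothetical names modeled loosely on the behavior described above, not the AMLivelinessMonitor source.
- The real monitor checks for expiry on a background thread; this sketch checks on demand.

This matters for our problem: in YARN the AM expiry interval is set by `yarn.am.liveness-monitor.expiry-interval-ms`. Its default is 600000 ms, exactly the 10 minutes after which our AMs were killed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical liveness monitor: remembers the last ping time per AM attempt
// and reports attempts whose last ping is older than the expiry interval.
class SimpleLivelinessMonitor {
    private final Map<String, Long> lastPing = new HashMap<>();
    private final long expireIntervalMs;

    SimpleLivelinessMonitor(long expireIntervalMs) {
        this.expireIntervalMs = expireIntervalMs;
    }

    // Start tracking an attempt (conceptually at registration).
    void register(String attemptId, long nowMs) {
        lastPing.put(attemptId, nowMs);
    }

    // Refresh the timestamp of a tracked attempt; conceptually called for every heartbeat.
    void receivedPing(String attemptId, long nowMs) {
        lastPing.computeIfPresent(attemptId, (id, old) -> nowMs);
    }

    // Attempts that have been silent for longer than the expiry interval.
    List<String> expired(long nowMs) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastPing.entrySet()) {
            if (nowMs > e.getValue() + expireIntervalMs) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SimpleLivelinessMonitor monitor = new SimpleLivelinessMonitor(600_000L); // 10 minutes
        monitor.register("attempt-1", 0L);
        monitor.receivedPing("attempt-1", 500_000L);
        System.out.println(monitor.expired(1_000_000L)); // silent for 500 s
        System.out.println(monitor.expired(1_100_001L)); // silent for just over 600 s
    }
}
```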

Heartbeat thread:

Sending a heartbeat is also how resources are acquired:

@Override
protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
    List<Container> allocatedContainers = getResources(); // the key method
    if (allocatedContainers.size() > 0) {
        scheduledRequests.assign(allocatedContainers);
    }
    ......
}
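heartbeat() is synchronized and does two things: it fetches newly allocated containers through getResources(), then hands them to scheduledRequests.assign(). The real assign() is considerably richer. Roughly, it does host-local, rack-local and off-switch matching, handles maps and reduces separately, and releases containers it cannot use.

The hypothetical `MiniAllocator` below keeps only a two-step match to show the flow: a pending task on the same host first, otherwise any pending task. A `Supplier` stands in for the RM.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, heavily simplified heartbeat()/assign() pair.
class MiniAllocator {
    static final class Container {
        final String id;
        final String host;

        Container(String id, String host) {
            this.id = id;
            this.host = host;
        }
    }

    private final Map<String, String> pendingTasks = new LinkedHashMap<>(); // taskId -> preferred host
    final Map<String, String> assignments = new LinkedHashMap<>();          // containerId -> taskId
    private final Supplier<List<Container>> getResources;                    // stands in for the RM call

    MiniAllocator(Supplier<List<Container>> getResources) {
        this.getResources = getResources;
    }

    void addTask(String taskId, String preferredHost) {
        pendingTasks.put(taskId, preferredHost);
    }

    synchronized void heartbeat() {
        List<Container> allocatedContainers = getResources.get();
        if (allocatedContainers.size() > 0) {
            assign(allocatedContainers);
        }
    }

    // Same host first, otherwise the oldest pending task; leftover containers are ignored here.
    private void assign(List<Container> containers) {
        for (Container c : containers) {
            String chosen = null;
            for (Map.Entry<String, String> e : pendingTasks.entrySet()) {
                if (e.getValue().equals(c.host)) {
                    chosen = e.getKey();
                    break;
                }
            }
            if (chosen == null && !pendingTasks.isEmpty()) {
                chosen = pendingTasks.keySet().iterator().next();
            }
            if (chosen != null) {
                pendingTasks.remove(chosen);
                assignments.put(c.id, chosen);
            }
        }
    }

    public static void main(String[] args) {
        MiniAllocator allocator = new MiniAllocator(() -> Arrays.asList(
            new Container("c1", "host2"), new Container("c2", "host7")));
        allocator.addTask("m1", "host9");
        allocator.addTask("m2", "host2");
        allocator.heartbeat();
        System.out.println(allocator.assignments);
    }
}
```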


Acquiring resources:

private List<Container> getResources() throws Exception {
    ...
    response = makeRemoteRequest(); // talk to the RM
    ...
    // Handle commands sent by the RM first
    if (response.getAMCommand() != null) {
        switch (response.getAMCommand()) {
        case AM_RESYNC:
        case AM_SHUTDOWN:
            eventHandler.handle(new JobEvent(this.getJob().getID(),
                JobEventType.JOB_AM_REBOOT));
            throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
                this.getContext().getApplicationID());
        default:
            ....
        }
        // ...followed by a series of other processing steps
    }
}
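The switch relies on fall-through. `case AM_RESYNC:` has no body and no break, so AM_RESYNC and AM_SHUTDOWN are handled identically: both send JOB_AM_REBOOT and abort the heartbeat with an exception.

The sketch below isolates that behavior:

- The local `AMCommand` enum mirrors the two constants used above.
- IllegalStateException stands in for YarnRuntimeException so the example has no Hadoop dependency.
- The event list is a hypothetical replacement for eventHandler.

```java
import java.util.ArrayList;
import java.util.List;

// Local mirror of the two AMCommand constants used in getResources().
enum AMCommand { AM_RESYNC, AM_SHUTDOWN }

class AmCommandSketch {
    final List<String> events = new ArrayList<>();

    // Mirrors the switch above: AM_RESYNC falls through into AM_SHUTDOWN.
    void handle(AMCommand command) {
        if (command == null) {
            return; // ordinary heartbeat response, nothing to do
        }
        switch (command) {
            case AM_RESYNC:
            case AM_SHUTDOWN:
                events.add("JOB_AM_REBOOT"); // stands in for eventHandler.handle(new JobEvent(...))
                throw new IllegalStateException("Resource Manager doesn't recognize AttemptId");
            default:
                events.add("unhandled " + command);
        }
    }

    public static void main(String[] args) {
        AmCommandSketch sketch = new AmCommandSketch();
        for (AMCommand c : AMCommand.values()) {
            try {
                sketch.handle(c);
            } catch (IllegalStateException e) {
                System.out.println(c + " -> " + e.getMessage());
            }
        }
        System.out.println(sketch.events);
    }
}
```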


Building the request:

protected AllocateResponse makeRemoteRequest() throws IOException {
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
            super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
            new ArrayList<ContainerId>(release), blacklistRequest);
    AllocateResponse allocateResponse;
    allocateResponse = scheduler.allocate(allocateRequest); // RPC call to ApplicationMasterService.allocate
    .....
}
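Each AllocateRequest carries lastResponseID, the id of the previous AllocateResponse. This lets the RM tell a fresh heartbeat from a retried or stale one. The model below is a simplification that assumes these RM rules:

- If the request is exactly one step behind, the RM replays its previous reply.
- If the ids otherwise disagree, the RM asks the AM to resync.
- If the ids match, the RM answers normally with an incremented id.

Exact RM behavior differs between Hadoop versions. `FakeRm`, `FakeAm` and `AllocateReply` are hypothetical classes. The AM side also assumes that lastResponseID is updated from each reply, which would happen in the part of makeRemoteRequest() elided above.

```java
// Hypothetical reply carrying only what this sketch needs.
final class AllocateReply {
    final int responseId;
    final boolean resync;

    AllocateReply(int responseId, boolean resync) {
        this.responseId = responseId;
        this.resync = resync;
    }
}

// Hypothetical RM side of the handshake (simplified, see the assumptions above).
class FakeRm {
    private AllocateReply last = new AllocateReply(0, false); // state after registration

    AllocateReply allocate(int requestResponseId) {
        if (requestResponseId + 1 == last.responseId) {
            return last; // one step behind: a retried heartbeat, replay the previous reply
        }
        if (requestResponseId != last.responseId) {
            return new AllocateReply(last.responseId, true); // out of sync: tell the AM to resync
        }
        last = new AllocateReply(last.responseId + 1, false); // normal heartbeat
        return last;
    }
}

// Hypothetical AM side: echo the id of the last reply in the next request.
class FakeAm {
    int lastResponseID = 0;

    AllocateReply heartbeat(FakeRm rm) {
        AllocateReply reply = rm.allocate(lastResponseID);
        if (!reply.resync) {
            lastResponseID = reply.responseId;
        }
        return reply;
    }

    public static void main(String[] args) {
        FakeRm rm = new FakeRm();
        FakeAm am = new FakeAm();
        am.heartbeat(rm);
        am.heartbeat(rm);
        System.out.println("AM lastResponseID=" + am.lastResponseID);
        System.out.println("retried request -> id " + rm.allocate(1).responseId);
        System.out.println("stale request -> resync=" + rm.allocate(0).resync);
    }
}
```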


Every heartbeat call refreshes the AM's timestamp in AMLivelinessMonitor, which tells the RM that the AM is still alive.

The code also shows that the resource request is packaged as ask, a list (ArrayList) of ResourceRequest entries. For example:

priority:20 host:host9 capability:<memory:2048, vCores:1>
priority:20 host:host2 capability:<memory:2048, vCores:1>
priority:20 host:host10 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3203 capability:<memory:2048, vCores:1>
priority:20 host:/rack/rack3202 capability:<memory:2048, vCores:1>
priority:20 host:* capability:<memory:2048, vCores:1>
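Each line above is one ResourceRequest: a (priority, resource name, capability) combination plus a container count. The resource name is a host, a rack, or `*` for anywhere.

A minimal model of the ask collection follows. It rests on two assumptions:

- An updated request replaces the earlier entry with the same (priority, resource name, capability) key.
- ask is cleared after each successful allocate call, so only changed entries are sent.

`AskEntry` and `AskSet` are hypothetical classes, not YARN's ResourceRequest.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of one ask entry (not YARN's ResourceRequest).
final class AskEntry {
    final int priority;
    final String resourceName; // host, rack, or "*" for any
    final int memoryMb;
    final int vCores;
    final int numContainers;

    AskEntry(int priority, String resourceName, int memoryMb, int vCores, int numContainers) {
        this.priority = priority;
        this.resourceName = resourceName;
        this.memoryMb = memoryMb;
        this.vCores = vCores;
        this.numContainers = numContainers;
    }

    // Identity of a request: everything except the container count.
    String key() {
        return priority + "|" + resourceName + "|" + memoryMb + "|" + vCores;
    }

    @Override
    public String toString() {
        return "priority:" + priority + " host:" + resourceName
            + " capability:<memory:" + memoryMb + ", vCores:" + vCores + ">";
    }
}

class AskSet {
    private final Map<String, AskEntry> ask = new LinkedHashMap<>();

    // A newer entry with the same key replaces the older one.
    void addToAsk(AskEntry entry) {
        ask.remove(entry.key());
        ask.put(entry.key(), entry);
    }

    // Snapshot sent with one heartbeat; the set is then cleared (assumed behavior).
    List<AskEntry> takeForHeartbeat() {
        List<AskEntry> snapshot = new ArrayList<>(ask.values());
        ask.clear();
        return snapshot;
    }

    int size() {
        return ask.size();
    }

    public static void main(String[] args) {
        AskSet set = new AskSet();
        set.addToAsk(new AskEntry(20, "host9", 2048, 1, 1));
        set.addToAsk(new AskEntry(20, "host9", 2048, 1, 2)); // replaces the previous host9 entry
        set.addToAsk(new AskEntry(20, "*", 2048, 1, 2));
        for (AskEntry e : set.takeForHeartbeat()) {
            System.out.println(e + " numContainers:" + e.numContainers);
        }
    }
}
```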


So how is ask constructed?

The addMap, addReduce and assign methods in RMContainerAllocator modify the contents of ask through this call chain:

addContainerReq --> addResourceRequest --> addResourceRequestToAsk;
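The call chain above expands one container request into several ResourceRequest updates. The sketch below assumes addContainerReq increments counts in this order:

1. Once for every preferred host.
2. Once for each distinct rack of those hosts.
3. Once for `*`.

Each increment goes through addResourceRequest into ask. The host-to-rack map, the `/default-rack` fallback and the `AskBuilder` class are hypothetical; in Hadoop the rack comes from topology resolution.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical expansion of one container request into host, rack and "*" entries.
class AskBuilder {
    static final String ANY = "*";

    private final Map<String, String> rackOf;                         // host -> rack (illustrative topology)
    final Map<String, Integer> numContainers = new LinkedHashMap<>(); // resourceName -> outstanding count
    final Set<String> ask = new LinkedHashSet<>();                    // resource names updated since last heartbeat

    AskBuilder(Map<String, String> rackOf) {
        this.rackOf = rackOf;
    }

    // One request for a task whose input split lives on the given hosts.
    void addContainerReq(List<String> hosts) {
        Set<String> racks = new LinkedHashSet<>(); // each rack counted once per request
        for (String host : hosts) {
            addResourceRequest(host);
            racks.add(rackOf.getOrDefault(host, "/default-rack"));
        }
        for (String rack : racks) {
            addResourceRequest(rack);
        }
        addResourceRequest(ANY);
    }

    private void addResourceRequest(String resourceName) {
        numContainers.merge(resourceName, 1, Integer::sum);
        addResourceRequestToAsk(resourceName);
    }

    private void addResourceRequestToAsk(String resourceName) {
        ask.add(resourceName); // here only the name; the real ask holds ResourceRequest objects
    }

    public static void main(String[] args) {
        Map<String, String> topology = new HashMap<>();
        topology.put("host1", "/rack/r1");
        topology.put("host2", "/rack/r1");
        topology.put("host3", "/rack/r2");
        topology.put("host4", "/rack/r2");
        AskBuilder builder = new AskBuilder(topology);
        builder.addContainerReq(Arrays.asList("host1", "host2", "host3"));
        builder.addContainerReq(Arrays.asList("host2", "host3", "host4"));
        System.out.println(builder.numContainers);
    }
}
```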


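By adding our own logging to the code, we can see that resources are requested at three levels (node-local, rack, and any). Everything ends up as a single ask list sent to the RM: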

ask Capability:<memory:2048, vCores:1> ResourceName:* NumContainers:384 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3201 NumContainers:227 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3202 NumContainers:231 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3203 NumContainers:152 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3204 NumContainers:158 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host1 NumContainers:46 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host5 NumContainers:52 Priority:20 RelaxLocality:true
ask Capability:<memory:2048, vCores:1> ResourceName:host6 NumContainers:38 Priority:20 RelaxLocality:true
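The counts in this log hang together arithmetically. The `*` entry says 384 containers are outstanding. The four rack entries sum to 227 + 231 + 152 + 158 = 768 = 2 × 384, which is consistent with each pending request naming two racks on average. That is what you would expect if each split's replicas span two racks, though this is our reading of the numbers, not something the log states. The host list is truncated, so host totals cannot be checked the same way.

The small parser below reproduces the sums. It assumes the `ResourceName:... NumContainers:...` format shown above, and that rack names start with `/`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sums NumContainers per locality level from "ask ..." log lines like the ones above.
class AskLogSummary {
    private static final Pattern ASK = Pattern.compile("ResourceName:(\\S+) NumContainers:(\\d+)");

    long any;
    long rackTotal;
    long hostTotal;

    void add(String logLine) {
        Matcher m = ASK.matcher(logLine);
        if (!m.find()) {
            return; // not an ask line
        }
        String name = m.group(1);
        long n = Long.parseLong(m.group(2));
        if (name.equals("*")) {
            any += n;
        } else if (name.startsWith("/")) {
            rackTotal += n; // assumes rack names start with "/", e.g. /rack/rack3201
        } else {
            hostTotal += n;
        }
    }

    public static void main(String[] args) {
        String[] lines = {
            "ask Capability:<memory:2048, vCores:1> ResourceName:* NumContainers:384 Priority:20 RelaxLocality:true",
            "ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3201 NumContainers:227 Priority:20 RelaxLocality:true",
            "ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3202 NumContainers:231 Priority:20 RelaxLocality:true",
            "ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3203 NumContainers:152 Priority:20 RelaxLocality:true",
            "ask Capability:<memory:2048, vCores:1> ResourceName:/rack/rack3204 NumContainers:158 Priority:20 RelaxLocality:true"
        };
        AskLogSummary s = new AskLogSummary();
        for (String line : lines) {
            s.add(line);
        }
        System.out.println("any=" + s.any + " racks=" + s.rackTotal);
    }
}
```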

A similar log line looks like this:

getResources() for application_1438330253091_0004: ask=29 release= 0 newContainers=0 finishedContainers=0 resourcelimit=<memory:0, vCores:0> knownNMs=24