您的位置:首页 > 其它

yarn的核心设计介绍

2014-01-16 19:43 477 查看
请尊重原创,转载请注明:天然呆的技术博客

又有了提笔的冲动,斯咪哒喵。。

上回从宏观层面分析了RPC调用全过程,请参考http://user.qzone.qq.com/578333569/2

假设:您已经对yarn有了初步的了解,在此,请原谅小呆呆不会介绍什么是YARN,还请各位google下

假设:您已经理解了分布式系统通信核心:RPC调用,准备好了吗?我们就开始了呀

========================================================================

本文将使用的一些术语

RM:resourceManager

AM:applicationMaster

NM:nodeManager

简单的说,yarn涉及到3个通信协议:

ApplicationClientProtocol:client通过该协议与RM通信,以后会简称其为CR协议

ApplicationMasterProtocol:AM通过该协议与RM通信,以后会简称其为AR协议

ContainerManagementProtocol:AM通过该协议与NM通信,以后会简称其为AN协议

---------------------------------------------------------------------------------------------------------------------

通常而言,客户端向RM提交一个程序,流程是这样滴:

step1:创建一个CR协议的客户端

rmClient=(ApplicationClientProtocol)rpc.getProxy(ApplicationClientProtocol,rmAddress,conf)

step2:客户端通过CR协议#getNewApplication从RM获取唯一的应用程序ID,简化过的代码:

//GetNewApplicationRequest包含两项信息:ApplicationId 和 最大可申请的资源量

//Records.newRecord(...)是一个静态方法,通过序列化框架生成一些RPC过程需要的对象(yarn默认采用ProtocolBuffers(序列化框架,google ProtocolBuffers这些东东,麻烦大家google下呀,喵))

GetNewApplicationRequest request=Records.newRecord(GetNewApplicationRequest.class);

继续看代码(代码都是简化过的,亲们原谅):

GetNewApplicationResponse newApp =rmClient.getNewApplication(request);

ApplicationId appId = newApp.getApplicationId();

step3:客户端通过CR协议#submitApplication将AM提交到RM上,简化过的代码:

// 客户端将启动AM需要的所有信息打包到ApplicationSubmissionContext 中

ApplicationSubmissionContext context = Records.newRecord(ApplicationSubmissionContext.class);

。。。。//设置应用程序名称,优先级,队列名称云云

context.setApplicationName(appName);

//构造一个AM启动上下文对象

ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext .class)
。。。//设置AM相关的变量

amContainer.setLocalResource(localResponse);//设置AM启动所需要的本地资源

amContainer.setEnvironment(env);

context.setAMContainerSpec(amContainer);

context.setApplicationId(appId);

SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class);

request.setApplicationSubmissionContext(request);

rmClien.submitApplication(request);//将应用程序提交到RM上

--------------------------------------------------------------------------------------------------------------------------------------------------

通常而言,AM向RM注册自己,申请资源,请求NM启动Container的流程是这样滴:

AM-RM流程:

step1:创建一个AR协议的客户端

ApplicationMasterProtocol rmClient = (ApplicationMasterProtocol)rpc.getProxy(ApplicationMasterProtocol.class,rmAddress,conf);

step2:AM向RM注册自己

//这里的 recordFactory.newRecordInstance(。。。)与上面的Records.newRecord(。。。)作用一样,都属于静态调用

RegisterApplicationMasterRequest request =recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);

request.setHost(host);

request.setRpcPort(port);

request.setTrackingUrl(appTrackingUrl)

RegisterApplicationMasterResponse response = rmClient.registerApplicationMaster(request);//完成注册

step3:AM向RM请求资源

一段简化的代码如下(感兴趣的朋友,还请亲自阅读源码):

synchronized(this){

askList =new ArrayList<ResourceRequest>(ask);

releaseList = new ArrayList<ContainerId>(release);

allocateRequest = BuilderUtils.newAllocateRequest(....);构造一个 allocateRequest 对象

}

//向RM申请资源,同时领取新分配的资源(CPU,内存等)

allocateResponse = rmClient.allocate(allocateRequest ) ;

//根据RM的应答信息设计接下来的逻辑(资源分配)

.....

step4:AM告诉RM应用程序执行完毕,并退出

//构造请求对象

FinishApplicationMasterRequest request = recordFactory.newRecordInstance(FinishApplicationMasterRequest.class );
request.setFinishApplicationStatus(appStatus);

..//设置诊断信息

..//设置trackingUrl

//通知RM自己退出

rmclient.finishApplicationMaster(request);

--------------------------------------------------------------------------------------------------------------------------------------------

AM-NM流程 :

step1:构造AN协议客户端,并启动Container

String cmIpPortStr = container.getNodeId().getHost()+":"+container.getNodeId().getPort();

InetSocketAddress cmAddress=NetUtils.createSocketAddr(cmIpPortStr);

anClient = (ContainerManagementProtocol)rpc.getProxy(ContainerManagementProtocol.class,cmAddress,conf)
ContainerLaunchContext ctx=Records.newRecord(ContainerLaunchContext.class);

。。。//设置ctx变量

StartContainerRequest request = Records.newRecord(StartContainerRequest.class);

request.setContainerLaunchContext(ctx);

request.setContainer(container);

anClient.startContainer(request);

Step2:为了实时掌握各个Container运行状态,AM可通过AN协议#getContainerStatus向NodeManager询问Container运行状态

Step3:一旦一个Container运行完成后,AM可通过AN协议#stopContainer释放Container
========================================================================================================

基本设计流程介绍完毕,后续将详分析YARN提供的API

本文参考:董的博客和hadoop-yarn-sourceCode
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: