您的位置:首页 > 运维架构

_00007 Hadoop JobTracker源码浅析

2014-03-06 23:11 323 查看
博文作者:妳那伊抹微笑

个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在

技术方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 云计算技术

转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作!

qq交流群:214293307

(期待与你一起学习,共同进步)


# JobTracker是hadoop的mapreduce框架中最重要的一个类,这个类负责整个集群的作业控制和资源管理。

# JobTracker是hadoop的mapreduce框架中最重要的一个类,这个类负责整个集群的作业控制和资源管理。

# JobTracker的启动是在用户启动hadoop集群时启动的,启动JobTracker是通过调用JobTracker的main()方法启动。接下来看看源码

* Start the JobTracker process. This is used only for debugging. As a rule,
* JobTrackershould be run as part of the DFS Namenode process.
*/
public
static void
main(String argv[]
) throws IOException, InterruptedException {
StringUtils.startupShutdownMessage(JobTracker.class, argv,LOG);

try {
if(argv.length == 0) {
JobTrackertracker = startTracker(new JobConf());
tracker.offerService();
# JobTrackertracker = startTracker(new JobConf());实例化了一个JobTracker,进入构造方法看看

public
static
JobTracker startTracker(JobConf conf
) throws IOException,
InterruptedException {
return
startTracker(conf,generateNewIdentifier());
}
# 进入startTracker(conf, generateNewIdentifier());最后进入下面的方法

public
static
JobTracker startTracker(JobConf conf, String identifier,boolean initialize)
throws IOException,InterruptedException {
DefaultMetricsSystem.initialize("JobTracker");
JobTrackerresult = null;
while (true) {
try {
result = new JobTracker(conf,identifier);
result.taskScheduler.setTaskTrackerManager(result);
break;
} catch (VersionMismatche) {
#进入result = new JobTracker(conf, identifier);(这里主要看调度器跟RPC服务端)

# 创建一个调度器

// Create the scheduler
Class<? extends TaskScheduler>schedulerClass
=conf.getClass("mapred.jobtracker.taskScheduler",
JobQueueTaskScheduler.class, TaskScheduler.class);

taskScheduler = (TaskScheduler)ReflectionUtils.newInstance(schedulerClass, conf);
# 起一个RPC的服务端

his.interTrackerServer =
RPC.getServer(this,addr.getHostName(), addr.getPort(), handlerCount,
false, conf,secretManager);
# web服务器jetty(跟namenode一样,提供webui方式访问,更方便)

infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort,
tmpInfoPort== 0, conf, aclsManager.getAdminsAcl());
infoServer.setAttribute("job.tracker",this);
这里的重点是创建了一个调度器跟起了一个RPC服务端和web服务器。

jobTracker跟taskTracker的通信是使用周期的调用heartbeat方法

JobClientàJobSubmissionProtocolàJobTracker

JobTracker的职责

资源分配,具体是资源调度器

监控作业运行

提交作业就是把jobID放进一个map集合,key为jobID,value为JobInProgress

然后就有一个调度器,不断的去这个map集合中获取job,然后处理

MapTask的run方法

ReduceTask的run方法



jobClient提交作业,调用JobSubmisionProtocol的方法,将作业放入JobTracker的map之后就完成工作了。再然后由调度器去那个放了作业的map方法中获取job(获取作业),然后执行



jobTracker跟taskTracker通信走的是InterTrackerProtocol

看一下heartbeat方法,接下来可以看taskTracker了

……getHeartbeatInterval得到心跳间隔 心跳传送给jobTracer,将自己的状态传给jobTracker(跟datanode一样),然后taskTracker得到一个返回值(jobTracker传给taskTracker的东西)然后再处理这个返回值(这个返回值也就是jobTracker告诉taskTracker要做的事情)

妳那伊抹微笑

The you smile until forever 、、、、、、、、、、、、、、、、、、、、、
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: