Azkaban的Exec Server分析 29:JobRunner的真正执行过程(前奏)
2016-04-11 00:00
567 查看
重点来了,现在看看Job Runner的运行过程!
==========================================================================================
jdb azkaban.execapp.AzkabanExecutorServer -conf /root/azkb/azkaban_3.0.0_debug/conf
stop in azkaban.execapp.JobRunner.run
run
==========================================================================================
之前做了一些校验工作,然后
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
让我们来看有哪些Listeners
JobRunner-Hello-17[1] print listeners
listeners = "[
azkaban.execapp.FlowRunner$JobRunnerEventListener@2a32e7d1,
azkaban.execapp.event.JobCallbackManager@2b274bce,
azkaban.execapp.jmx.JmxJobMBeanManager@53976f5c
]"
那么,这3个Listener分别做了什么事情呢?
==========================================================================================
第一个,不好意思,这里是start,所以下面的代码都没有执行!
private class JobRunnerEventListener implements EventListener {
public JobRunnerEventListener() {
}
@Override
public synchronized void handleEvent(Event event) {
//看到这里了
JobRunner runner = (JobRunner) event.getRunner();
//看到这里了
if (event.getType() == Type.JOB_STATUS_CHANGED) {
updateFlow();
} else if (event.getType() == Type.JOB_FINISHED) {
ExecutableNode node = runner.getNode();
long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
synchronized (mainSyncObj) {
logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in "
+ seconds + " seconds");
// Cancellation is handled in the main thread, but if the
// flow is
// paused, the main thread is paused too.
// This unpauses the flow for cancellation.
if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {
flowPaused = false;
}
finishedNodes.add(node);
node.getParentFlow().setUpdateTime(System.currentTimeMillis());
interrupt();
fireEventListeners(event);
}
}
//看到这里了
}
==========================================================================================
第二个呢?
@Override
public void handleEvent(Event event) {
// 看到这里了
if (!isInitialized) {
return;
}
if (event.getRunner() instanceof JobRunner) {
// 看到这里了
try {
if (event.getType() == Event.Type.JOB_STARTED) {
// 这里开始执行
processJobCallOnStart(event);
} else if (event.getType() == Event.Type.JOB_FINISHED) {
processJobCallOnFinish(event);
}
} catch (Throwable e) {
// Use job runner logger so user can see the issue in their job
// log
JobRunner jobRunner = (JobRunner) event.getRunner();
jobRunner.getLogger().error("Encountered error while hanlding job callback event", e);
}
} else {
logger.warn("((( Got an unsupported runner: " + event.getRunner().getClass().getName() + " )))");
}
}
进去debug,发现还是什么都没有执行。。。怪我咯?
==========================================================================================
JmxJobMBeanManager
这个Listener具体做了啥呢?
if (event.getType() == Event.Type.JOB_STARTED) {
// 只是增加了一个计数器的值。。。
runningJobCount.incrementAndGet();
紧接着
loader.uploadExecutableNode(node, props);
这个到底干嘛的???
final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs "
+ "(exec_id, project_id, version, flow_id, job_id, start_time, "
+ "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
执行了这个语句先!
==========================================================================================
stop in azkaban.execapp.JobRunner.insertJobMetadata
==========================================================================================
下面会构造一个JVM参数
/**
* Add useful JVM arguments so it is easier to map a running Java process to
* a flow, execution id and job
*/
private void insertJVMAargs() {
// 看到这里了
String flowName = node.getParentFlow().getFlowId();//
String jobId = node.getId();
// "-Dazkaban.flowid=World -Dazkaban.execid=17 -Dazkaban.jobid=Hello"
String jobJVMArgs = String.format("-Dazkaban.flowid=%s -Dazkaban.execid=%s -Dazkaban.jobid=%s", flowName,
executionId, jobId);
// "-Dazkaban.flowid=World -Dazkaban.execid=17 -Dazkaban.jobid=Hello"
String previousJVMArgs = props.get(JavaProcessJob.JVM_PARAMS);
jobJVMArgs += (previousJVMArgs == null) ? "" : " " + previousJVMArgs;
//存起来
logger.info("job JVM args: " + jobJVMArgs);
props.put(JavaProcessJob.JVM_PARAMS, jobJVMArgs);
}
==========================================================================================
然后发现,JobRunner实例,需要构造一个具体的job对象
这是在
job = jobtypeManager.buildJobExecutor(this.jobId, props, logger); 实现的
那么到底怎么构造这个对象呢?
鉴于这部分比较重要,我还是另开一个小节吧。
==========================================================================================
jdb azkaban.execapp.AzkabanExecutorServer -conf /root/azkb/azkaban_3.0.0_debug/conf
stop in azkaban.execapp.JobRunner.run
run
==========================================================================================
之前做了一些校验工作,然后
fireEvent(Event.create(this, Type.JOB_STARTED, null, false));
让我们来看有哪些Listeners
JobRunner-Hello-17[1] print listeners
listeners = "[
azkaban.execapp.FlowRunner$JobRunnerEventListener@2a32e7d1,
azkaban.execapp.event.JobCallbackManager@2b274bce,
azkaban.execapp.jmx.JmxJobMBeanManager@53976f5c
]"
那么,这3个Listener分别做了什么事情呢?
==========================================================================================
第一个,不好意思,这里是start,所以下面的代码都没有执行!
private class JobRunnerEventListener implements EventListener {
public JobRunnerEventListener() {
}
@Override
public synchronized void handleEvent(Event event) {
//看到这里了
JobRunner runner = (JobRunner) event.getRunner();
//看到这里了
if (event.getType() == Type.JOB_STATUS_CHANGED) {
updateFlow();
} else if (event.getType() == Type.JOB_FINISHED) {
ExecutableNode node = runner.getNode();
long seconds = (node.getEndTime() - node.getStartTime()) / 1000;
synchronized (mainSyncObj) {
logger.info("Job " + node.getNestedId() + " finished with status " + node.getStatus() + " in "
+ seconds + " seconds");
// Cancellation is handled in the main thread, but if the
// flow is
// paused, the main thread is paused too.
// This unpauses the flow for cancellation.
if (flowPaused && node.getStatus() == Status.FAILED && failureAction == FailureAction.CANCEL_ALL) {
flowPaused = false;
}
finishedNodes.add(node);
node.getParentFlow().setUpdateTime(System.currentTimeMillis());
interrupt();
fireEventListeners(event);
}
}
//看到这里了
}
==========================================================================================
第二个呢?
@Override
public void handleEvent(Event event) {
// 看到这里了
if (!isInitialized) {
return;
}
if (event.getRunner() instanceof JobRunner) {
// 看到这里了
try {
if (event.getType() == Event.Type.JOB_STARTED) {
// 这里开始执行
processJobCallOnStart(event);
} else if (event.getType() == Event.Type.JOB_FINISHED) {
processJobCallOnFinish(event);
}
} catch (Throwable e) {
// Use job runner logger so user can see the issue in their job
// log
JobRunner jobRunner = (JobRunner) event.getRunner();
jobRunner.getLogger().error("Encountered error while hanlding job callback event", e);
}
} else {
logger.warn("((( Got an unsupported runner: " + event.getRunner().getClass().getName() + " )))");
}
}
进去debug,发现还是什么都没有执行。。。怪我咯?
==========================================================================================
JmxJobMBeanManager
这个Listener具体做了啥呢?
if (event.getType() == Event.Type.JOB_STARTED) {
// 只是增加了一个计数器的值。。。
runningJobCount.incrementAndGet();
紧接着
loader.uploadExecutableNode(node, props);
这个到底干嘛的???
final String INSERT_EXECUTION_NODE = "INSERT INTO execution_jobs "
+ "(exec_id, project_id, version, flow_id, job_id, start_time, "
+ "end_time, status, input_params, attempt) VALUES (?,?,?,?,?,?,?,?,?,?)";
执行了这个语句先!
==========================================================================================
stop in azkaban.execapp.JobRunner.insertJobMetadata
==========================================================================================
下面会构造一个JVM参数
/**
* Add useful JVM arguments so it is easier to map a running Java process to
* a flow, execution id and job
*/
private void insertJVMAargs() {
// 看到这里了
String flowName = node.getParentFlow().getFlowId();//
String jobId = node.getId();
// "-Dazkaban.flowid=World -Dazkaban.execid=17 -Dazkaban.jobid=Hello"
String jobJVMArgs = String.format("-Dazkaban.flowid=%s -Dazkaban.execid=%s -Dazkaban.jobid=%s", flowName,
executionId, jobId);
// "-Dazkaban.flowid=World -Dazkaban.execid=17 -Dazkaban.jobid=Hello"
String previousJVMArgs = props.get(JavaProcessJob.JVM_PARAMS);
jobJVMArgs += (previousJVMArgs == null) ? "" : " " + previousJVMArgs;
//存起来
logger.info("job JVM args: " + jobJVMArgs);
props.put(JavaProcessJob.JVM_PARAMS, jobJVMArgs);
}
==========================================================================================
然后发现,JobRunner实例,需要构造一个具体的job对象
这是在
job = jobtypeManager.buildJobExecutor(this.jobId, props, logger); 实现的
那么到底怎么构造这个对象呢?
鉴于这部分比较重要,我还是另开一个小节吧。
相关文章推荐
- azkaban简介
- Azkaban-任务调度管理器
- Azkaban的Web Server源码探究系列6: alerters及插件机制分析
- Azkaban的Web Server源码探究系列7: ExecutorManager的初始化
- Azkaban的Web Server源码探究系列8: 水文一篇
- Azkaban的Web Server源码探究系列9: Servlet引擎初始化
- Azkaban的Web Server源码探究系列10 : /对应的servlet解析
- Azkaban的Web Server源码探究系列11: 登陆对应的servlet解析
- Azkaban的Web Server源码探究系列12: 首页之前的跳转
- Azkaban的Web Server源码探究系列13:首页/index的内容获取
- Azkaban的Web Server源码探究系列14:创建Project
- Azkaban的Web Server源码探究系列15:使用过程中几个需注意的配置&3.0中丢失的文件
- Azkaban的Web Server源码探究系列16:跳转Project
- Azkaban的Web Server源码探究系列17:Creating Flows
- Azkaban的Web Server源码探究系列19:loadProjectFromDir&Chek
- Azkaban的Web Server源码探究系列20:resolve&buildFlow
- Azkaban的Web Server源码探究系列21: Flow执行前的准备工作
- Azkaban的Web Server源码探究系列22: 一次性执行execute的提交准备
- Azkaban的Web Server源码探究系列23: 一次性执行execute的正式提交
- Azkaban的Web Server源码探究系列24: 一次性执行execute任务取出处理