azkaban源码解读
2017-06-19 16:14
561 查看
azkaban源码解读
一. web server源代码解析
1.配置文件读取过程:
主要读取的两个配置文件为: 1)读取下面的2个文件
File azkabanPrivatePropsFile = new File(dir, AZKABAN_PRIVATE_PROPERTIES_FILE);//"azkaban.private.properties" File azkabanPropsFile = new File(dir, AZKABAN_PROPERTIES_FILE);//"azkaban.properties"
2)组织两个props形成父子关系,azkabanPrivatePropsFile 为父配置,另外一个为子配置,
父子关系如何指定呢?通过属性
private Props _parent;
接下来就立刻读取了2个属性,代码如下:
int maxThreads = azkabanSettings.getInt("jetty.maxThreads", Constants.DEFAULT_JETTY_MAX_THREAD_COUNT); boolean isStatsOn = azkabanSettings.getBoolean("jetty.connector.stats", true); logger.info("Setting up connector with stats on: " + isStatsOn);
主要是jetty启动的线程数和统计数,这都比较容易,主要的是读取的顺序。
public String get(Object key) { if (_current.containsKey(key)) { return _current.get(key); } else if (_parent != null) { return _parent.get(key); } else { return null; } }
先统计子类的信息,如果有了就不用统计父类的配置啦。
PS.azkaban.private.properties是azkaban.properties的父类
2.Servlet引擎初始化
ServletHolder staticServlet = new ServletHolder(new DefaultServlet()); root.addServlet(staticServlet, "/css/*"); root.addServlet(staticServlet, "/js/*"); root.addServlet(staticServlet, "/images/*"); root.addServlet(staticServlet, "/fonts/*"); root.addServlet(staticServlet, "/favicon.ico"); // 静态资源配置路径 root.addServlet(new ServletHolder(new ProjectManagerServlet()), "/manager"); root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor"); root.addServlet(new ServletHolder(new HistoryServlet()), "/history"); root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule"); root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx"); root.addServlet(new ServletHolder(new TriggerManagerServlet()), "/triggers"); root.addServlet(new ServletHolder(new StatsServlet()), "/stats"); // 动态请求配置路径 ServletHolder restliHolder = new ServletHolder(new RestliServlet()); restliHolder.setInitParameter("resourcePackages", "azkaban.restli"); root.addServlet(restliHolder, "/restli/*");
Azkaban 是基于jetty进行发布的。Servlet 是 server applet 的缩写,即服务器运行小程序,而Servlet框架是对HTTP服务器和用户小程序中间层的标准化和抽象形式。
在Jetty中,每个Servlet和其相关信息都由ServletHolder封装。Context代码引擎以及url映射到哪些servlet下,ServletHolder会代理不同servlet的操作。
3.配置session
1)azkaban前端开发Velocity框架配置就不一一介绍了 2)session管理
public SessionCache(Props props) {//直接调用google的jar包 cache = CacheBuilder.newBuilder() .maximumSize(props.getInt("max.num.sessions", MAX_NUM_SESSIONS)) .expireAfterAccess( props.getLong("session.time.to.live", SESSION_TIME_TO_LIVE), TimeUnit.MILLISECONDS) .build();// } }
主要是使用了google的Guava Cache,azkaban web的session是缓存在本地中,如果要将azkaban web做成分布式的,需要将本地缓存改为memcache或redis。
PS.Guava Cache 和 ConcurrentHashMap缓存区别。hashmap需要显示的删除,但是cache不需要,它会自动回收,但是ConcurrentHashMap会有更好的内存效率,具体的源码还没仔细看。
之后,他会将azkaban-user.xml这个xml中的文件加入到缓存中。。
4.azkaban-user.xml中的权限初始化
azkaban是采用用户——角色——组权限三个维度控制权限。其中用户可以创建用户组,给用户组制定权限,这样在该用户组下的所有用户自动拥有该权限。 主要是分析用户xml获取用户的UserTag,RoleTag和GroupRoleTag信息,核心代码如下:
for (int i = 0; i < azkabanUsersList.getLength(); ++i) {//遍历每个节点 Node node = azkabanUsersList.item(i);//获取当前节点 if (node.getNodeType() == Node.ELEMENT_NODE) {//节点类型是否是我们需要的? if (node.getNodeName().equals(USER_TAG)) {//用户节点 parseUserTag(node, users, userPassword, proxyUserMap); } else if (node.getNodeName().equals(ROLE_TAG)) { parseRoleTag(node, roles); } else if (node.getNodeName().equals(GROUP_TAG)) { parseGroupRoleTag(node, groupRoles); } } }
eg:
<azkaban-users> <user username="admin" password="admin" roles="admin" groups="admin" /> <user username="zhangsan" password="zhangsan" groups="group_user" /> <user username="lisi" password="lisi" groups="group_user" /> <user username="metrics" password="metrics" roles="metrics"/> <user username="azkaban" password="azkaban" groups="group_inspector"/> <group name="group_user" roles="user" /> <group name="group_inspector" roles="inspector" /> <role name="admin" permissions="ADMIN" /> <role name="metrics" permissions="METRICS"/> <role name="user" permissions="READ,WRITE,EXECUTE,SCHEDULE,CREATEPROJECTS"/> <role name="inspector" permissions="READ"/> <role name="write" permissions="WRITE"/> <role name="execute" permissions="EXECUTE"/> <role name="schedule" permissions="SCHEDULE"/> <role name="createprojects" permissions="CREATEPROJECTS"/> </azkaban-users>
admin用户拥有超级管理员权限,可以给其他用户赋权限。
在group_user用户组下的用户,拥有使用azkaban的权限,可以创建project,读写执行,调度。
在group_inspector用户组下的用户,拥有审查员权限,只能读。也就是只能看project项目,flow,看执行日志,但是不能更改。
当然如果希望可以使用已经存在的用户系统,也可以实现azkaban UserManager的接口。去配置相应用户。。
public interface UserManager { public User getUser(String username, String password) throws UserManagerException; public boolean validateUser(String username); public boolean validateGroup(String group); public Role getRole(String roleName); public boolean validateProxyUser(String proxyUser, User realUser); }
5.Jdbc初始化过程:
主要的源码位于JdbcExecutorLoader 下,其底层内部实现主要为dbcp的连接池
6.创建项目,工作流
首先,上传zip文件;项目和工作流的所有信息都是存储在mysql数据库中的。 其中在上传工作流时,azkaban会对上传的zip文件进行解压缩,然后分析成各个节点组成的DAG图。
工作流的步骤:
// Load all the props files and create the Node objects loadProjectFromDir(baseDirectory.getPath(), baseDirectory, null); jobPropertiesCheck(project); // Create edges and find missing dependencies resolveDependencies(); // Create the flows. buildFlowsFromDependencies(); // Resolve embedded flows resolveEmbeddedFlows();
1)loadProjectFromDir --- 从目录中加载Flow的定义
# foo.job 文件内容 type=command command=echo foo # bar.job 文件内容 type=command dependencies=foo command=echo bar private void loadProjectFromDir(String base, File dir, Props parent) { File[] propertyFiles = dir.listFiles(new SuffixFilter(PROPERTY_SUFFIX)); Arrays.sort(propertyFiles); // zip文件中的配置文件信息 for (File file : propertyFiles) { String relative = getRelativeFilePath(base, file.getPath()); try { parent = new Props(parent, file); parent.setSource(relative); FlowProps flowProps = new FlowProps(parent); flowPropsList.add(flowProps); } catch (IOException e) { errors.add("Error loading properties " + file.getName() + ":" + e.getMessage()); } logger.info("Adding " + relative); propsList.add(parent); } // 加载所有的.job文件信息,如果有重复的就不加载了。 File[] jobFiles = dir.listFiles(new SuffixFilter(JOB_SUFFIX)); for (File file : jobFiles) { String jobName = getNameWithoutExtension(file); try { if (!duplicateJobs.contains(jobName)) { if (jobPropsMap.containsKey(jobName)) { errors.add("Duplicate job names found '" + jobName + "'."); duplicateJobs.add(jobName); jobPropsMap.remove(jobName); nodeMap.remove(jobName); } else { // 将job的配置信息和开始加载,parent开始为null Props prop = new Props(parent, file); // 截取字符串 String relative = getRelativeFilePath(base, file.getPath()); prop.setSource(relative); // 截取后的job名类似hello.job // 构造了一个节点 Node node = new Node(jobName); String type = prop.getString("type", null); if (type == null) { errors.add("Job doesn't have type set '" + jobName + "'."); } node.setType(type); node.setJobSource(relative); if (parent != null) { node.setPropsSource(parent.getSource()); } // 如果是root节点 if (prop.getBoolean(CommonJobProperties.ROOT_NODE, false)) { rootNodes.add(jobName); } jobPropsMap.put(jobName, prop); nodeMap.put(jobName, node); } } } catch (IOException e) { errors.add("Error loading job file " + file.getName() + ":" + e.getMessage()); } } //如果有子文件夹,同样的加载,说明支持多层次的文件夹 File[] subDirs = dir.listFiles(DIR_FILTER); for (File file : subDirs) { loadProjectFromDir(base, file, parent); } }
2).jobPropertiesCheck
检查校验一些参数的合法性
3).resolveDependencies
创建节点直接的关联关系。代码就不一条条的截取了,主要是依照1中生成的map生成一套HashMap> nodeDependencies;依赖关联的map。 比如,我们现在有两个任务 a.job 和 b.job。b依赖于a。则在最终的节点中,存储的是:
HashMap<
// 当前节点信息
String, ---b
Map<
// 父节点信息
String, ---a
// 边信息
Edge ---
>
>
4).buildFlowsFromDependencies
这个方法的主要功能就是依据依赖关系,创建工作流,修改数据库记录这些信息。 a.先是查询出当前最大的版本号,然后+1赋值。
b.上传工作流等相关的文件到数据库中,10M为1个单位,到project_files表中。
c.然后修改project_flows,这张表主要记录了依据.job文件生成的工作流拓扑图。
5).resolveEmbeddedFlows
递归判断每个依赖,看是否有环出现或者工作流中的依赖是否存在。private void resolveEmbeddedFlow(String flowId, Set<String> visited) { Set<String> embeddedFlow = flowDependencies.get(flowId); if (embeddedFlow == null) { return; } visited.add(flowId); for (String embeddedFlowId : embeddedFlow) { if (visited.contains(embeddedFlowId)) { errors.add("Embedded flow cycle found in " + flowId + "->" + embeddedFlowId); return; } else if (!flowMap.containsKey(embeddedFlowId)) { errors.add("Flow " + flowId + " depends on " + embeddedFlowId + " but can't be found."); return; } else { resolveEmbeddedFlow(embeddedFlowId, visited); } } visited.remove(flowId); }
7.提交工作流
在web server中提交一个任务时,如果没有指定要提交的executor则会由选择器进行选择,依照executor本身的hashcode来计算提交到哪。。。之后再azkaban.executor.ExecutorManager 中
// 构造http client ExecutorApiClient apiclient = ExecutorApiClient.getInstance(); @SuppressWarnings("unchecked") // 构造URI URI uri = ExecutorApiClient.buildUri(host, port, path, true, paramList.toArray(new Pair[0])); return apiclient.httpGet(uri, null);
这其中构造了一个类似: uri = "http://x.x.x.x:port/executor?action=execute&execid=12&user"
这种的url跳转到executor中执行任务。
二. executor代码:
每个节点的结构如下:InNode InNode InNode InNode InNode Node OutNode OutNode OutNode OutNode
Node是当前节点,InNodes是父节点,OutNodes是子节点
1.接收任务
azkaban的web是作为分发者存在的,web分发GET请求任务到executor Server中private void handleAjaxExecute(HttpServletRequest req, Map<String, Object> respMap, int execId) throws ServletException { try { // 提交到本地的manager来处理 flowRunnerManager.submitFlow(execId); } catch (ExecutorManagerException e) { e.printStackTrace(); logger.error(e.getMessage(), e); respMap.put(RESPONSE_ERROR, e.getMessage()); } }
这里主要的处理逻辑是,从数据库中获取project的详情,然后从project_files中拿到上传的压缩文件,解压到本地的projects文件夹中。
最终提交任务到线程池中:
Future future = executorService.submit(runner);
线城池是按照工作流配置参数"flow.num.job.threads",来设置线程数的。
2.Execute Server的任务真正执行过程
1)计算当前拓扑图的开始节点
public List<String> getStartNodes() { if (startNodes == null) { startNodes = new ArrayList<String>(); for (ExecutableNode node : executableNodes.values()) { if (node.getInNodes().isEmpty()) { startNodes.add(node.getId()); } } } return startNodes; }
2) 运行的基本配置
runReadyJob :private boolean runReadyJob(ExecutableNode node) throws IOException { if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) { return false; } // 判断这个当前节点的下个节点是否需要执行, Status nextNodeStatus = getImpliedStatus(node); if (nextNodeStatus == null) { return false; } .... }
getImpliedStatus:判断当前节点是否可执行,当该节点的父类节点的状态都是FINISHED并且不是Failed和KILLED,就返回可执行。
ExecutableFlowBase flow = node.getParentFlow(); boolean shouldKill = false; for (String dependency : node.getInNodes()) { ExecutableNode dependencyNode = flow.getExecutableNode(dependency); Status depStatus = dependencyNode.getStatus(); if (!Status.isStatusFinished(depStatus)) { return null; } else if (depStatus == Status.FAILED || depStatus == Status.CANCELLED || depStatus == Status.KILLED) { // We propagate failures as KILLED states. shouldKill = true; } }
3)执行每一个节点
for (String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) { ExecutableNode startNode = flow.getExecutableNode(startNodeId); runReadyJob(startNode); }
......
private void runExecutableNode(ExecutableNode node) throws IOException { // Collect output props from the job's dependencies. prepareJobProperties(node); node.setStatus(Status.QUEUED); JobRunner runner = createJobRunner(node); logger.info("Submitting job '" + node.getNestedId() + "' to run."); try { executorService.submit(runner); activeJobRunners.add(runner); } catch (RejectedExecutionException e) { logger.error(e); }
}
从没有父类InNodes的开始节点开始,每次按照2中的标准执行其OutNodes子节点。执行的本质是将每个job放置在线程池中执行,封装的对象是jobRunner。
4)jobRunner listener详解
在执行前。会有一堆校验和预处理操作。。。 其中,最重要的是加入了一个监听器。
4.1 JobRunnerEventListener: azkaban.execapp.FlowRunner
这块的代码主要是用作修改工作流的节点的状态信息,当修改后会产生一个Event,这个事件记录了节点修改后的状态。主要源码:
if (quickFinish) { node.setStartTime(time); fireEvent(Event.create(this, Type.JOB_STARTED, new EventData(nodeStatus))); node.setEndTime(time); fireEvent(Event.create(this, Type.JOB_FINISHED, new EventData(nodeStatus))); return true; }
4.2 JobCallbackManager : 这个listener主要监听结束状态的节点信息,修改任务最重结束的状态,成功or失败or killed..
4.3JmxJobMBeanManager: 这个主要是记录了当前环境下的所有任务执行状态的统计,比如:当当前任务执行成功后,总执行任务+1 ,成功执行任务+1。以此类推。
5) JobRunner中构造Job
5.1首先通过job type选择不同的任务类型:private void loadDefaultTypes(JobTypePluginSet plugins) throws JobTypeManagerException { logger.info("Loading plugin default job types"); plugins.addPluginClass("command", ProcessJob.class); plugins.addPluginClass("javaprocess", JavaProcessJob.class); plugins.addPluginClass("noop", NoopJob.class); plugins.addPluginClass("python", PythonJob.class); plugins.addPluginClass("ruby", RubyJob.class); plugins.addPluginClass("script", ScriptJob.class); }
5.2 依照不同类型去创建任务:(具体的代码比较多,就不粘贴了,在azkaban.jobtype.JobTypeManager),主要的思想是通过反射构造。
job = (Job) Utils.callConstructor(executorClass, jobId, pluginLoadProps, jobProps, logger);
6). 在jobRunner中构造好了job接着就需要执行了。job.
6.1 执行一个节点 在这我们用command任务做说明:
ProcessJob.run():
// 可以传入多条command for (String command : commands) { // 申请一条进行去处理 AzkabanProcessBuilder builder = null; if (isExecuteAsUser) { command = String.format("%s %s %s", executeAsUserBinaryPath, effectiveUser, command); info("Command: " + command); builder = new AzkabanProcessBuilder(partitionCommandLine(command)) .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog()) .enableExecuteAsUser().setExecuteAsUserBinaryPath(executeAsUserBinaryPath) .setEffectiveUser(effectiveUser); } else { info("Command: " + command); builder = new AzkabanProcessBuilder(partitionCommandLine(command)) .setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog()); } // 设置env if (builder.getEnv().size() > 0) { info("Environment variables: " + builder.getEnv()); } info("Working directory: " + builder.getWorkingDir()); // print out the Job properties to the job log. this.logJobProperties(); boolean success = false; this.process = builder.build(); try { // 执行这个进程 this.process.run(); success = true; } catch (Throwable e) { for (File file : propFiles) if (file != null && file.exists()) file.delete(); throw new RuntimeException(e); } finally { this.process = null; info("Process completed " + (success ? "successfully" : "unsuccessfully") + " in " + ((System.currentTimeMillis() - startMs) / 1000) + " seconds."); } }
从上述代码中可以看出,azkaban在执行command类型任务时,都是在系统环境下生成一个process去处理每一行的command。
eg:
type=command command=sleep 1 command.1=echo "start execute"
.job文件内容为command; command.1 这两条不同的任务执行会生成两条不同的进程,两个进程间无法通信获取内容及结果。
6.2触发子节点执行:
在flowRunner中,会判断当期工作流是否结束,不结束判断下一个执行节点
while (!flowFinished) { synchronized (mainSyncObj) { if (flowPaused) { try { mainSyncObj.wait(CHECK_WAIT_MS); } catch (InterruptedException e) { } continue; } else { if (retryFailedJobs) { retryAllFailures(); // 判断下一个执行节点 } else if (!progressGraph()) { try { mainSyncObj.wait(CHECK_WAIT_MS); } catch (InterruptedException e) { } } } } }
progressGraph 这个关键的方法代码如下:
private boolean progressGraph() throws IOException { finishedNodes.swap(); // 当这个节点结束后,去获取outNodes,outNodes是图中,当前节点的下一个 HashSet<ExecutableNode> nodesToCheck = new HashSet<ExecutableNode>(); for (ExecutableNode node : finishedNodes) { Set<String> outNodeIds = node.getOutNodes(); ExecutableFlowBase parentFlow = node.getParentFlow(); // 如果任务失败了,那设置工作流失败 if (node.getStatus() == Status.FAILED) { // The job cannot be retried or has run out of retry attempts. We will // fail the job and its flow now. if (!retryJobIfPossible(node)) { propagateStatus(node.getParentFlow(), Status.FAILED_FINISHING); if (failureAction == FailureAction.CANCEL_ALL) { this.kill(); } this.flowFailed = true; } else { nodesToCheck.add(node); continue; } } // 如果没有后续节点则工作流over了 if (outNodeIds.isEmpty()) { finalizeFlow(parentFlow); finishExecutableNode(parentFlow); // If the parent has a parent, then we process if (!(parentFlow instanceof ExecutableFlow)) { outNodeIds = parentFlow.getOutNodes(); parentFlow = parentFlow.getParentFlow(); } } // 如果有后续节点,加入list for (String nodeId : outNodeIds) { ExecutableNode outNode = parentFlow.getExecutableNode(nodeId); nodesToCheck.add(outNode); } } // 先看是否有skip或者kill的任务(用户可以设置哪些节点不执行),若没有则继续执行后续节点 boolean jobsRun = false; for (ExecutableNode node : nodesToCheck) { if (Status.isStatusFinished(node.getStatus()) || Status.isStatusRunning(node.getStatus())) { // Really shouldn't get in here. continue; } jobsRun |= runReadyJob(node); } if (jobsRun || finishedNodes.getSize() > 0) { updateFlow(); return true; } return false; }
当这些都执行结束后:
logger.info("Finishing up flow. Awaiting Termination"); executorService.shutdown(); updateFlow(); logger.info("Finished Flow");
关闭线程池,更新数据库
总而言之,一个executorService对应着一个工作流的执行信息。每条现成会去执行节点(开辟系统的process去执行)
相关文章推荐
- azkaban源码解读
- LinkedHashMap底层源码解读
- AMPS:队列源码解读
- 源码解读(一): spring在web容器中的初始化过程
- 第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考
- android AsyncTask 源码解读
- JVM源码分析之Jstat工具原理完全解读
- ArrayList源码详尽解读(二)
- HashSet源码分析解读
- Spark源码解读之SparkContext初始化
- shared_ptr源码解读
- AMPS:内存管理模块源码解读(二)
- spark1.3版本源码解读
- Dubbo 源码解读 —— 可支持序列化及自定义扩展
- Deep MNIST for Experts解读(三):deepnn源码分析与AdamOptimizer
- Tomcat源码解读系列(一)——server.xml文件的配置
- Java annotation源码解读
- [Hadoop源码解读](五)MapReduce篇之Writable相关类
- EventBus3 源码解读
- jquery插件select2源码解读(一) 概述