[置顶] Elastic-Job-Lite 源码阅读 ---- 任务初始化
2017-11-06 20:25
579 查看
整体设计的核心 zk 结构图:
写一个测试用例,开始 debug: 代码解析一般直接写在注释里
先进入 main 方法的 init:
进入 registerStartUpInfo 方法:
进入:listenerManager.startAllListeners()
进入 leaderService.electLeader():
进入 serverService.persistOnline(enabled);
进入:instanceService.persistOnline();
进入 shardingService.setReshardingFlag();
进入 monitorService.listen();
进入 reconcileService.startAsync(); // 调节分布式作业不一致服务
写一个测试用例,开始 debug: 代码解析一般直接写在注释里
/** * @author wenniuwuren */ public class JobLite implements SimpleJob { public void execute(ShardingContext context) { switch (context.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } public static void main(String[] args) { new JobScheduler(createRegistryCenter(), createJobConfiguration()).init(); } private static CoordinatorRegistryCenter createRegistryCenter() { // 可以看到当前使用 ZK 实现注册中心,这样的接口设计可以方便后续支持其他诸如 Consul,Eureka 等实现注册中心 CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("192.168.243.128:2181", "elastic-job-demo")); regCenter.init(); return regCenter; } private static LiteJobConfiguration createJobConfiguration() { // 创建作业配置 // // 定义作业核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("JobLite", "0/15 * * * * ?", 10).build(); // 定义SIMPLE类型配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, JobLite.class.getCanonicalName()); // 定义Lite作业根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build(); return simpleJobRootConfig; } }
先进入 main 方法的 init:
/** * 初始化作业. */ public void init() { // 把作业配置存到 zk 的 /命名空间/任务名称/config 节点 LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig); // 设置当前配置的分片总数 JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount()); // Quartz 相关初始化。所有Quartz相关增删改查封装在这个类里。作者把它叫做作业调度控制器。 JobScheduleController jobScheduleController = new JobScheduleController( createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName()); // 将作业调度控制器和注册中心与这个刚配置的任务映射在Map结构的内存中 JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter); // 注册作业启动信息(包含注册zk监听,选主)。这个方法比较复杂,下面详细解析 schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled()); // 启动定时任务 jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron()); }
进入 registerStartUpInfo 方法:
/** * 注册作业启动信息. * * @param enabled 作业是否启用 */ public void registerStartUpInfo(final boolean enabled) { // 下面进行详细解析 listenerManager.startAllListeners(); leaderService.electLeader(); serverService.persistOnline(enabled); instanceService.persistOnline(); shardingService.setReshardingFlag(); monitorService.listen(); if (!reconcileService.isRunning()) { // 调解分布式作业不一致状态服务. reconcileService.startAsync(); } }
进入:listenerManager.startAllListeners()
/** * 开启所有监听器. */ public void startAllListeners() { // 1.选Leader条件:如果调度任务已终止(本地缓存中Quartz调度不包含此作业并且任务实例也不包含此作业)&& (没有leader或者leader宕机) // 2.让出leader条件:当前leader宕机 electionListenerManager.start(); // 分片监听:重新分配 shard 条件。(1)如果当前 shard 数量和配置的时候存的(以新配置的为准)数量不一致,则进行重新分片 (2) 服务变更 shardingListenerManager.start(); // 失效转移安金婷:当调度服务宕机,将需要做failover的分片放到 failover/items/分片项下。 如果不需要failover 则删除failover节点内容 failoverListenerManager.start(); // 幂等性监听:不需要监控分片运行,则清除全部分片运行状态 monitorExecutionListenerManager.start(); // 运行实例关闭监听:监听调度服务宕机,清理工作 shutdownListenerManager.start(); // 作业触发监听: triggerListenerManager.start(); // 重新调度监听:删除旧的,增加新的调度 rescheduleListenerManager.start(); // 保证分布式任务全部开始和结束状态监听管理器:及时notify,减少wait时间 guaranteeListenerManager.start(); // 连接状态监听:curator自己的实现 jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener); }
进入 leaderService.electLeader():
/** * 选举主节点. */ public void electLeader() { log.debug("Elect a new leader now."); // leader/election/latch jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback()); log.debug("Leader election completed."); } /** * 在主节点执行操作. * * @param latchNode 分布式锁使用的作业节点名称 * @param callback 执行操作的回调 */ public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) { try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) { latch.start(); latch.await(); // 选主成功则创建临时节点 leader/election/instance 存储调度服务器ip callback.execute(); //CHECKSTYLE:OFF } catch (final Exception ex) { //CHECKSTYLE:ON handleException(ex); } } @RequiredArgsConstructor class LeaderElectionExecutionCallback implements LeaderExecutionCallback { @Override public void execute() { if (!hasLeader()) { // zk 创建临时节点,这样 client 断开就能发现,并重新选主 jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId()); } } }
进入 serverService.persistOnline(enabled);
/** * 持久化作业服务器上线信息. * * @param enabled 作业是否启用 */ public void persistOnline(final boolean enabled) { if (!JobRegistry.getInstance().isShutdown(jobName)) { // 把 servers/ip地址 存到zk jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name()); } }
进入:instanceService.persistOnline();
/** * 持久化作业运行实例上线相关信息. */ public void persistOnline() { // 向zk保存 /JobLite/instances/192.168.243.1@-@9952 jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), ""); }
进入 shardingService.setReshardingFlag();
/** * 设置需要重新分片的标记. */ public void setReshardingFlag() { // 如果zk节点 leader/sharding/necessary 不存在则创建 jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY); }
进入 monitorService.listen();
/** * 初始化作业监听服务. */ public void listen() { int port = configService.load(true).getMonitorPort(); // 默认不监控 if (port < 0) { return; } try { log.info("Elastic job: Monitor service is running, the port is '{}'", port); // echo "dump" | nc <任意一台作业服务器IP> 9888 // 本地缓存和注册中心数据不一致时,DUMP 出 [zkPath, zkValue, treeCachePath, treeCacheValue]。相同时,只 DUMP 出 [zkPath, zkValue] openSocketForMonitor(port); } catch (final IOException ex) { log.error("Elastic job: Monitor service listen failure, error is: ", ex); } }
进入 reconcileService.startAsync(); // 调节分布式作业不一致服务
/**借助 Guava 的并发包管理,异步启动 * @since 15.0 */ @Override public final Service startAsync() { delegate.startAsync(); return this; } // 每1分钟执行一次 @Override protected void runOneIteration() throws Exception { LiteJobConfiguration config = configService.load(true); int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes(); if (reconcileIntervalMinutes > 0 && (System.currentTimeMillis() - lastReconcileTime >= reconcileIntervalMinutes * 60 * 1000)) { lastReconcileTime = System.currentTimeMillis(); // 是leader节点,需要分片,并且有持有分片的机器宕机或者下线维护 if (leaderService.isLeaderUntilBlock() && !shardingService.isNeedSharding() && shardingService.hasShardingInfoInOfflineServers()) { log.warn("Elastic Job: job status node has inconsistent value,start reconciling..."); shardingService.setReshardingFlag(); } } } // 这里设置执行间隔 @Override protected Scheduler scheduler() { return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES); }
相关文章推荐
- [置顶] Elastic-Job-Lite 源码阅读 ---- 作业执行
- [置顶] elastic job (一) elastic-job-lite----SimpleJob
- [置顶] nginx源码阅读(二).初始化:main函数及ngx_init_cycle函数
- [hadoop源码阅读][9]-mapreduce-job提交过程
- lite源码阅读(三)基本内容
- 17 任务调度相关类综述——Live555源码阅读(一)任务调度相关类
- Elastic-Job - 分布式定时任务框架
- elastic-lite-job service节点
- storm源码阅读笔记之任务调度算法
- elastic-job(lite)使用的一些注意事项
- lite源码阅读(四)所有的函数
- 定时任务之elastic-job概述
- Glide源码阅读(一 补充) Glide单例初始化做的事
- Quartz小记(一):Elastic-Job - 分布式定时任务框架
- Elastic-Job项目源码分析3-- 浅谈源码分析
- elastic-job-Lite入门
- Elastic-job源码学习——环境准备
- Memcached源码阅读之内存初始化
- Tomcat源码阅读(二)初始化
- [置顶] nginx源码阅读(十).ngx_event_core_module模块