Flume NG源码分析(二)支持运行时动态修改配置的配置模块
2015-06-08 17:08
661 查看
在上一篇中讲了Flume NG配置模块基本的接口的类,PropertiesConfigurationProvider提供了基于properties配置文件的静态配置的能力,这篇细说一下PollingPropertiesFileConfigurationProvider提供的运行时动态修改配置并生效的能力。
要实现动态修改配置文件并生效,主要有两个待实现的功能
1. 观察配置文件是否修改
2. 如果修改,将修改的内容通知给观察者
对于第一点,监控配置文件是否修改,Flume NG定义了一个FileWatcherRunnable对象来监控配置文件,启动了一个单独的线程采用定时轮询的方式来监控,轮询频率是30毫秒一次,比较file.lastModified属性与lastChange时间戳,当file.lastModified > lastChange时表示文件被修改
对于第二点,利用Guava EventBus提供的发布订阅模式机制,将配置修改封装成事件传递给Application,来重新加载配置
要实现动态修改配置文件并生效,主要有两个待实现的功能
1. 观察配置文件是否修改
2. 如果修改,将修改的内容通知给观察者
对于第一点,监控配置文件是否修改,Flume NG定义了一个FileWatcherRunnable对象来监控配置文件,启动了一个单独的线程采用定时轮询的方式来监控,轮询频率是30毫秒一次,比较file.lastModified属性与lastChange时间戳,当file.lastModified > lastChange时表示文件被修改
public class FileWatcherRunnable implements Runnable { private final File file; private final CounterGroup counterGroup; private long lastChange; public FileWatcherRunnable(File file, CounterGroup counterGroup) { super(); this.file = file; this.counterGroup = counterGroup; this.lastChange = 0L; } @Override public void run() { LOGGER.debug("Checking file:{} for changes", file); counterGroup.incrementAndGet("file.checks"); long lastModified = file.lastModified(); if (lastModified > lastChange) { LOGGER.info("Reloading configuration file:{}", file); counterGroup.incrementAndGet("file.loads"); lastChange = lastModified; try { eventBus.post(getConfiguration()); } catch (Exception e) { LOGGER.error("Failed to load configuration data. Exception follows.", e); } catch (NoClassDefFoundError e) { LOGGER.error("Failed to start agent because dependencies were not " + "found in classpath. Error follows.", e); } catch (Throwable t) { // caught because the caller does not handle or log Throwables LOGGER.error("Unhandled error", t); } } } } // PollingPropertiesFileConfigurationProvider.start()启动一个单独的线程来监控properties配置文件 public void start() { LOGGER.info("Configuration provider starting"); Preconditions.checkState(file != null, "The parameter file must not be null"); executorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d") .build()); FileWatcherRunnable fileWatcherRunnable = new FileWatcherRunnable(file, counterGroup); executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval, TimeUnit.SECONDS); lifecycleState = LifecycleState.START; LOGGER.debug("Configuration provider started"); }
对于第二点,利用Guava EventBus提供的发布订阅模式机制,将配置修改封装成事件传递给Application,来重新加载配置
// FileWatcherRunnable.run方法 发布配置修改的事件 eventBus.post(getConfiguration()); // Application.main方法来注册事件订阅 Application application; if(reload) { EventBus eventBus = new EventBus(agentName + "-event-bus"); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } // Application类采用@Subscribe标注来定义订阅方法,即配置修改后会执行handleConfigurationEvent方法,这个方法是线程安全的 @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); }
相关文章推荐
- 霍夫的图像处理变换(线检测算法)
- 含有ref out 参数 的方法反射 Emit 与 普通
- 产品知识管理:最笨的方法或许也是最好的
- java包装类
- 在jquery的ajax中添加自定义的header信息
- 必备的SAT写作高频固定搭配总结
- (ML foundation 7)VC dimension & Model complexity
- 六 100万并发连接服务器笔记之Erlang完成1M并发连接目标
- android学习收藏的一些网址,自用~
- linux shell 自定义函数(定义、返回值、变量作用域)介绍
- win7下安装及验证jdk
- (ML Foundation 8)Error measure
- C# using
- Android实现语音识别代码
- android 背景图片的变形(background)
- 项目管理系列分享(六)——最佳实践-需求
- 三 100万并发连接服务器笔记之测试端就绪
- 四 100万并发连接服务器笔记之1M并发连接目标达成
- linux shell 流程控制(条件if,循环【for,while】,选择【case】语句实例
- iOS AVCaptureVideoDataOutputSampleBufferDelegate 录制视频