您的位置:首页 > Web前端 > JavaScript

JStorm与Storm源码分析(一)--nimbus-data

2017-07-25 21:32 337 查看
Nimbus里定义了一些共享数据结构,比如nimbus-data.

nimbus-data结构里定义了很多公用的数据,请看下面代码:

(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
:inimbus inimbus
:submitted-count (atom 0)
:storm-cluster-state (cluster/mk-storm-cluster-state conf)
:submit-lock (Object.)
:heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
:uploaders (file-cache-map conf)
:uptime (uptime-computer)
:validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:timer (mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
}))


上述代码中:

submitted-count表示当前已经提交的Topology的数目;

定义了storm-cluster-state对象,该对象可用于将数据存储到ZooKeeper中以及从ZooKeeper读取数据;

submit-lock表示一个锁对象;

定义了downloaders和uploaders缓存;

注:当用户提交Topology时,系统会创建一个上传流并将其放入uploaders缓存中;当Supervisor从Nimbus下载Topology的jar时,系统会创建一个下载流并将其放入downloaders缓存中。任何一种操作完成时,其所对应的上传或下载流就会被关闭,且流所传递的内容也会被从缓存中移除。

uptime定义了当前Nimbus的启动时间;

定义了一个validator,它可用于对Topology进行检测验证.目前使用的是DefaultValidator;

定义了一个timer计时器,并给出了当计数器处理失败时需要调用的方法;

定义了Nimbus所使用的调度器scheduler。

下面我们看一下JStorm中是如何描述nimbus-data的.

在JStorm中定义了一个NimbusData类数据结构来存放数据.NimbusData定义如下:

public class NimbusData {
private Map<Object, Object> conf;
//定义当前已经提交的Topology的数目
private AtomicInteger submittedCount;
//定义了cluster-state对象,该对象可用于将数据存储到Zookeeper中以及从Zookeeper读取数据
private StormClusterState stormClusterState;

// Map<topologyId, Map<taskid, TkHbCacheTime>>
private ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>> taskHeartbeatsCache;

// TODO two kind of value:Channel/BufferFileInputStream
//定义了downloades和uploaders缓存,当用户提交Topology的时候,系统会创建一个上传流放入uploaders缓存中,
//而当Supervisor从Nimbus下载Topology的jar包时,系统则会创建一个下载流并将其放入downloaders缓存中。
//任何一种操作完成时,其所对应的上传或下载流就会被关闭,且流所传递的内容也会被从缓存中移除。
private TimeCacheMap<Object, Object> downloaders;
private TimeCacheMap<Object, Object> uploaders;
// cache thrift response to avoid scan zk too frequently
private NimbusCache cache;

private int startTime;

//定义了Nimbus所使用的调度器
private final ScheduledExecutorService scheduExec;

private StatusTransition statusTransition;

private static final int SCHEDULE_THREAD_NUM = 8;

private final INimbus inimubs;
//本地模式
private final boolean localMode;
//当前Nimubus的启动时间
public int uptime() {
return (TimeUtils.current_time_secs() - startTime);
}


注:学习李明老师Storm源码分析的笔记。

欢迎关注下面二维码进行技术交流:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: