您的位置:首页 > Web前端 > JavaScript

JStorm之Supervisor启动流程

2015-01-14 16:18 543 查看
  Supervisor中文翻译是监督者,意思简单明了,就是对资源进行监控,其实主要是worker资源。该组件所做的事情概括如下:

    1、每隔一段时间发送心跳证明自己还活着

    2、下载新的topology

    3、释放无效的worker

    4、分配新的任务



  该组件主要包含:心跳线程、supervisor事件接收线程、处理线程,一旦接收到事件则会进入任务分配环节,主要逻辑代码如下:
/**
 * Process entry point: creates a {@code Supervisor} and hands control to
 * its {@code run()} method.
 *
 * @param args command-line arguments (not used here)
 */
public static void main(String[] args) {
    // run() performs the main initialization work.
    new Supervisor().run();
}
  
/**
 * Starts the supervisor and then blocks the calling thread until the
 * supervisor manager reports that shutdown has finished.
 *
 * <p>Any exception thrown during start-up is logged and the JVM exits with
 * status 1. While waiting, an interrupt does not end the wait (same as
 * before), but the interrupt is no longer lost: it is remembered and the
 * thread's interrupt status is restored once the wait is over.
 */
public void run() {
    SupervisorManger supervisorManager = null;
    try {
        // Read the configuration (done the same way as for nimbus).
        Map<Object, Object> conf = Utils.readStormConfig();
        // Check the cluster (distributed) mode setting in the configuration.
        StormConfig.validate_distributed_mode(conf);
        // Create the pid file.
        createPid(conf);
        // The supervisor itself is started inside this call.
        supervisorManager = mkSupervisor(conf, null);
        JStormUtils.redirectOutput("/dev/null");
    } catch (Exception e) {
        LOG.error("Failed to start supervisor\n", e);
        System.exit(1);
    }

    boolean interrupted = false;
    try {
        // Poll once per second until the manager says shutdown is complete.
        while (!supervisorManager.isFinishShutdown()) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Keep waiting, but remember the interrupt. Re-interrupting
                // right here would make every following sleep() throw
                // immediately and turn this loop into a busy spin.
                interrupted = true;
            }
        }
    } finally {
        if (interrupted) {
            // Restore the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }
}
下面看组件启动的主方法
/**
 * Builds the supervisor's components in order (temp-dir cleanup, cluster
 * state, local state / supervisor id, heartbeat, sync events, HTTP server,
 * metric upload) and returns a SupervisorManger holding them.
 *
 * @param conf          configuration map used by every component created here
 * @param sharedContext passed through to SyncProcessEvent
 * @return a SupervisorManger which can shut down the supervisor and workers
 * @throws Exception if any of the set-up steps fails
 */
public SupervisorManger mkSupervisor(Map conf, IContext sharedContext)
throws Exception {
LOG.info("Starting Supervisor with conf " + conf);
active = new AtomicBoolean(true);
/**
* Step 1: clean up temporary files: $storm.local.dir/supervisor/tmp
*/
String path = StormConfig.supervisorTmpDir(conf);
FileUtils.cleanDirectory(new File(path));
/*
* Step 2: create the zk connection; per the original author's notes this also sets up
* the directories [/assignments, /tasks, /topology, /supervisors, /taskbeats, /taskerrors, /monitor]
*
*/
StormClusterState stormClusterState = Cluster
.mk_storm_cluster_state(conf);
/*
* Step 3: (per the original author's notes) creates the $storm.local.dir/supervisor/localstate directory,
*         creates version files named by timestamp (e.g. 1421217778765, 1421217778765.version)
*         and serializes the supervisor id into them;
*         old version files are deleted, keeping the most recent 4.
*         The code below reuses the stored supervisor id, or generates a random UUID and stores it.
*/
LocalState localState = StormConfig.supervisorState(conf);
String supervisorId = (String) localState.get(Common.LS_ID);
if (supervisorId == null) {
supervisorId = UUID.randomUUID().toString();
localState.put(Common.LS_ID, supervisorId);
}
Vector<SmartThread> threads = new Vector<SmartThread>();
// Step 5 create HeartBeat
/*
* Create the heartbeat thread. Per the original author's notes it sends a heartbeat every
* supervisor.heartbeat.frequency.secs seconds, updating the znode in zk:
*   /supervisors/supervisor-id (e.g. 0add54ac-2c23-49bc-aaee-05b3cb9fef00)
* with information such as:
*   SupervisorInfo[hostName=rt2l02046.tbc,
*                  supervisorId=0add54ac-2c23-49bc-aaee-05b3cb9fef00,
*                  timeSecs=1421219320,
*                  uptimeSecs=908,
*                  workerPorts=[6801, 6800, 6803, 6802]]
* hb.update() is called once directly before the loop thread is created.
*/
Heartbeat hb = new Heartbeat(conf, stormClusterState, supervisorId,
active);
hb.update();
AsyncLoopThread heartbeat = new AsyncLoopThread(hb, false, null,
Thread.MIN_PRIORITY, true);
threads.add(heartbeat);

// Sync heartbeat to Apsara Container
// (mkSupervisorInstance may return null, in which case no thread is registered)
AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkSupervisorInstance(conf);
if (syncContainerHbThread != null) {
threads.add(syncContainerHbThread);
}
// Step 6 create and start sync Supervisor thread
// every supervisor.monitor.frequency.secs second run SyncSupervisor
/*
*  Create the two sync events, SyncSupervisorEvent and SyncProcessEvent; per the original
*  author's notes they are used to receive events and to process events respectively.
*  They are closely tied to topologies and are covered in detail with topology submission.
*/
EventManager processEventManager = new EventManagerImp(false);
ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<String, String>();
SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorId,
conf, localState, workerThreadPids, sharedContext);

EventManager syncSupEventManager = new EventManagerImp(false);
SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(
supervisorId, conf, processEventManager, syncSupEventManager,
stormClusterState, localState, syncProcessEvent);

int syncFrequence = JStormUtils.parseInt(conf
.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS));
EventManagerPusher syncSupervisorPusher = new EventManagerPusher(
syncSupEventManager, syncSupervisorEvent, active, syncFrequence);
AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(
syncSupervisorPusher);
threads.add(syncSupervisorThread);

//Step 7 start httpserver
int port = ConfigExtension.getSupervisorDeamonHttpserverPort(conf);
Httpserver httpserver = new Httpserver(port, conf);
httpserver.start();

//Step 8 upload monitoring (metric) information
// The client is an AlimonitorClient when alimonitor posting is enabled in conf,
// otherwise a plain MetricSendClient.
MetricSendClient client;
if (ConfigExtension.isAlimonitorMetricsPost(conf)) {
client = new AlimonitorClient(AlimonitorClient.DEFAUT_ADDR,
AlimonitorClient.DEFAULT_PORT, true);
} else {
client = new MetricSendClient();
}
UploadSupervMetric uploadMetric = new UploadSupervMetric(conf, stormClusterState,
supervisorId, active, 60, client);
AsyncLoopThread uploadMetricThread = new AsyncLoopThread(uploadMetric);
threads.add(uploadMetricThread);

// SupervisorManger which can shutdown all supervisor and workers
return new SupervisorManger(conf, supervisorId, active, threads,
syncSupEventManager, processEventManager, httpserver,
stormClusterState, workerThreadPids);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息