
JStorm: The Worker Startup Flow

2015-03-25 18:51
In JStorm, tasks are actually executed by Workers, and Workers are launched by the Supervisor component. This post walks through how a Worker is started.

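After it starts, the Supervisor launches a thread that watches for task assignments. This thread periodically queries ZooKeeper for the current assignments, and as soon as a new assignment arrives it starts the Workers this Supervisor is responsible for. There are many more details involved, but that is the overall flow.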

The thread's body is defined in EventManagerImpExecute:
public void run() {
    try {
        while (manager.isRunning()) {
            RunnableCallback r = null;
            try {
                r = manager.take();
            } catch (InterruptedException e) {
                // LOG.info("Failed to get ArgsRunable from EventManager queue");
            }

            if (r == null) {
                // queue is empty: back off for 10 ms before polling again
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {

                }
                continue;
            }

            r.run();
            Exception e = r.error();
            if (e != null) {
                throw e;
            }
            manager.proccessinc();

        }

    } catch (InterruptedException e) {
        .....
    }
}
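To make the queue-driven loop above concrete, here is a minimal sketch of the same pattern using only the JDK. It assumes a hypothetical `SimpleEventManager` class (not JStorm's `EventManagerImp`); a 10 ms timed `poll` stands in for the original's `take()` followed by `Thread.sleep(10)`, and an exception thrown by an event escapes and ends the loop, roughly like the `throw e` above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for JStorm's event manager; not the JStorm API.
class SimpleEventManager {
    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private int processed = 0;

    void add(Runnable event) {
        queue.add(event);
    }

    void shutdown() {
        running = false;
    }

    int processedCount() {
        return processed;
    }

    // Consumer loop: wait up to 10 ms for an event, run it, repeat until shut down.
    void runLoop() {
        while (running) {
            Runnable r;
            try {
                // poll(timeout) plays the role of take() followed by Thread.sleep(10)
                r = queue.poll(10, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                return;
            }
            if (r == null) {
                continue; // queue was empty: re-check the running flag and poll again
            }
            r.run(); // an exception thrown here escapes and ends the loop
            processed++;
        }
    }

    public static void main(String[] args) {
        SimpleEventManager manager = new SimpleEventManager();
        List<String> log = new ArrayList<>();
        manager.add(() -> log.add("sync supervisor"));
        manager.add(() -> log.add("sync processes"));
        manager.add(manager::shutdown); // the last event stops the loop
        manager.runLoop();
        System.out.println(log + " processed=" + manager.processedCount());
        // prints: [sync supervisor, sync processes] processed=3
    }
}
```

A timed `poll` keeps the loop responsive to `shutdown()` without a separate sleep; whether JStorm's own `take()` blocks or returns `null` immediately is not shown in the excerpt.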

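As you can see, this thread is built around an event queue: it takes the next event and calls its run method, and when the queue is empty it sleeps for 10 ms (the argument to Thread.sleep is in milliseconds) and polls again. ZK itself is queried by the sync events that pass through this queue, at least once every 10 seconds. Next, let's look at what the Supervisor does when a new assignment arrives. The code is in SyncSupervisorEvent: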
public void run() {
    LOG.debug("Synchronizing supervisor");

    try {

        RunnableCallback syncCallback = new EventManagerZkPusher(this,
                syncSupEventManager);

        /**
         * Step 1: get all assignments and register /ZK-dir/assignment and
         * every assignment watch
         *
         */
        Map<String, Assignment> assignments = Cluster.get_all_assignment(
                stormClusterState, syncCallback);
        LOG.debug("Get all assignments " + assignments);

        /**
         * Step 2: get topologyIds list from
         * STORM-LOCAL-DIR/supervisor/stormdist/
         */
        List<String> downloadedTopologyIds = StormConfig
                .get_supervisor_toplogy_list(conf);
        LOG.debug("Downloaded storm ids: " + downloadedTopologyIds);

        /**
         * Step 3: get <port,LocalAssignments> from ZK local node's
         * assignment
         */
        Map<Integer, LocalAssignment> localAssignment = getLocalAssign(
                stormClusterState, supervisorId, assignments);

        /**
         * Step 4: write local assignment to LocalState
         */
        try {
            LOG.debug("Writing local assignment " + localAssignment);
            localState.put(Common.LS_LOCAL_ASSIGNMENTS, localAssignment);
        } catch (IOException e) {
            LOG.error("put LS_LOCAL_ASSIGNMENTS " + localAssignment
                    + " of localState failed");
            throw e;
        }

        // Step 5: download code from ZK

        Map<String, String> topologyCodes = getTopologyCodeLocations(
                assignments, supervisorId);

        downloadTopology(topologyCodes, downloadedTopologyIds);

        /**
         * Step 6: remove any downloaded useless topology
         */
        removeUselessTopology(topologyCodes, downloadedTopologyIds);

        /**
         * Step 7: push syncProcesses Event
         */
        processEventManager.add(syncProcesses);

    } catch (Exception e) {
        LOG.error("Failed to Sync Supervisor", e);
        // throw new RuntimeException(e);
    }
}
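The bookkeeping in steps 3, 5 and 6 boils down to filtering and set differences. The sketch below models it with hypothetical types (`Slot`, and a plain map from topologyId to its slots); JStorm's real `Assignment` and `LocalAssignment` classes carry much more, and the real step 5 works from `getTopologyCodeLocations` rather than directly from the local assignment.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of steps 3, 5 and 6 of SyncSupervisorEvent; these types are not JStorm's.
class SupervisorSyncSketch {
    // One worker slot: a port on a given supervisor.
    static final class Slot {
        final String supervisorId;
        final int port;

        Slot(String supervisorId, int port) {
            this.supervisorId = supervisorId;
            this.port = port;
        }
    }

    // Step 3 analogue: keep only this supervisor's slots, as port -> topologyId.
    static Map<Integer, String> localAssignment(Map<String, List<Slot>> assignments, String supervisorId) {
        Map<Integer, String> local = new HashMap<>();
        for (Map.Entry<String, List<Slot>> entry : assignments.entrySet()) {
            for (Slot slot : entry.getValue()) {
                if (slot.supervisorId.equals(supervisorId)) {
                    local.put(slot.port, entry.getKey());
                }
            }
        }
        return local;
    }

    // Step 5 analogue: topologies assigned here whose code is not downloaded yet.
    static Set<String> toDownload(Map<Integer, String> local, List<String> downloaded) {
        Set<String> needed = new HashSet<>(local.values());
        needed.removeAll(downloaded);
        return needed;
    }

    // Step 6 analogue: downloaded topologies that are no longer assigned here.
    static Set<String> toRemove(Map<Integer, String> local, List<String> downloaded) {
        Set<String> useless = new HashSet<>(downloaded);
        useless.removeAll(local.values());
        return useless;
    }

    public static void main(String[] args) {
        Map<String, List<Slot>> assignments = new HashMap<>();
        assignments.put("topo-A", Arrays.asList(new Slot("sup-1", 6800), new Slot("sup-2", 6800)));
        assignments.put("topo-B", Arrays.asList(new Slot("sup-1", 6801)));
        Map<Integer, String> local = localAssignment(assignments, "sup-1");
        List<String> downloaded = Arrays.asList("topo-B", "topo-old");
        System.out.println(toDownload(local, downloaded) + " " + toRemove(local, downloaded));
        // prints: [topo-A] [topo-old]
    }
}
```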
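The comments in the code above explain it well: the main work is re-registering the ZK watches and downloading the topology code. After that, the syncProcesses event (a SyncProcessEvent) is pushed onto its queue, and the thread serving that queue handles it, which means starting the corresponding Workers. Here is that part of the startup flow, in SyncProcessEvent: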
public void run() {
    LOG.debug("Syncing processes");
    try {

        /**
         * Step 1: get assigned tasks from localState Map<port(type Integer),
         * LocalAssignment>
         */
        Map<Integer, LocalAssignment> localAssignments = null;
        try {
            localAssignments = (Map<Integer, LocalAssignment>) localState
                    .get(Common.LS_LOCAL_ASSIGNMENTS);
        } catch (IOException e) {
            LOG.error("Failed to get LOCAL_ASSIGNMENTS from LocalState", e);
            throw e;
        }

        if (localAssignments == null) {
            localAssignments = new HashMap<Integer, LocalAssignment>();
        }
        LOG.debug("Assigned tasks: " + localAssignments);

        /**
         * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat
         * Map<workerid [WorkerHeartbeat, state]>
         */
        Map<String, StateHeartbeat> localWorkerStats = null;
        try {
            localWorkerStats = getLocalWorkerStats(conf, localState,
                    localAssignments);
        } catch (Exception e) {
            LOG.error("Failed to get Local worker stats");
            throw e;
        }
        LOG.debug("Allocated: " + localWorkerStats);

        /**
         * Step 3: kill Invalid Workers and remove killed worker from
         * localWorkerStats
         */
        Set<Integer> keepPorts = killUselessWorkers(localWorkerStats);

        // check new workers
        checkNewWorkers(conf);

        // start new workers
        startNewWorkers(keepPorts, localAssignments);

    } catch (Exception e) {
        LOG.error("Failed Sync Process", e);
        // throw e
    }
}
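Step 3 and `startNewWorkers` can be pictured as a reconciliation between what is assigned to each port and what is actually running there. The sketch below is an assumption about that logic, not JStorm's code: `WorkerState` is a hypothetical stand-in for `StateHeartbeat`, and "valid" is simplified to "alive and running the topology assigned to its port".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of killUselessWorkers/startNewWorkers; WorkerState is not JStorm's StateHeartbeat.
class ProcessSyncSketch {
    // What we know about one local worker process.
    static final class WorkerState {
        final int port;
        final String topologyId;
        final boolean alive; // e.g. its heartbeat is recent enough

        WorkerState(int port, String topologyId, boolean alive) {
            this.port = port;
            this.topologyId = topologyId;
            this.alive = alive;
        }
    }

    // A worker is valid if it is alive and still runs the topology assigned to its port.
    static boolean isValid(WorkerState w, Map<Integer, String> assigned) {
        return w.alive && w.topologyId.equals(assigned.get(w.port));
    }

    // Step 3 analogue: ids of workers that should be killed.
    static Set<String> workersToKill(Map<String, WorkerState> workers, Map<Integer, String> assigned) {
        Set<String> kill = new HashSet<>();
        for (Map.Entry<String, WorkerState> e : workers.entrySet()) {
            if (!isValid(e.getValue(), assigned)) {
                kill.add(e.getKey());
            }
        }
        return kill;
    }

    // Ports whose current worker survives the cleanup (the keepPorts of the original).
    static Set<Integer> keepPorts(Map<String, WorkerState> workers, Map<Integer, String> assigned) {
        Set<Integer> keep = new HashSet<>();
        for (WorkerState w : workers.values()) {
            if (isValid(w, assigned)) {
                keep.add(w.port);
            }
        }
        return keep;
    }

    // startNewWorkers analogue: every assigned port without a kept worker gets a new process.
    static Set<Integer> portsToStart(Map<Integer, String> assigned, Set<Integer> keep) {
        Set<Integer> start = new TreeSet<>(assigned.keySet());
        start.removeAll(keep);
        return start;
    }

    public static void main(String[] args) {
        Map<Integer, String> assigned = new HashMap<>();
        assigned.put(6800, "topo-A");
        assigned.put(6801, "topo-B");
        Map<String, WorkerState> workers = new HashMap<>();
        workers.put("w1", new WorkerState(6800, "topo-A", true));  // valid: keep
        workers.put("w2", new WorkerState(6801, "topo-C", true));  // port reassigned: kill
        workers.put("w3", new WorkerState(6802, "topo-A", false)); // dead: kill
        Set<Integer> keep = keepPorts(workers, assigned);
        System.out.println(new TreeSet<>(workersToKill(workers, assigned)) + " " + portsToStart(assigned, keep));
        // prints: [w2, w3] [6801]
    }
}
```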
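The code above does two main things: 1) it kills stale, invalid Workers; 2) it starts new Workers. The flow eventually calls launchWorker to start a Worker process. launchWorker's main job is to assemble the Worker's launch command; at the end it calls JStormUtils.launch_process to start the process: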
public void launchWorker(Map conf, IContext sharedcontext,
        String topologyId, String supervisorId, Integer port,
        String workerId, LocalAssignment assignment) throws IOException {

    // STORM-LOCAL-DIR/supervisor/stormdist/topologyId
    String stormroot = StormConfig.supervisor_stormdist_root(conf,
            topologyId);

    // STORM-LOCAL-DIR/supervisor/stormdist/topologyId/stormjar.jar
    String stormjar = StormConfig.stormjar_path(stormroot);

    // get supervisor conf
    Map stormConf = StormConfig.read_supervisor_topology_conf(conf,
            topologyId);

    // topology conf overrides supervisor conf
    Map totalConf = new HashMap();
    totalConf.putAll(conf);
    totalConf.putAll(stormConf);

    // get classpath
    // String[] param = new String[1];
    // param[0] = stormjar;
    // String classpath = JStormUtils.add_to_classpath(
    // JStormUtils.current_classpath(), param);

    // get child process parameter

    String stormhome = System.getProperty("jstorm.home");

    long memSize = assignment.getMem();
    int cpuNum = assignment.getCpu();
    String childopts = getChildOpts(totalConf);

    childopts += getGcDumpParam(totalConf);

    Map<String, String> environment = new HashMap<String, String>();

    if (ConfigExtension.getWorkerRedirectOutput(totalConf)) {
        environment.put("REDIRECT", "true");
    } else {
        environment.put("REDIRECT", "false");
    }

    environment.put("LD_LIBRARY_PATH",
            (String) totalConf.get(Config.JAVA_LIBRARY_PATH));

    StringBuilder commandSB = new StringBuilder();

    // optional cgroup prefix that limits the worker's CPU
    try {
        if (this.cgroupManager != null) {
            commandSB
                    .append(cgroupManager.startNewWorker(cpuNum, workerId));
        }
    } catch (Exception e) {
        LOG.error("fail to prepare cgroup to workerId: " + workerId, e);
        return;
    }

    // JVM memory: heap = memSize, young gen = memSize/3, PermGen = memSize/16..memSize/8
    // commandSB.append("java -server -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=n ");
    commandSB.append("java -server ");
    commandSB.append(" -Xms" + memSize);
    commandSB.append(" -Xmx" + memSize + " ");
    commandSB.append(" -Xmn" + memSize / 3 + " ");
    commandSB.append(" -XX:PermSize=" + memSize / 16);
    commandSB.append(" -XX:MaxPermSize=" + memSize / 8);
    commandSB.append(" " + childopts);
    commandSB.append(" "
            + (assignment.getJvm() == null ? "" : assignment.getJvm()));

    commandSB.append(" -Djava.library.path=");
    commandSB.append((String) totalConf.get(Config.JAVA_LIBRARY_PATH));

    if (stormhome != null) {
        commandSB.append(" -Djstorm.home=");
        commandSB.append(stormhome);
    }

    commandSB.append(getLogParameter(totalConf, stormhome, assignment.getTopologyName(), port));

    // classpath: JStorm libs, worker classpath and the topology's extra libs
    String classpath = getClassPath(stormjar, stormhome, totalConf);
    String workerClassPath = (String) totalConf
            .get(Config.WORKER_CLASSPATH);
    List<String> otherLibs = (List<String>) stormConf
            .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
    StringBuilder sb = new StringBuilder();
    if (otherLibs != null) {
        for (String libName : otherLibs) {
            sb.append(StormConfig.stormlib_path(stormroot, libName))
                    .append(":");
        }
    }
    workerClassPath = workerClassPath + ":" + sb.toString();

    Map<String, String> policyReplaceMap = new HashMap<String, String>();
    String realClassPath = classpath + ":" + workerClassPath;
    policyReplaceMap.put(SandBoxMaker.CLASS_PATH_KEY, realClassPath);
    commandSB
            .append(sandBoxMaker.sandboxPolicy(workerId, policyReplaceMap));

    // commandSB.append(" -Dlog4j.configuration=storm.log.properties");

    commandSB.append(" -cp ");
    // commandSB.append(workerClassPath + ":");
    commandSB.append(classpath);
    if (!ConfigExtension.isEnableTopologyClassLoader(totalConf))
        commandSB.append(":").append(workerClassPath);

    // main class and its arguments
    commandSB.append(" com.alibaba.jstorm.daemon.worker.Worker ");
    commandSB.append(topologyId);

    commandSB.append(" ");
    commandSB.append(supervisorId);

    commandSB.append(" ");
    commandSB.append(port);

    commandSB.append(" ");
    commandSB.append(workerId);

    commandSB.append(" ");
    commandSB.append(workerClassPath + ":" + stormjar);

    // substitute placeholders that may appear in childopts and other settings
    String cmd = commandSB.toString();
    cmd = cmd.replace("%ID%", port.toString());
    cmd = cmd.replace("%TOPOLOGYID%", topologyId);
    if (stormhome != null) {
        cmd = cmd.replace("%JSTORM_HOME%", stormhome);
    } else {
        cmd = cmd.replace("%JSTORM_HOME%", "./");
    }

    LOG.info("Launching worker with command: " + cmd);
    LOG.info("Environment:" + environment.toString());

    JStormUtils.launch_process(cmd, environment, true);
}
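The memory flags and placeholder substitution in `launchWorker` are easy to check in isolation. The helper names below (`WorkerCommandSketch`, `memoryFlags`, `substitute`) are hypothetical; the arithmetic (`-Xmn` = mem/3, `PermSize` = mem/16, `MaxPermSize` = mem/8) and the three placeholders come from the code above. On Java 8 the PermGen flags are ignored with a warning, since PermGen was replaced by Metaspace.

```java
// Hypothetical helpers reproducing two pieces of launchWorker; not part of JStorm.
class WorkerCommandSketch {
    // Memory-related JVM flags derived the way launchWorker does (memSize in bytes).
    static String memoryFlags(long memSize) {
        return "-Xms" + memSize
                + " -Xmx" + memSize
                + " -Xmn" + memSize / 3
                + " -XX:PermSize=" + memSize / 16
                + " -XX:MaxPermSize=" + memSize / 8;
    }

    // Placeholder substitution applied to the assembled command, in the same order as launchWorker.
    static String substitute(String cmd, int port, String topologyId, String stormhome) {
        return cmd.replace("%ID%", Integer.toString(port))
                .replace("%TOPOLOGYID%", topologyId)
                .replace("%JSTORM_HOME%", stormhome != null ? stormhome : "./");
    }

    public static void main(String[] args) {
        String childopts = "-Dlogfile.name=%TOPOLOGYID%-worker-%ID%.log";
        String cmd = "java -server " + memoryFlags(2L * 1024 * 1024 * 1024) + " " + childopts;
        System.out.println(substitute(cmd, 6800, "topo-A", null));
        // prints: java -server -Xms2147483648 -Xmx2147483648 -Xmn715827882 -XX:PermSize=134217728 -XX:MaxPermSize=268435456 -Dlogfile.name=topo-A-worker-6800.log
    }
}
```

Integer division truncates, so a 2 GiB worker gets a young generation of 715827882 bytes rather than exactly one third.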

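The code above shows the details of building the Worker launch command, including memory sizes, the library path, GC options and so on. Once the command is assembled, the launch stage begins; the Worker runs as a background process: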
public static java.lang.Process launch_process(final String command,
        final Map<String, String> environment, boolean backend) throws IOException {

    if (backend == true) {
        // launch from a separate thread and return immediately (no Process handle)
        new Thread(new Runnable() {

            @Override
            public void run() {
                // ProcessBuilder does not go through a shell, so the trailing "&"
                // reaches the child as a literal argument rather than backgrounding it
                String[] cmdlist = (new String("nohup " + command + " &")).split(" ");
                try {
                    launchProcess(cmdlist, environment);
                } catch (IOException e) {
                    LOG.error("Failed to run " + command + ":" + e.getCause(), e);
                }
            }
        }).start();
        return null;
    } else {
        String[] cmdlist = command.split(" ");
        return launchProcess(cmdlist, environment);
    }
}
A separate thread is started to launch the Worker process, and ProcessBuilder performs the actual launch:
protected static java.lang.Process launchProcess(final String[] cmdlist,
        final Map<String, String> environment) throws IOException {
    // drop empty tokens produced by consecutive spaces in the command string
    ArrayList<String> buff = new ArrayList<String>();
    for (String tok : cmdlist) {
        if (!tok.isEmpty()) {
            buff.add(tok);
        }
    }

    ProcessBuilder builder = new ProcessBuilder(buff);
    builder.redirectErrorStream(true);
    // child environment = inherited environment plus the entries set in launchWorker
    Map<String, String> process_evn = builder.environment();
    for (Map.Entry<String, String> entry : environment.entrySet()) {
        process_evn.put(entry.getKey(), entry.getValue());
    }

    return builder.start();
}
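One consequence of this design is worth seeing concretely: the command string is split on single spaces and handed straight to `ProcessBuilder`, with no shell in between. The trailing `&` added by `launch_process` therefore reaches the child as a literal argument, and any argument containing a space (a path, for instance) is split in two. The hypothetical helpers below (`LaunchSketch`, `tokenize`, `prepare`) reproduce that tokenization and the `ProcessBuilder` setup without starting a process.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helpers mirroring launch_process/launchProcess above; not JStorm's API.
class LaunchSketch {
    // Same tokenization as the original: split on single spaces and drop empty tokens.
    static List<String> tokenize(String command) {
        List<String> tokens = new ArrayList<>();
        for (String tok : command.split(" ")) {
            if (!tok.isEmpty()) {
                tokens.add(tok);
            }
        }
        return tokens;
    }

    // Builds, but does not start, a ProcessBuilder configured like launchProcess.
    static ProcessBuilder prepare(List<String> argv, Map<String, String> extraEnv) {
        ProcessBuilder builder = new ProcessBuilder(argv);
        builder.redirectErrorStream(true);      // merge the child's stderr into stdout
        builder.environment().putAll(extraEnv); // inherited environment plus overrides
        return builder;
    }

    public static void main(String[] args) {
        // No shell: "&" stays a literal argument and the path with a space is split in two.
        List<String> argv = tokenize("nohup java  -server -Dlib=/opt/my app/lib &");
        System.out.println(argv);
        // prints: [nohup, java, -server, -Dlib=/opt/my, app/lib, &]
        ProcessBuilder pb = prepare(argv, Collections.singletonMap("REDIRECT", "false"));
        System.out.println(pb.command().size() + " " + pb.environment().get("REDIRECT"));
        // prints: 6 false
    }
}
```

Passing a pre-built `List<String>` to `ProcessBuilder` instead of a single string avoids the splitting problem altogether, and backgrounding comes for free because `start()` returns without waiting for the child.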

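At this point the Worker has been started, and it shows up in the output of the ps command. Note that this flow never generates a launch script for the Worker. This differs from Hadoop, where launching each Map and Reduce task generates a script, which can sometimes make troubleshooting easier.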

[Figure: diagram of the threads involved in the Worker startup flow]
