
JStorm source code analysis: nimbus

2016-09-25 22:57
nimbus

topologyAssign
Preparation

Assigning workers and tasks

Writing assignment info

nimbus

Nimbus has a very important member: topologyAssign.

topologyAssign

This class implements Runnable; a thread is created around it and started via start().

public class TopologyAssign implements Runnable {
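
The launch pattern this describes, as a minimal generic sketch (the actual call site inside NimbusServer is not quoted in this article, so the surrounding code here is an assumption):

// Generic sketch of the launch described above; the call site and the
// daemon flag are assumptions, not quoted JStorm code.
TopologyAssign assigner = new TopologyAssign();
Thread thread = new Thread(assigner);
thread.setDaemon(true); // background service thread (assumption)
thread.start();         // enters the run() method shown below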


So we can look directly at its run() method:

public void run() {
    LOG.info("TopologyAssign thread has been started");
    runFlag = true;

    while (runFlag) {
        TopologyAssignEvent event;
        try {
            event = queue.take();
        } catch (InterruptedException e1) {
            continue;
        }
        if (event == null) {
            continue;
        }

        boolean isSuccess = doTopologyAssignment(event);

        if (isSuccess == false) {
        } else {
            try {
                cleanupDisappearedTopology();
            } catch (Exception e) {
                LOG.error("Failed to do cleanup disappear topology ", e);
                continue;
            }
        }
    }
}


From the code, it does three things:

1. Take an assignment event from the queue (a sketch of how such an event is enqueued follows the list).

2. Perform the assignment described by the event.

3. Clean up topologies that have disappeared.
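
As referenced in item 1, here is a sketch of the producer side of that queue. The setters are inferred from the getters event.getTopologyId() and event.isScratch() used below, and the queue is assumed to be a BlockingQueue; treat every name here as an assumption:

// Hypothetical producer side of the queue consumed by run():
TopologyAssignEvent event = new TopologyAssignEvent();
event.setTopologyId(topologyId); // setter inferred from getTopologyId()
event.setScratch(false);         // false = fresh assignment, not a rebalance
queue.offer(event);              // run() picks it up via queue.take()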

The most important of these is performing the assignment, so let's look at it in detail:

protected boolean doTopologyAssignment(TopologyAssignEvent event) {
    Assignment assignment;
    try {
        Assignment oldAssignment = null;
        boolean isReassign = event.isScratch();
        if (isReassign) {
            oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null);
        }
        assignment = mkAssignment(event);

        // notify jstorm monitor on task assign/reassign/rebalance
        TaskStartEvent taskEvent = new TaskStartEvent();
        taskEvent.oldAssignment = oldAssignment;
        taskEvent.newAssignment = assignment;
        taskEvent.topologyId = event.getTopologyId();
        taskEvent.clusterName = nimbusData.getClusterName();
        taskEvent.timestamp = System.currentTimeMillis();

        Map<Integer, String> task2Component;
        // get from nimbus cache first
        Map<Integer, TaskInfo> taskInfoMap = Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId());
        if (taskInfoMap != null) {
            task2Component = Common.getTaskToComponent(taskInfoMap);
        } else {
            task2Component = Common.getTaskToComponent(Cluster.get_all_taskInfo(nimbusData.getStormClusterState(), event.getTopologyId()));
        }
        taskEvent.task2Component = task2Component;
        nimbusData.getMetricRunnable().pushEvent(taskEvent);

        if (!isReassign) {
            setTopologyStatus(event);
        }
    } catch (Throwable e) {
        LOG.error("Failed to assign topology " + event.getTopologyId(), e);
        event.fail(e.getMessage());
        return false;
    }

    if (assignment != null)
        backupAssignment(assignment, event);
    event.done();
    return true;
}


Likewise, the key function here is mkAssignment, which does three main things:

1. Prepare for the assignment.

2. Do the actual assignment (decide which machine and port each worker runs on, and which worker each task runs in).

3. Write the assignment result to ZooKeeper.

Each of these three parts is covered in detail below.
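
Before that, here is a rough skeleton of how the three phases could fit together inside mkAssignment. This is a paraphrase, not the actual source: the scheduler field, the createAssignment helper, and the stormClusterState reference are placeholders; only prepareTopologyAssign and the scheduler call are quoted later in this article.

// Rough skeleton of mkAssignment (paraphrase, not quoted code):
protected Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
    // Phase 1: gather everything the scheduler needs into one context object
    TopologyAssignContext context = prepareTopologyAssign(event);
    // Phase 2: decide worker placement and the task-to-worker mapping
    Set<ResourceWorkerSlot> assignments = scheduler.assignTasks(context);
    // Phase 3: wrap the result and persist it (placeholder helper; the concrete
    // code is shown in the "Writing assignment info" section)
    Assignment assignment = createAssignment(assignments);
    stormClusterState.set_assignment(event.getTopologyId(), assignment);
    return assignment;
}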

Preparation

This is implemented in the prepareTopologyAssign function:

protected TopologyAssignContext prepareTopologyAssign(TopologyAssignEvent event) throws Exception {
    // The context of one assignment; the work below mainly fills in this object
    TopologyAssignContext ret = new TopologyAssignContext();

    String topologyId = event.getTopologyId();
    ret.setTopologyId(topologyId);

    int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId();
    ret.setTopologyMasterTaskId(topoMasterId);
    LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId);

    // A topology's configuration is the nimbus conf merged with the topology conf
    // (the latter is serialized to nimbus/stormdist/topologyId/stormconf.ser)
    Map<Object, Object> nimbusConf = nimbusData.getConf();
    Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
    // topology code (serialized to nimbus/stormdist/topologyId/stormcode.ser)
    StormTopology rawTopology = StormConfig.read_nimbus_topology_code(nimbusConf, topologyId);
    ret.setRawTopology(rawTopology);

    Map stormConf = new HashMap();
    stormConf.putAll(nimbusConf);
    stormConf.putAll(topologyConf);
    ret.setStormConf(stormConf);

    StormClusterState stormClusterState = nimbusData.getStormClusterState();

    // get all running supervisors, don't need callback to watch supervisor
    Map<String, SupervisorInfo> supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
    // init all AvailableWorkerPorts: initially every worker port is available
    for (Entry<String, SupervisorInfo> supInfo : supInfos.entrySet()) {
        SupervisorInfo supervisor = supInfo.getValue();
        if (supervisor != null)
            supervisor.setAvailableWorkerPorts(supervisor.getWorkerPorts());
    }
    // keep only the supervisors that are alive according to their heartbeats
    getAliveSupervsByHb(supInfos, nimbusConf);
    if (supInfos.size() == 0) {
        throw new FailedAssignTopologyException("Failed to make assignment " + topologyId + ", due to no alive supervisor");
    }

    // taskId -> componentId mapping, fetched from ZooKeeper
    // (so who writes this info in the first place?)
    Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
    ret.setTaskToComponent(taskToComponent);

    // get task ids under /ZK/tasks/topologyId via the map's key set
    Set<Integer> allTaskIds = taskToComponent.keySet();
    if (allTaskIds == null || allTaskIds.size() == 0) {
        String errMsg = "Failed to get all task ID list from /ZK-dir/tasks/" + topologyId;
        LOG.warn(errMsg);
        throw new IOException(errMsg);
    }
    ret.setAllTaskIds(allTaskIds);

    Set<Integer> aliveTasks = new HashSet<Integer>();
    // unstoppedTasks are tasks that are still alive on machines whose
    // supervisor is dead
    Set<Integer> unstoppedTasks = new HashSet<Integer>();
    Set<Integer> deadTasks = new HashSet<Integer>();
    Set<ResourceWorkerSlot> unstoppedWorkers = new HashSet<ResourceWorkerSlot>();

    // read the existing assignment from the zk/assignments directory
    Assignment existingAssignment = stormClusterState.assignment_info(topologyId, null);
    // if an old assignment exists
    if (existingAssignment != null) {
        aliveTasks = getAliveTasks(topologyId, allTaskIds);   // based on heartbeat status

        /*
         * Check if the topology master task is alive first since all task
         * heartbeat info is reported by topology master.
         * If master is dead, do reassignment for topology master first.
         */
        // If the topology master task is dead, it must be restarted first,
        // otherwise no heartbeat info can be reported at all
        if (aliveTasks.contains(topoMasterId) == false) {
            ResourceWorkerSlot worker = existingAssignment.getWorkerByTaskId(topoMasterId);
            deadTasks.addAll(worker.getTasks());

            // 1. remove the dead tasks from the set of all tasks; 2. add the
            // remainder to the alive set; 3. drop any dead task from the alive set
            Set<Integer> tempSet = new HashSet<Integer>(allTaskIds);
            tempSet.removeAll(deadTasks);
            aliveTasks.addAll(tempSet);
            aliveTasks.removeAll(deadTasks);
        } else {  // if the master is alive, the dead tasks are simply all tasks minus the alive ones
            deadTasks.addAll(allTaskIds);
            deadTasks.removeAll(aliveTasks);
        }
        // collect the alive tasks that sit on dead supervisors
        unstoppedTasks = getUnstoppedSlots(aliveTasks, supInfos, existingAssignment);
    }

    ret.setDeadTaskIds(deadTasks);
    ret.setUnstoppedTaskIds(unstoppedTasks);

    // Step 2: get all slots resource, free slots / alive slots / unstopped slots
    getFreeSlots(supInfos, stormClusterState);
    ret.setCluster(supInfos);

    if (existingAssignment == null) {
        ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_NEW);

        try {
            AssignmentBak lastAssignment = stormClusterState.assignment_bak(event.getTopologyName());
            if (lastAssignment != null) {
                ret.setOldAssignment(lastAssignment.getAssignment());
            }
        } catch (Exception e) {
            LOG.warn("Fail to get old assignment", e);
        }
    } else {
        ret.setOldAssignment(existingAssignment);
        if (event.isScratch()) {
            ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_REBALANCE);
            ret.setIsReassign(event.isReassign());
            unstoppedWorkers = getUnstoppedWorkers(unstoppedTasks, existingAssignment);
            ret.setUnstoppedWorkers(unstoppedWorkers);
        } else {
            ret.setAssignType(TopologyAssignContext.ASSIGN_TYPE_MONITOR);
            unstoppedWorkers = getUnstoppedWorkers(aliveTasks, existingAssignment);
            ret.setUnstoppedWorkers(unstoppedWorkers);
        }
    }
    return ret;
}


The code is fairly long, but it does just one thing: create a TopologyAssignContext, the context object for one topology assignment, and populate it.
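
Reconstructed from the setters called above, the context roughly carries the following fields (names inferred from the setter names; this is not the verbatim class definition):

// Shape of TopologyAssignContext as populated by prepareTopologyAssign
// (inferred from the setters; the actual class may differ):
String topologyId;
int topologyMasterTaskId;
StormTopology rawTopology;                 // deserialized stormcode.ser
Map stormConf;                             // nimbus conf merged with topology conf
Map<Integer, String> taskToComponent;      // taskId -> componentId
Set<Integer> allTaskIds;                   // key set of taskToComponent
Set<Integer> deadTaskIds;                  // tasks with no live heartbeat
Set<Integer> unstoppedTaskIds;             // alive tasks on dead supervisors
Map<String, SupervisorInfo> cluster;       // alive supervisors with available ports
Assignment oldAssignment;                  // existing or backup assignment
int assignType;                            // ASSIGN_TYPE_NEW / _REBALANCE / _MONITOR
Set<ResourceWorkerSlot> unstoppedWorkers;  // workers hosting unstopped tasks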

Assigning workers and tasks

By default, the DefaultTopologyScheduler does the scheduling. The concrete implementation is in its assignTasks function, whose core is just three lines:

List<ResourceWorkerSlot> availableWorkers = WorkerScheduler.getInstance().getAvailableWorkers(defaultContext, needAssignTasks, allocWorkerNum);
TaskScheduler taskScheduler = new TaskScheduler(defaultContext, needAssignTasks, availableWorkers);
Set<ResourceWorkerSlot> assignment = new HashSet<ResourceWorkerSlot>(taskScheduler.assign());


Worker assignment is fairly simple: spread workers across the machines as evenly as possible. The process:

1. From the total number of workers requested and the number of machines, compute how many workers each machine should run on average.

2. For every machine below the average, assign workers to it until it reaches the average.

3. Assign any remaining workers one by one to the machines that still have free slots.

For details, see the putWorkerToSupervisor implementation; a simplified sketch of the idea follows.
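
A minimal, self-contained illustration of the even-spread steps above (this is not JStorm's putWorkerToSupervisor; every name in it is made up):

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative even-spread allocation: step 2 fills every machine up to the
// average, step 3 hands out the remainder one by one to machines with free slots.
public class EvenWorkerAllocationSketch {
    static Map<String, Integer> allocate(Map<String, Integer> freeSlots, int needed) {
        Map<String, Integer> assigned = new LinkedHashMap<String, Integer>();
        int average = needed / freeSlots.size(); // step 1: per-machine average
        for (Map.Entry<String, Integer> e : freeSlots.entrySet()) { // step 2
            int take = Math.min(average, Math.min(e.getValue(), needed));
            assigned.put(e.getKey(), take);
            needed -= take;
        }
        while (needed > 0) { // step 3: distribute the remainder
            boolean progressed = false;
            for (Map.Entry<String, Integer> e : freeSlots.entrySet()) {
                int used = assigned.get(e.getKey());
                if (needed > 0 && used < e.getValue()) {
                    assigned.put(e.getKey(), used + 1);
                    needed--;
                    progressed = true;
                }
            }
            if (!progressed) break; // not enough free slots in the cluster
        }
        return assigned;
    }

    public static void main(String[] args) {
        Map<String, Integer> slots = new LinkedHashMap<String, Integer>();
        slots.put("sup-a", 4);
        slots.put("sup-b", 4);
        slots.put("sup-c", 4);
        System.out.println(allocate(slots, 7)); // {sup-a=3, sup-b=2, sup-c=2}
    }
}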

With the workers placed, which worker should each task run in?

Tasks are placed according to the following principles, in priority order:

1. Tasks of the same component should, as far as possible, not be placed in the same worker.

2. The number of tasks per worker should be as balanced as possible.

3. Tasks with a direct upstream/downstream data relationship should, as far as possible, be in different workers (or is my understanding here wrong?).

A small scoring sketch of the first two priorities follows the list.
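
Here is how the first two priorities could be scored when picking a worker for a task. This is illustrative only, not JStorm's TaskScheduler; the Worker type and its fields are made up:

import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Pick a worker for a task of `componentId`: prefer workers that do not yet
// host this component (principle 1), then the least-loaded worker (principle 2).
public class TaskPlacementSketch {
    static class Worker {
        Set<String> components = new HashSet<String>(); // components already placed here
        int taskCount;                                  // tasks already placed here
    }

    static Worker pick(List<Worker> workers, String componentId) {
        return workers.stream()
                .min(Comparator
                        // false sorts before true, so workers without this
                        // component come first
                        .comparing((Worker w) -> w.components.contains(componentId))
                        .thenComparingInt(w -> w.taskCount))
                .orElse(null);
    }

    public static void main(String[] args) {
        Worker w1 = new Worker(); w1.components.add("spout"); w1.taskCount = 2;
        Worker w2 = new Worker(); w2.taskCount = 3;
        // w2 wins: it does not yet host "spout", despite having more tasks
        System.out.println(pick(java.util.Arrays.asList(w1, w2), "spout") == w2);
    }
}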

Writing assignment info

Finally, all this assignment-related information is assembled and written to ZooKeeper:

// map each node (supervisor id) to its hostname
Map<String, String> nodeHost = getTopologyNodeHost(context.getCluster(), context.getOldAssignment(), assignments);
// get the start time of each task
Map<Integer, Integer> startTimes = getTaskStartTimes(context, nimbusData, topologyId, context.getOldAssignment(), assignments);
// get the path of the topology code
String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId);
// create the assignment object
assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);

// the topology binary changed
if (event.isScaleTopology()) {
    assignment.setAssignmentType(Assignment.AssignmentType.ScaleTopology);
}
StormClusterState stormClusterState = nimbusData.getStormClusterState();
// write the assignment info to ZK
stormClusterState.set_assignment(topologyId, assignment);
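
Once written, supervisors pick up the new assignment by syncing this ZooKeeper node. Reading it back uses the same APIs already seen earlier in this article (assignment_info and getWorkerByTaskId both appear in the code quoted above):

// Read the assignment back from ZK, e.g. to find the topology master's slot:
Assignment current = stormClusterState.assignment_info(topologyId, null);
ResourceWorkerSlot masterSlot = current.getWorkerByTaskId(topoMasterId);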