您的位置:首页 > 其它

分布式消息队列RocketMQ源码分析之2 -- Broker与NameServer心跳机制

2017-01-12 18:34 525 查看
我们知道,Kafka是通过ZK的临时节点来监测Broker的死亡的。当一个Broker挂了之后,ZK上面对应的临时节点被删除,同时其他Broker收到通知。

那么在RocketMQ中,对应的NameServer是如何判断一个Broker的死亡呢?

NameSrv监测Broker的死亡

机制之一:监测连接断掉

当Broker和NameSrv之间的长连接断掉之后,下面的ChannelEventListener里面的函数就会被回调,从而触发NameServer的路由信息更新。

这里的ChannelEventListener是RocketMQ封装Netty向外暴露的一个接口层。

//NameSrv里面的BrokerHouseKeepingService
public class BrokerHousekeepingService implements ChannelEventListener {
。。。

@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}

@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}


机制之二:心跳

首先,每个Broker会每隔30s向NameSrv更新自身topic信息

//BrokerController.start()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false);
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);


NameServer收到RegisterBroker信息,更新自己的brokerLiveTable结构

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;


然后NameServer会每10s,扫描一次这个结构。如果发现上次更新时间距离当前时间超过了BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2(2分钟),则认为此broker死亡。

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);


public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {  //上1次更新时间距离当前时间小于2分钟
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}


Producer/Consumer如何得知Broker死亡

当某个Broker死亡之后,NameSrv并不会主动通知Producer和Consumer。

而是Producer/Consumer周期性的去NameSrv取。

//MQClientInstance的startScheduledTask()函数代码

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);


这里的pollNameServerInteval默认是30s。这也就是意味着,默认情况下,当某个Broker挂了之后,Client需要30s的延迟才会得知此消息。

private int pollNameServerInteval = 1000 * 30;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息