分布式消息队列RocketMQ源码分析之2 -- Broker与NameServer心跳机制
2017-01-12 18:34
525 查看
我们知道,Kafka是通过ZK的临时节点来监测Broker的死亡的。当一个Broker挂了之后,ZK上面对应的临时节点被删除,同时其他Broker收到通知。
那么在RocketMQ中,对应的NameServer是如何判断一个Broker的死亡呢?
这里的ChannelEventListener是RocketMQ封装Netty向外暴露的一个接口层。
NameServer收到RegisterBroker信息,更新自己的brokerLiveTable结构
然后NameServer会每10s,扫描一次这个结构。如果发现上次更新时间距离当前时间超过了BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2(2分钟),则认为此broker死亡。
而是Producer/Consumer周期性的去NameSrv取。
这里的pollNameServerInteval默认是30s。这也就是意味着,默认情况下,当某个Broker挂了之后,Client需要30s的延迟才会得知此消息。
那么在RocketMQ中,对应的NameServer是如何判断一个Broker的死亡呢?
NameSrv监测Broker的死亡
机制之一:监测连接断掉
当Broker和NameSrv之间的长连接断掉之后,下面的ChannelEventListener里面的函数就会被回调,从而触发NameServer的路由信息更新。这里的ChannelEventListener是RocketMQ封装Netty向外暴露的一个接口层。
//NameSrv里面的BrokerHouseKeepingService public class BrokerHousekeepingService implements ChannelEventListener { 。。。 @Override public void onChannelClose(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } @Override public void onChannelException(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } @Override public void onChannelIdle(String remoteAddr, Channel channel) { this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel); } }
机制之二:心跳
首先,每个Broker会每隔30s向NameSrv更新自身topic信息//BrokerController.start() this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
NameServer收到RegisterBroker信息,更新自己的brokerLiveTable结构
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
然后NameServer会每10s,扫描一次这个结构。如果发现上次更新时间距离当前时间超过了BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2(2分钟),则认为此broker死亡。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS);
public void scanNotActiveBroker() { Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { //上1次更新时间距离当前时间小于2分钟 RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }
Producer/Consumer如何得知Broker死亡
当某个Broker死亡之后,NameSrv并不会主动通知Producer和Consumer。而是Producer/Consumer周期性的去NameSrv取。
//MQClientInstance的startScheduledTask()函数代码 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);
这里的pollNameServerInteval默认是30s。这也就是意味着,默认情况下,当某个Broker挂了之后,Client需要30s的延迟才会得知此消息。
private int pollNameServerInteval = 1000 * 30;
相关文章推荐
- 分布式消息队列RocketMQ源码分析之2 -- Broker与NameServer心跳机制
- hadoop之MapReduce框架JobTracker端心跳机制分析(源码分析第七篇) 推荐
- Apache Kafka源码分析 – Broker Server
- Hadoop源码分析之一(RPC机制之Server)
- 分布式消息队列RocketMQ源码分析之2 -- Broker与NameServer心跳机制
- hadoop之MapReduce框架TaskTracker端心跳机制分析(源码分析第六篇) 推荐
- Ambari-server源码分析:核心类-心跳处理agent.HeartBeatHandler
- HDFS源码分析(五)-----节点注册与心跳机制
- 分布式文件系统KFS源码阅读与分析(三):RPC实现机制(MetaServer端)
- Hadoop源码分析之心跳机制
- 分布式消息队列RocketMQ源码分析之3 -- Consumer负载均衡机制 -- Rebalance
- Hadoop源码分析之心跳机制
- 源码分析RocketMQ消息过滤机制下篇-FilterServer、ClassFilter模式详解
- HBase1.2.3版本memstore flush触发机制以及HRegionServer级别触发源码分析
- Hadoop源码分析之一(RPC机制之Server)
- HDFS源码分析(五)-----节点注册与心跳机制
- Hadoop源码分析之一(RPC机制之Server)
- Hadoop源码分析之心跳机制
- 分布式文件系统KFS源码阅读与分析(三):RPC实现机制(MetaServer端)
- Android IPC 通讯机制源码分析