您的位置:首页 > 其它

kafka源码解析之十三KafkaHealthcheck

2016-04-11 13:41 330 查看
主要是在zk的/brokers/[0...N] 路径上建立该Broker的信息,并且该节点是ZK中的Ephemeral Node,当此Broker离线的时候,zk上对应的节点也就消失了,那么其它Broker可以及时发现该Broker的异常。
class KafkaHealthcheck(private val brokerId: Int,
private val advertisedHost: String,
private val advertisedPort: Int,
private val zkSessionTimeoutMs: Int,
private val zkClient: ZkClient) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
val sessionExpireListener = new SessionExpireListener
def startup() {
zkClient.subscribeStateChanges(sessionExpireListener)
register()
}
def shutdown() {
zkClient.unsubscribeStateChanges(sessionExpireListener)
ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
}
def register() {
val advertisedHostName =
if(advertisedHost == null || advertisedHost.trim.isEmpty)
InetAddress.getLocalHost.getCanonicalHostName
else
advertisedHost
val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
//在/brokers/ids/路径下存储broker的基本消息,例如端口号,ip地址,时间戳等,以上内容均在Ephemeral Node上,只要该broker和zk失去链接,则zk对应目录的内容被清空
ZkUtils.registerBrokerInZk(zkClient, brokerId, advertisedHostName, advertisedPort, zkSessionTimeoutMs, jmxPort)
}
//该SessionExpireListener的作用就是重建broker的节点,防止短暂的和zk失去链接之后,该broker对应的节点也全部丢失了
class SessionExpireListener() extends IZkStateListener {
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
// do nothing, since zkclient will do reconnect for us.
}
def handleNewSession() {
info("re-registering broker info in ZK for broker " + brokerId)
register()
info("done re-registering broker")
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
}
}
}

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: