拨开kafka 的羊毛衫
2016-07-05 11:22
274 查看
好的, 上篇把 kafka.kafka 干的事情解析了一遍, 什么都看不出来, 是的, 什么都看不出来他干了什么。那么这章来电干货。
在kafka中, 主要资源的协调,开始运行时在
class KafkaServer(val config: KafkaConfig) extends Logging
这个类中进行的。
在初始化这个类的时候,他做了一件事情。
咱们来看看 KafkaScheduler的实现
看到结果了吧, 其实就是 ScheduledThreadPoolExecutor, kafka 初始化了一个单线程的 ScheduledThreadPoolExecutor 而且名字叫做 “kafka-logcleaner-”
初始化完成了, 咱们看看 startup 方法里有些什么猫腻。
首先是 shutdown 的判断吧之类的操作, 很巧妙,使用了一个文件来表示运行状态。
他new 了一个叫做, logManager 的东西,是的,了解kafka 的人都知道,kafka 是全磁盘操作,message全放磁盘上,此类用于磁盘io的操作。相当关键,咱们看一下。
首先, 他获取了topic们, 获取一些系统属性,把topic 放到 名叫 logs 的一个 pool 中, new Log 的作用是 加载 目录topic 中的log 信息到内存中。
在log 对象之中,用LogSegment 抽象了 log 的分段,因为 topic 是有 partition 的。
这个 segments 搜有的加在一起就是一个完整的 topic。
然后是按照 logsegment 的start 排个序,做个验证,完事。
接着, 把 各个topic 信息放到内存中之后,开始用
来定时 按照 config.logCleanupIntervalMinutes 配置的分钟做一些事情。做什么事情呢, 清空一下旧的log,按照两种标准清空, 一个是超过一定时间的log,还有一个是超过大小的log。
下面到了跟zk交互的阶段
跟zk交互的过程包括创建以下path:
* /topics/[topic]/[node_id-partition_num]
* /brokers/[0...N] --> host:port
订阅事件
总结一下,磁盘部分在 broker 初始化的时候,加载topic 信息到内存, 定期清理以下log, 跟zk做一些注册,订阅事件。
下回咱们看下,初始化的时候, 是网络连接用的什么神奇的东西。
在kafka中, 主要资源的协调,开始运行时在
class KafkaServer(val config: KafkaConfig) extends Logging
这个类中进行的。
在初始化这个类的时候,他做了一件事情。
val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
咱们来看看 KafkaScheduler的实现
private val executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() { def newThread(runnable: Runnable): Thread = { val t = new Thread(runnable, baseThreadName + threadId.getAndIncrement) t.setDaemon(isDaemon) t } }) executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
看到结果了吧, 其实就是 ScheduledThreadPoolExecutor, kafka 初始化了一个单线程的 ScheduledThreadPoolExecutor 而且名字叫做 “kafka-logcleaner-”
初始化完成了, 咱们看看 startup 方法里有些什么猫腻。
isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) var needRecovery = true val cleanShutDownFile = new File(new File(config.logDir), CLEAN_SHUTDOWN_FILE) if (cleanShutDownFile.exists) { needRecovery = false cleanShutDownFile.delete }
首先是 shutdown 的判断吧之类的操作, 很巧妙,使用了一个文件来表示运行状态。
logManager = new LogManager(config, scheduler, SystemTime, 1000L * 60 * 60 * config.logRollHours, 1000L * 60 * config.logCleanupIntervalMinutes, 1000L * 60 * 60 * config.logRetentionHours, needRecovery)
他new 了一个叫做, logManager 的东西,是的,了解kafka 的人都知道,kafka 是全磁盘操作,message全放磁盘上,此类用于磁盘io的操作。相当关键,咱们看一下。
for(dir <- subDirs) { if(!dir.isDirectory()) { warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?") } else { info("Loading log '" + dir.getName() + "'") val topic = Utils.getTopicPartition(dir.getName)._1 val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs) val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize) val log = new Log(dir, time, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery) val topicPartion = Utils.getTopicPartition(dir.getName) logs.putIfNotExists(topicPartion._1, new Pool[Int, Log]()) val parts = logs.get(topicPartion._1) parts.put(topicPartion._2, log) } }
首先, 他获取了topic们, 获取一些系统属性,把topic 放到 名叫 logs 的一个 pool 中, new Log 的作用是 加载 目录topic 中的log 信息到内存中。
在log 对象之中,用LogSegment 抽象了 log 的分段,因为 topic 是有 partition 的。
/* The actual segments of the log */ private[log] val segments: SegmentList[LogSegment] = loadSegments()
这个 segments 搜有的加在一起就是一个完整的 topic。
然后是按照 logsegment 的start 排个序,做个验证,完事。
接着, 把 各个topic 信息放到内存中之后,开始用
if(scheduler != null) { info("starting log cleaner every " + logCleanupIntervalMs + " ms") scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs) }
来定时 按照 config.logCleanupIntervalMinutes 配置的分钟做一些事情。做什么事情呢, 清空一下旧的log,按照两种标准清空, 一个是超过一定时间的log,还有一个是超过大小的log。
下面到了跟zk交互的阶段
if(config.enableZookeeper) { kafkaZookeeper = new KafkaZooKeeper(config, this) kafkaZookeeper.startup zkActor = new Actor { def act() { loop { receive { case topic: String => try { kafkaZookeeper.registerTopicInZk(topic) } catch { case e => error(e) // log it and let it go } case StopActor => info("zkActor stopped") exit } } } } zkActor.start }
跟zk交互的过程包括创建以下path:
* /topics/[topic]/[node_id-partition_num]
* /brokers/[0...N] --> host:port
订阅事件
总结一下,磁盘部分在 broker 初始化的时候,加载topic 信息到内存, 定期清理以下log, 跟zk做一些注册,订阅事件。
下回咱们看下,初始化的时候, 是网络连接用的什么神奇的东西。
相关文章推荐
- ProgressDialog使用
- shell 判断 404
- AES对称加密
- 101. Symmetric Tree
- Route Filters
- Routes
- Android中的布局和控件的隐藏和触发显示
- java 内存模型与线程 正文
- AndroidStudio如何打包生成realease版本的arr包,并上传到Nexus搭建的maven仓库,供项目远程依赖(二)
- [转载]DynaActionForm (动态ActionForm)
- Utuntu安装Bugzilla
- 主键约束,唯一约束与默认约束
- JAVA学习笔记two:关键字
- java 内存模型与线程 前传
- 大于、等于、小于等符号转换
- 使用tmodjs预编译模板实例
- C语言的常量
- Redis 3.0配置
- awk 中简单的去重方法
- The difference between '?attr' between '?android:attr'