Kafka详解六:Kafka如何通过源码实现监控
2017-07-13 09:15
1571 查看
问题导读:
1.kafka的消费者组的消费偏移存储,kafka支持两个版本?
2.ConsumerOffsetChecker类的作用是什么?
3.Kafka如何通过源码实现监控?
[align=left]一,基本思路介绍[/align]
[align=left]Kafka作为一个好用的且应用很广泛的消息队列,在大数据处理系统中基本是必不可少的。当然,作为缓存消息的消息队列,我们对其进行流量监控及消费滞后告警就显得异常重要了。[/align]
[align=left]读过前面的文章,<Kafka源码系列之源码解析SimpleConsumer的消费过程>和<Kafka源码系列之Consumer高级API性能分析>这两篇文章的兄弟姐妹应该看本篇文章会很简单。实际就是利用SimpleConsumer获取Partition最新的offset,用Zookeeper的工具获取消费者组的各个分区的消费偏移,两者做差就是lagSize。[/align]
[align=left]但是实际kafka的消费者组的消费偏移存储,kafka支持两个版本的:[/align]
[align=left]1,基于Zookeeper。OffsetFetchRequest.CurrentVersion为0。[/align]
[align=left]2,基于kafka自身。OffsetFetchRequest.CurrentVersion为1(默认)。[/align]
[align=left]那么要实现一个消费者消费滞后预警,就要兼容两种方式,那么我们就详细的来介绍这两种方式的实现。[/align]
[align=left] [/align]
[align=left]二,重要工具类[/align]
[align=left]1,ConsumerOffsetChecker[/align]
[align=left]Kafka提供的检查消费者消费偏移,LogEndSize和lagsize的工具。我们实现自己的监控均可以模仿该类实现。本文也仅限于基于该类将实现过程。[/align]
[align=left]2,ZkUtils[/align]
[align=left]Kafka提供的操作Zookeeper的工具类。[/align]
[align=left]3,SimpleConsumer[/align]
[align=left]Kafka消费者实现类。Kafka的副本同步,低级消费者,高级消费者都是基于该类实现从kafka消费消息的。[/align]
[align=left]4,OffsetRequest[/align]
[align=left]消费者去获取分区数据偏移的请求类,对应的请求key是:RequestKeys.OffsetsKey。在kafka的服务端kafkaApis的处理函数是:handleOffsetRequest(request)[/align]
[align=left]5,OffsetFetchRequest[/align]
[align=left]这个是请求某个topic的某个消费组的消费偏移,对应的请求key:RequestKeys.OffsetFetchKey。在kafka的服务端kafkaApis的处理函数是:handleOffsetFetchRequest(request)[/align]
[align=left]6,OffsetManager[/align]
[align=left]偏移管理器。内部维护了一个Scheduler,会定时执行compact,进行偏移的合并。[/align]
[align=left] [/align]
[align=left]三,源代码实现[/align]
[align=left]1,首先是获得消费者的消费偏移[/align]
[align=left]ConsumerOffsetChecker当main方法中首先是获得topic列表[/align]
[Bash shell] 纯文本查看 复制代码
接着是建立到Broker链接,然后从kafka获取消费者偏移
[Bash shell] 纯文本查看 复制代码
假如,获得的偏移信息为空,那么就从Zookeeper获取消费者偏移。
解决获取topic的分区的最大偏移,实际思路是构建simpleConsumer,然后由其 去请求偏移,再跟获取的消费者偏移做差就得到消费者最大偏移。
[Bash shell] 纯文本查看 复制代码
在processPartition中
[Bash shell] 纯文本查看 复制代码
然后做差得到LagSize
[Bash shell] 纯文本查看 复制代码
getConsumer方法中
[Bash shell] 纯文本查看 复制代码
四,总结
该工具类的使用
[Bash shell] 纯文本查看 复制代码
输出结果
Offset是消费者消费到的偏移,logsize是kafka数据的最大偏移,Lag是二者的差。也即
LagSize = LogSize - Offset
得到我们消费组的滞后情况后,我们就可以根据需求(比如,设定滞后多少消息后给出告警),给出相应的告警。
转自:http://www.aboutyun.com/forum.php?mod=viewthread&tid=22215&extra=page%3D1&page=1&
1.kafka的消费者组的消费偏移存储,kafka支持两个版本?
2.ConsumerOffsetChecker类的作用是什么?
3.Kafka如何通过源码实现监控?
[align=left]一,基本思路介绍[/align]
[align=left]Kafka作为一个好用的且应用很广泛的消息队列,在大数据处理系统中基本是必不可少的。当然,作为缓存消息的消息队列,我们对其进行流量监控及消费滞后告警就显得异常重要了。[/align]
[align=left]读过前面的文章,<Kafka源码系列之源码解析SimpleConsumer的消费过程>和<Kafka源码系列之Consumer高级API性能分析>这两篇文章的兄弟姐妹应该看本篇文章会很简单。实际就是利用SimpleConsumer获取Partition最新的offset,用Zookeeper的工具获取消费者组的各个分区的消费偏移,两者做差就是lagSize。[/align]
[align=left]但是实际kafka的消费者组的消费偏移存储,kafka支持两个版本的:[/align]
[align=left]1,基于Zookeeper。OffsetFetchRequest.CurrentVersion为0。[/align]
[align=left]2,基于kafka自身。OffsetFetchRequest.CurrentVersion为1(默认)。[/align]
[align=left]那么要实现一个消费者消费滞后预警,就要兼容两种方式,那么我们就详细的来介绍这两种方式的实现。[/align]
[align=left] [/align]
[align=left]二,重要工具类[/align]
[align=left]1,ConsumerOffsetChecker[/align]
[align=left]Kafka提供的检查消费者消费偏移,LogEndSize和lagsize的工具。我们实现自己的监控均可以模仿该类实现。本文也仅限于基于该类将实现过程。[/align]
[align=left]2,ZkUtils[/align]
[align=left]Kafka提供的操作Zookeeper的工具类。[/align]
[align=left]3,SimpleConsumer[/align]
[align=left]Kafka消费者实现类。Kafka的副本同步,低级消费者,高级消费者都是基于该类实现从kafka消费消息的。[/align]
[align=left]4,OffsetRequest[/align]
[align=left]消费者去获取分区数据偏移的请求类,对应的请求key是:RequestKeys.OffsetsKey。在kafka的服务端kafkaApis的处理函数是:handleOffsetRequest(request)[/align]
[align=left]5,OffsetFetchRequest[/align]
[align=left]这个是请求某个topic的某个消费组的消费偏移,对应的请求key:RequestKeys.OffsetFetchKey。在kafka的服务端kafkaApis的处理函数是:handleOffsetFetchRequest(request)[/align]
[align=left]6,OffsetManager[/align]
[align=left]偏移管理器。内部维护了一个Scheduler,会定时执行compact,进行偏移的合并。[/align]
[align=left] [/align]
[align=left]三,源代码实现[/align]
[align=left]1,首先是获得消费者的消费偏移[/align]
[align=left]ConsumerOffsetChecker当main方法中首先是获得topic列表[/align]
[Bash shell] 纯文本查看 复制代码
[Bash shell] 纯文本查看 复制代码
解决获取topic的分区的最大偏移,实际思路是构建simpleConsumer,然后由其 去请求偏移,再跟获取的消费者偏移做差就得到消费者最大偏移。
[Bash shell] 纯文本查看 复制代码
[Bash shell] 纯文本查看 复制代码
[Bash shell] 纯文本查看 复制代码
[Bash shell] 纯文本查看 复制代码
该工具类的使用
[Bash shell] 纯文本查看 复制代码
Offset是消费者消费到的偏移,logsize是kafka数据的最大偏移,Lag是二者的差。也即
LagSize = LogSize - Offset
得到我们消费组的滞后情况后,我们就可以根据需求(比如,设定滞后多少消息后给出告警),给出相应的告警。
转自:http://www.aboutyun.com/forum.php?mod=viewthread&tid=22215&extra=page%3D1&page=1&
相关文章推荐
- 酷炫开源项目cardsui-for-android-超详细源码分析,详解所用特效是如何实现的
- 如何实现通过汉字的拼音或首拼快速检索(含部分源码)
- Git学习-->如何通过Shell脚本实现 监控Gitlab备份整个过程并且通过邮件通知得到备份结果?
- 通过tomcat源码查看其如何实现应用相互隔离
- 详解通过源码解析Node.js中cluster模块的主要功能实现
- 一文详解如何用 TensorFlow 实现基于 LSTM 的文本分类(附源码)
- 如何实现通过汉字的拼音或首拼快速检索(含部分源码)
- 如何通过非数字与字符的方式实现PHP WebShell详解
- 如何使用海康SDK实现异步登录网络摄像机(IPC)【源码】【监控】【录播】【NVR】
- 如何使用海康SDK实现网络摄像机(IPC)自动配置【源码】【监控】【录播】【NVR】
- c++ 如何监控本机共享文件夹变化(通过监控注册表来实现)
- 如何利用UDP组播实现海康网络摄像机(IPC)的自动探测【源码】【监控】【录播】【NVR】【ONVIF】
- sparkstreaming中通过kafka sample api实现directstream源码分析
- [原创]java WEB学习笔记55:Struts2学习之路---详解struts2 中 Action,如何访问web 资源,解耦方式(使用 ActionContext,实现 XxxAware 接口),耦合方式(通过ServletActionContext,通过实现 ServletRequestAware, ServletContextAware 等接口的方式)
- 如何实现通过汉字的拼音或首拼快速检索(含部分源码)
- 如何通过cl_dd_document来实现在ALV中输出标题头
- 详解如何实现最基本的AJAX框架
- 如何通过DataRelation关联两个DataGrid,实现主从表。
- 如何通过按钮实现保存页面?
- 如何实现文本框焦点自动跳转及通过回车键提交表单[引用]