第37课:Kafka源码解读Consumer内幕解密
2016-07-03 17:04
525 查看
第37课:Kafka源码解读Consumer内幕解密
contributor:spark 2.0 bug 修复
Consumer :while 循环,线程向broker list主动抓数据,线程不断的看有没有数据。不断向leader询问:有数据吗? 一个线程消费一个partition的数据
设想场景:
broker(3)- topic (1)- partition(10)
获取数据,被zookeeper管理,向zookeeper请求关注的topic及partition,zookeeper根据leader
、follower的信息,将关注的topic及partition给你,每一个partition,产生一个线程取抓取,
然后不断循环,每一个partition可能会变,10个partition就有10个线程,抓取数据以后,放入
Consumer 的一个阻塞队列,10个线程不断抓到数据 一直poll,Consumer 用一个线程从阻塞队列
中拿数据,一个record,2个record,。。。。抓数据是并行的;而拿数据需要一个一个线程去拿;
contributor:spark 2.0 bug 修复
Consumer :while 循环,线程向broker list主动抓数据,线程不断的看有没有数据。不断向leader询问:有数据吗? 一个线程消费一个partition的数据
设想场景:
broker(3)- topic (1)- partition(10)
获取数据,被zookeeper管理,向zookeeper请求关注的topic及partition,zookeeper根据leader
、follower的信息,将关注的topic及partition给你,每一个partition,产生一个线程取抓取,
然后不断循环,每一个partition可能会变,10个partition就有10个线程,抓取数据以后,放入
Consumer 的一个阻塞队列,10个线程不断抓到数据 一直poll,Consumer 用一个线程从阻塞队列
中拿数据,一个record,2个record,。。。。抓数据是并行的;而拿数据需要一个一个线程去拿;
private[kafka] object ZookeeperConsumerConnector { val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L) }
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) : Map[String,List[KafkaStream[K,V]]] = { debug("entering consume ") if (topicCountMap == null) throw new RuntimeException("topicCountMap is null") val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap) val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic // make a list of (queue,stream) pairs, one pair for each threadId val queuesAndStreams = topicThreadIds.values.map(threadIdSet => threadIdSet.map(_ => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) val stream = new KafkaStream[K,V]( queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) (queue, stream) }) ).flatten.toList val dirs = new ZKGroupDirs(config.groupId) registerConsumerInZK(dirs, consumerIdString, topicCount) reinitializeConsumer(topicCount, queuesAndStreams) loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]] }
相关文章推荐
- Google protocol buffer 使用和原理浅析 And 进阶使用方式之PbCodec
- 检测和跟踪的介绍
- centos7关闭防火墙
- 递归
- fixed,background......兼容问题
- Mybatis步步进阶(七)——Mybatis实体关联映射
- 前后端分离的思考与实践(三)
- 安卓实战开发之CardView的selector及GrideView的item按下状态保留selector(state_activated)的实现
- Android开发实用开源工具
- 从Java代码实现角度探讨CSRF(未完待续)
- 下载centos内核源代码
- 【NOI2014】魔法森林
- 读书笔记-java网络编程-3线程-同步
- dfs:Tempter of the Bone剪枝
- Learn sed using these command on Linux(流线式编辑器——sed)
- 【超级宝典(第5版)】第二章:第一个三角形
- 前后端分离的思考与实践(二)
- JAVA学习总结二十二
- Leetcode Max Sum of Rectangle No Larger Than K
- redis源码解析1-简单动态字符串