Source code analysis of Kafka's consumerConnector.createMessageStreams method
2017-09-27 23:16
The entry point is creating the consumer connector:

val consumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(
    topics, keyDecoder, valueDecoder)

Here createMessageStreams resolves to the implementation in the subclass ZookeeperConsumerConnector:
def createMessageStreams[K, V](topicCountMap: Map[String, Int],
                               keyDecoder: Decoder[K],
                               valueDecoder: Decoder[V]): Map[String, List[KafkaStream[K, V]]] = {
  if (messageStreamCreated.getAndSet(true))
    throw new MessageStreamsExistException(this.getClass.getSimpleName +
      " can create message streams at most once", null)
  consume(topicCountMap, keyDecoder, valueDecoder)
}
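The at-most-once guard relies on AtomicBoolean.getAndSet, which atomically stores true and returns the previous value, so exactly one caller, even under concurrent invocation, observes false and proceeds. A minimal Java sketch of the same pattern (class and method names here are illustrative, not Kafka's):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for the guard in createMessageStreams; the names
// StreamFactory/createMessageStreams are ours, not from the Kafka source.
class StreamFactory {
    private final AtomicBoolean messageStreamCreated = new AtomicBoolean(false);

    // getAndSet(true) atomically sets the flag and returns the old value,
    // so only the first caller sees false; every later call throws.
    String createMessageStreams() {
        if (messageStreamCreated.getAndSet(true)) {
            throw new IllegalStateException("can create message streams at most once");
        }
        return "streams";
    }
}

public class OnceOnlyDemo {
    public static void main(String[] args) {
        StreamFactory f = new StreamFactory();
        System.out.println(f.createMessageStreams()); // first call succeeds
        try {
            f.createMessageStreams();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage()); // second call fails
        }
    }
}
```

This is why calling createMessageStreams twice on the same connector throws MessageStreamsExistException rather than silently returning new streams.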
The consume method:
def consume[K, V](topicCountMap: scala.collection.Map[String, Int],
                  keyDecoder: Decoder[K],
                  valueDecoder: Decoder[V]): Map[String, List[KafkaStream[K, V]]] = {
  debug("entering consume ")
  if (topicCountMap == null)
    throw new RuntimeException("topicCountMap is null")
  // constructTopicCount is analyzed below
  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
  val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
  // make a list of (queue, stream) pairs, one pair for each threadId
  val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
    threadIdSet.map(_ => {
      // one queue per thread; a KafkaStream is built on top of the blocking queue
      val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
      val stream = new KafkaStream[K, V](
        queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
      (queue, stream)
    })
  ).flatten.toList
  // ((queue1,stream1),(queue2,stream2)), ((queue3,stream3),(queue4,stream4))
  // is flattened into:
  // ((queue1,stream1),(queue2,stream2),(queue3,stream3),(queue4,stream4))
  val dirs = new ZKGroupDirs(config.groupId)
  registerConsumerInZK(dirs, consumerIdString, topicCount)
  reinitializeConsumer(topicCount, queuesAndStreams)
  loadBalancerListener.kafkaMessageAndMetadataStreams
    .asInstanceOf[Map[String, List[KafkaStream[K, V]]]]
}
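The pairing step can be modeled without any Kafka classes: each consumer thread id gets one bounded blocking queue backing one stream, and the per-topic collections are flattened into a single list. A hedged Java sketch, where strings stand in for FetchedDataChunk and KafkaStream and all names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of consume's queue/stream pairing; QueueStreamPairing and
// Pair are our names, not Kafka's.
public class QueueStreamPairing {

    record Pair(LinkedBlockingQueue<String> queue, String stream) {}

    static List<Pair> build(Map<String, Set<String>> topicThreadIds, int queuedMaxMessages) {
        List<Pair> queuesAndStreams = new ArrayList<>();
        for (Set<String> threadIdSet : topicThreadIds.values()) {
            for (String threadId : threadIdSet) {
                // each thread gets its own bounded blocking queue; in Kafka a
                // KafkaStream iterates over exactly this queue
                LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(queuedMaxMessages);
                queuesAndStreams.add(new Pair(queue, "stream-for-" + threadId));
            }
        }
        return queuesAndStreams; // flat: one entry per (topic, thread) combination
    }

    public static void main(String[] args) {
        List<Pair> pairs = build(
            Map.of("orders", Set.of("c1-0", "c1-1"), "logs", Set.of("c1-0")), 1000);
        System.out.println(pairs.size()); // 3
    }
}
```

The bounded queue (queuedMaxMessages) is what gives each stream back-pressure: fetcher threads block on put when a consumer thread falls behind.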
getConsumerThreadIdsPerTopic actually delegates to TopicCount's makeConsumerThreadIdsPerTopic method:
def makeConsumerThreadIdsPerTopic(consumerIdString: String,
                                  topicCountMap: Map[String, Int]) = {
  val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]()
  for ((topic, nConsumers) <- topicCountMap) { // each topic and its configured thread count
    val consumerSet = new mutable.HashSet[ConsumerThreadId]
    assert(nConsumers >= 1)
    for (i <- 0 until nConsumers) // create one thread id per thread
      consumerSet += ConsumerThreadId(consumerIdString, i)
    consumerThreadIdsPerTopicMap.put(topic, consumerSet)
  }
  consumerThreadIdsPerTopicMap
}
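The same id-generation logic can be sketched in Java; plain strings of the form "consumerIdString-i" replace the ConsumerThreadId case class, and the class and method names are illustrative:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal model of makeConsumerThreadIdsPerTopic: for each topic, generate
// nConsumers thread ids indexed 0 until nConsumers. Names here are ours.
public class ThreadIdDemo {

    static Map<String, Set<String>> makeConsumerThreadIdsPerTopic(
            String consumerIdString, Map<String, Integer> topicCountMap) {
        Map<String, Set<String>> result = new HashMap<>();
        for (Map.Entry<String, Integer> e : topicCountMap.entrySet()) {
            int nConsumers = e.getValue();
            if (nConsumers < 1) throw new IllegalArgumentException("need >= 1 thread per topic");
            Set<String> ids = new HashSet<>();
            for (int i = 0; i < nConsumers; i++) {
                ids.add(consumerIdString + "-" + i); // mirrors ConsumerThreadId(consumerIdString, i)
            }
            result.put(e.getKey(), ids);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> ids =
            makeConsumerThreadIdsPerTopic("group1_host-1234", Map.of("orders", 2, "logs", 1));
        System.out.println(ids.get("orders").size()); // 2
        System.out.println(ids.get("logs").size());   // 1
    }
}
```

These per-topic thread ids are exactly what consume later zips with the (queue, stream) pairs, which is why the thread count per topic must match the number of streams you plan to consume.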