您的位置:首页 > 其它

kafka的consumerConnector.createMessageStreams 方法源码分析

2017-09-27 23:16 731 查看
Consumer.create(consumerConfig)

val topicMessageStreams = consumerConnector.createMessageStreams(

topics, keyDecoder, valueDecoder)

这里的createMessageStreams调用的是子类ZookeeperConsumerConnector 的实现:

def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
: Map[String, List[KafkaStream[K,V]]] = {
if (messageStreamCreated.getAndSet(true))
throw new MessageStreamsExistException(this.getClass.getSimpleName +
" can create message streams at most once",null)
**consume**(topicCountMap, keyDecoder, valueDecoder)
}


consume方法:

def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
: Map[String,List[KafkaStream[K,V]]] = {
debug("entering consume ")
if (topicCountMap == null)
throw new RuntimeException("topicCountMap is null")

//这个方法下边分析
val topicCount = TopicCount.**constructTopicCount**(consumerIdString, topicCountMap)

val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

// make a list of (queue,stream) pairs, one pair for each threadId
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
threadIdSet.map(_ => {//每个线程一个队列,用阻塞队列构造一个KafkaStream
val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
val stream = new KafkaStream[K,V](
queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
(queue, stream)
})
).flatten.toList //本来是((queue1,stream1),(queue2,stream1)),((queue3,stream3),(queue4,stream4)),压平成:
//((queue1,stream1),(queue2,stream1),(queue3,stream3),(queue4,stream4))

val dirs = new ZKGroupDirs(config.groupId)
registerConsumerInZK(dirs, consumerIdString, topicCount)
reinitializeConsumer(topicCount, queuesAndStreams)

loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}


getConsumerThreadIdsPerTopic方法,实际调用的是TopicCount的makeConsumerThreadIdsPerTopic方法:

def makeConsumerThreadIdsPerTopic(consumerIdString: String,
topicCountMap: Map[String,  Int]) = {
val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]()
for ((topic, nConsumers) <- topicCountMap) {//循环每个topic和给这个topic配置的线程数
val consumerSet = new mutable.HashSet[ConsumerThreadId]
assert(nConsumers >= 1)
for (i <- 0 until nConsumers) //为每个线程创建一个线程id
consumerSet += ConsumerThreadId(consumerIdString, i)
consumerThreadIdsPerTopicMap.put(topic, consumerSet)
}
consumerThreadIdsPerTopicMap
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: