您的位置:首页 > 其它

Kafka High Level Consumer API in Scala

2015-06-26 14:13 831 查看

本文目的

研究了一下Kafka Produce/Consumer 的API,发现Consumer API的使用并没有那么的straight forward。折腾了2天后,终于摸到了一些门道,这里记录下怎样使用High Level Consumer API来接收并处理一个Topic中的消息。

本文的例子用Scala编写,如果要改成Java的其实很容易。

环境

Kafka为0.8.2.0(CDH版本)

Scala为2.10.4

Kafka中事先创建了一个名为
my-2nd-topic
的Topic,该Topic由2个partition构成,如下:



我们将向该topic中写入一些消息,生成消息的代码如下:

object ProduceKeyedMsg {
def BROKER_LIST = "ecs1:9092,ecs2:9092"
def TOPIC = "my-2nd-topic"

def main(args: Array[String]): Unit = {
println("开始产生消息!")

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

val producer = new KafkaProducer[String, String](props)

for (i <- 0 to 10) {
val ret: Future[RecordMetadata] = producer.send(new ProducerRecord(TOPIC, "key-" + i, "msg-" + i))
val metadata = ret.get  // 打印出 metadata
println("i=" + i + ",  offset=" + metadata.offset() + ",  partition=" + metadata.partition())
}

producer.close
}
}


Consumer API

Consumer的预期行为:开启两个线程,去并行地读取Topic中的消息。(Consumer的两个线程正好对应着Topic的两个partition)

先给出代码,然后再给分析。

package cn.gridx.kafka.apis.scala.consumer

import java.util.Properties

import kafka.consumer.{ConsumerIterator, KafkaStream, ConsumerConfig, Consumer}
import kafka.message.MessageAndMetadata

import scala.collection.Map
import scala.collection.mutable.{ArrayBuffer, HashMap}

import scala.concurrent.{Await, Future, future}
import scala.concurrent.ExecutionContext.Implicits.global

/**
* Created by tao on 6/25/15.
*/
object MultiThreadConsumer {
def ZK_CONN     = "ecs1:2181,ecs2:2181,ecs3:2181"
def GROUP_ID    = "xt-group-1"
def TOPIC       = "my-2nd-topic"

def main(args: Array[String]): Unit = {
println(" 开始了 ")

val connector = Consumer.create(createConfig())

val topicCountMap = new HashMap[String, Int]()
topicCountMap.put(TOPIC, 2) // TOPIC在创建时就指定了它有2个partition

val msgStreams: Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]]
= connector.createMessageStreams(topicCountMap)

println("# of streams is " + msgStreams.get(TOPIC).get.size)

// 变量futureIndex用来输出Future的序号
var futureIndex = 0
for (stream <- msgStreams.get(TOPIC).get) {
processSingleStream(futureIndex, stream)
futureIndex = futureIndex+1
}

// 主线程阻塞30秒
Thread.sleep(30000)
/* 注意,这里虽然主线程退出了,但是已经创建的各个Future任务仍在运行(一直在等待接收消息)
* 怎样在主线程里结束各个Future任务呢?
*/
println(" 结束了 ")

}

/**
* 一个Future处理一个stream
* TODO:  还需要一个可以控制Future结束的机制
* @param futureIndex
* @param stream
* @return
*/
def processSingleStream(futureIndex:Int, stream: KafkaStream[Array[Byte], Array[Byte]]): Future[Unit] = future {
val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()
while (it.hasNext) {
val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
println("futureNumer->[" + futureIndex + "],  key->[" + new String(data.key) + "],  message->[" + new String(data.message) + "],  partition->[" +
data.partition + "],  offset->[" + data.offset + "]")
}
}

def createConfig(): ConsumerConfig = {
val props = new Properties()
props.put("zookeeper.connect", ZK_CONN)
props.put("group.id", GROUP_ID)
props.put("zookeeper.session.timeout.ms", "400")
props.put("zookeeper.sync.time.ms", "200")
props.put("auto.commit.interval.ms", "1000")

new ConsumerConfig(props)
}
}


运行结果

运行Producer



运行Consumer



剖析代码

val connector = Consumer.create(createConfig())


创建一个
ConsumerConnector
,这是Consumer的最主要的interface,通过它可以与Kafka cluster进行交互。

val topicCountMap = new HashMap[String, Int]()
topicCountMap.put(TOPIC, 2)


topicCountMap
告诉Kafka我们在Consumer中将用多少个线程来消费该topic。

topicCountMap
的key是topic name,value针对该topic是线程的数量。

val msgStreams: Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]]
= connector.createMessageStreams(topicCountMap)


createMessageStreams
根据提供的topicCountMap来创建对应的
KafkaSteam
,每一个
KafkaStream
代表了来自若干个partition(可位于一个或者多个server上)的消息流,每一个
KafkaStream
对应着Consumer中的一个线程。一个stream中的消息可以来自于多个partitions,但是一个partition中的消息只能前往一个stream。

println("# of streams is " + msgStreams.get(TOPIC).get.size)


打印出对应该topic的
KafkaStream
的数量,这里是2。

注意,由于
msgStreams.get(TOPIC)
返回的结果类型是
Option[List[KafkaStream[Array[Byte], Array[Byte]]]]
,因此需要调用
get
来获得其中真正的内容,否则
msgStreams.get(TOPIC).size
的结果永远是1。

var futureIndex = 0
for (stream <- msgStreams.get(TOPIC).get) {
processSingleStream(futureIndex, stream)
futureIndex = futureIndex+1
}

Thread.sleep(30000)


遍历
msgStreams.get(TOPIC).get
返回的
List[KafkaStream[Array[Byte], Array[Byte]]]
,让每一个
KafkaStream
由一个
Future
任务去处理(等待消息的到来、获取消息、处理消息、打印出有关的信息)。这里的futureIndex是为了记录哪个
Future
处理了哪个
KafkaStream


最后,主线程等待30秒后退出(
Future
与主线程异步执行)。

def processSingleStream(futureIndex:Int, stream: KafkaStream[Array[Byte], Array[Byte]]) = future {
val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator()
while (it.hasNext) {
val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
println("futureNumer->[" + futureIndex + "],  key->[" + new String(data.key) + "],  message->[" + new String(data.message) + "],  partition->[" +
data.partition + "],  offset->[" + data.offset + "]")
}
}


KafkaStream
实际上是一个遍历
MessageAndMetadata
对象的迭代器,从
MessageAndMetadata
可以获得message内容以及相关的元数据,例如,
key
是该消息的key,
message
是该消息的message内容,
partition
是该消息在topic的哪一个partition,
offset
则是该消息在对应partition中的offset。

当没有消息的时候,
it.hasNext
会阻塞,当有一条消息来的时候,
it.hasNext
会返回true,然后对该消息进行处理。

def createConfig(): ConsumerConfig = {
val props = new Properties()
props.put("zookeeper.connect", ZK_CONN)
props.put("group.id", GROUP_ID)
props.put("zookeeper.session.timeout.ms", "400")
props.put("zookeeper.sync.time.ms", "200")
props.put("auto.commit.interval.ms", "1000")

new ConsumerConfig(props)
}


Consumer所需的配置参数,具体含义如下:

1.
zookeeper.connect
: Kafka uses ZK to store offset of messages consumed for specific topic and partition by this Consumer Group

2.
group.id
: this string defines the Consumer Group this process is consuming on behalf of

3.
zookeeper.session.timeout.ms
: how many milliseconds Kafka will wait for ZK to respond to a read or write request before giving up and continuing to consume messages

4.
zookeeper.sync.time.ms
: the number of milliseconds a ZK follower can be behind the master before an error occurs

5.
auto.commit.interval.ms
: how often updates to the consumed offsets are written to ZK. Since the commit frequency is time based instead of #messages consumed, if an error occurs between updates to ZK on restart you will get replayed messages

解读运行结果

再回过头来看看Consumer的运行结果



看看
futureNumber
partition
的关系:对于被消费的一个message而言,
futureNumber
实际上是Consumer线程的编号(共有2个线程),
partition
是该message所在partition的编号。从图中可以看出,它们是一一对应的。当然,在顺序上也可能是交叉的。这也验证了 一个stream中的消息可以来自于多个partitions,但是一个partition中的消息只能前往一个stream。

最后,看黄色部分。虽然主线程退出了,但是两个
Future
依然在继续等待新的消息到来,并没有随着主线程一起退出。所以最后不得不用
Ctrl + Z
来结束这个进程。所以,在实际的应用中,还需要考虑在必要时怎样停止Consumer。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: