Kafka High Level Consumer API in Scala
2015-06-26 14:13
831 查看
本文目的
研究了一下Kafka Produce/Consumer 的API,发现Consumer API的使用并没有那么的straight forward。折腾了2天后,终于摸到了一些门道,这里记录下怎样使用High Level Consumer API来接收并处理一个Topic中的消息。本文的例子用Scala编写,如果要改成Java的其实很容易。
环境
Kafka为0.8.2.0(CDH版本)Scala为2.10.4
Kafka中事先创建了一个名为
my-2nd-topic的Topic,该Topic由2个partition构成,如下:
我们将向该topic中写入一些消息,生成消息的代码如下:
object ProduceKeyedMsg { def BROKER_LIST = "ecs1:9092,ecs2:9092" def TOPIC = "my-2nd-topic" def main(args: Array[String]): Unit = { println("开始产生消息!") val props = new Properties() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST) props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) val producer = new KafkaProducer[String, String](props) for (i <- 0 to 10) { val ret: Future[RecordMetadata] = producer.send(new ProducerRecord(TOPIC, "key-" + i, "msg-" + i)) val metadata = ret.get // 打印出 metadata println("i=" + i + ", offset=" + metadata.offset() + ", partition=" + metadata.partition()) } producer.close } }
Consumer API
Consumer的预期行为:开启两个线程,去并行地读取Topic中的消息。(Consumer的两个线程正好对应着Topic的两个partition)先给出代码,然后再给分析。
package cn.gridx.kafka.apis.scala.consumer import java.util.Properties import kafka.consumer.{ConsumerIterator, KafkaStream, ConsumerConfig, Consumer} import kafka.message.MessageAndMetadata import scala.collection.Map import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.{Await, Future, future} import scala.concurrent.ExecutionContext.Implicits.global /** * Created by tao on 6/25/15. */ object MultiThreadConsumer { def ZK_CONN = "ecs1:2181,ecs2:2181,ecs3:2181" def GROUP_ID = "xt-group-1" def TOPIC = "my-2nd-topic" def main(args: Array[String]): Unit = { println(" 开始了 ") val connector = Consumer.create(createConfig()) val topicCountMap = new HashMap[String, Int]() topicCountMap.put(TOPIC, 2) // TOPIC在创建时就指定了它有2个partition val msgStreams: Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = connector.createMessageStreams(topicCountMap) println("# of streams is " + msgStreams.get(TOPIC).get.size) // 变量futureIndex用来输出Future的序号 var futureIndex = 0 for (stream <- msgStreams.get(TOPIC).get) { processSingleStream(futureIndex, stream) futureIndex = futureIndex+1 } // 主线程阻塞30秒 Thread.sleep(30000) /* 注意,这里虽然主线程退出了,但是已经创建的各个Future任务仍在运行(一直在等待接收消息) * 怎样在主线程里结束各个Future任务呢? */ println(" 结束了 ") } /** * 一个Future处理一个stream * TODO: 还需要一个可以控制Future结束的机制 * @param futureIndex * @param stream * @return */ def processSingleStream(futureIndex:Int, stream: KafkaStream[Array[Byte], Array[Byte]]): Future[Unit] = future { val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator() while (it.hasNext) { val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next() println("futureNumer->[" + futureIndex + "], key->[" + new String(data.key) + "], message->[" + new String(data.message) + "], partition->[" + data.partition + "], offset->[" + data.offset + "]") } } def createConfig(): ConsumerConfig = { val props = new Properties() props.put("zookeeper.connect", ZK_CONN) props.put("group.id", GROUP_ID) props.put("zookeeper.session.timeout.ms", "400") props.put("zookeeper.sync.time.ms", "200") props.put("auto.commit.interval.ms", "1000") new ConsumerConfig(props) } }
运行结果
运行Producer
运行Consumer
剖析代码
val connector = Consumer.create(createConfig())
创建一个
ConsumerConnector,这是Consumer的最主要的interface,通过它可以与Kafka cluster进行交互。
val topicCountMap = new HashMap[String, Int]() topicCountMap.put(TOPIC, 2)
topicCountMap告诉Kafka我们在Consumer中将用多少个线程来消费该topic。
topicCountMap的key是topic name,value针对该topic是线程的数量。
val msgStreams: Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] = connector.createMessageStreams(topicCountMap)
createMessageStreams根据提供的topicCountMap来创建对应的
KafkaSteam,每一个
KafkaStream代表了来自若干个partition(可位于一个或者多个server上)的消息流,每一个
KafkaStream对应着Consumer中的一个线程。一个stream中的消息可以来自于多个partitions,但是一个partition中的消息只能前往一个stream。
println("# of streams is " + msgStreams.get(TOPIC).get.size)
打印出对应该topic的
KafkaStream的数量,这里是2。
注意,由于
msgStreams.get(TOPIC)返回的结果类型是
Option[List[KafkaStream[Array[Byte], Array[Byte]]]],因此需要调用
get来获得其中真正的内容,否则
msgStreams.get(TOPIC).size的结果永远是1。
var futureIndex = 0 for (stream <- msgStreams.get(TOPIC).get) { processSingleStream(futureIndex, stream) futureIndex = futureIndex+1 } Thread.sleep(30000)
遍历
msgStreams.get(TOPIC).get返回的
List[KafkaStream[Array[Byte], Array[Byte]]],让每一个
KafkaStream由一个
Future任务去处理(等待消息的到来、获取消息、处理消息、打印出有关的信息)。这里的futureIndex是为了记录哪个
Future处理了哪个
KafkaStream
最后,主线程等待30秒后退出(
Future与主线程异步执行)。
def processSingleStream(futureIndex:Int, stream: KafkaStream[Array[Byte], Array[Byte]]) = future { val it: ConsumerIterator[Array[Byte], Array[Byte]] = stream.iterator() while (it.hasNext) { val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next() println("futureNumer->[" + futureIndex + "], key->[" + new String(data.key) + "], message->[" + new String(data.message) + "], partition->[" + data.partition + "], offset->[" + data.offset + "]") } }
KafkaStream实际上是一个遍历
MessageAndMetadata对象的迭代器,从
MessageAndMetadata可以获得message内容以及相关的元数据,例如,
key是该消息的key,
message是该消息的message内容,
partition是该消息在topic的哪一个partition,
offset则是该消息在对应partition中的offset。
当没有消息的时候,
it.hasNext会阻塞,当有一条消息来的时候,
it.hasNext会返回true,然后对该消息进行处理。
def createConfig(): ConsumerConfig = { val props = new Properties() props.put("zookeeper.connect", ZK_CONN) props.put("group.id", GROUP_ID) props.put("zookeeper.session.timeout.ms", "400") props.put("zookeeper.sync.time.ms", "200") props.put("auto.commit.interval.ms", "1000") new ConsumerConfig(props) }
Consumer所需的配置参数,具体含义如下:
1.
zookeeper.connect: Kafka uses ZK to store offset of messages consumed for specific topic and partition by this Consumer Group
2.
group.id: this string defines the Consumer Group this process is consuming on behalf of
3.
zookeeper.session.timeout.ms: how many milliseconds Kafka will wait for ZK to respond to a read or write request before giving up and continuing to consume messages
4.
zookeeper.sync.time.ms: the number of milliseconds a ZK follower can be behind the master before an error occurs
5.
auto.commit.interval.ms: how often updates to the consumed offsets are written to ZK. Since the commit frequency is time based instead of #messages consumed, if an error occurs between updates to ZK on restart you will get replayed messages
解读运行结果
再回过头来看看Consumer的运行结果看看
futureNumber和
partition的关系:对于被消费的一个message而言,
futureNumber实际上是Consumer线程的编号(共有2个线程),
partition是该message所在partition的编号。从图中可以看出,它们是一一对应的。当然,在顺序上也可能是交叉的。这也验证了 一个stream中的消息可以来自于多个partitions,但是一个partition中的消息只能前往一个stream。
最后,看黄色部分。虽然主线程退出了,但是两个
Future依然在继续等待新的消息到来,并没有随着主线程一起退出。所以最后不得不用
Ctrl + Z来结束这个进程。所以,在实际的应用中,还需要考虑在必要时怎样停止Consumer。
相关文章推荐
- zoj 3261 Connections in Galaxy War(并查集+离线逆向操作)
- sharp 画三条边
- Jq提示小插件Poshy Tip
- Asp.Net定时发送邮件方法(使用线程的方法)
- label 调整label高度
- div中的背景图片不显示
- Objective-C语法学习 第三天
- Material Design效果实现
- 基于Tomcat7、Java、WebSocket的服务器推送聊天室项目
- iOS7修改NavigationItem上的标题名字大小颜色
- 数据结构问题汇总
- ftp server来源分析20140602
- 数据结构--双链表的创建和操作
- oracle11g备份导入oracle10g
- linux下mysql主从配置
- 程序猿们,快用Emoji表情写代码吧
- Android解决屏幕适配问题
- 黑马程序员——Xcode常用快捷键
- WPF的Binding学习笔记(三)
- 安卓按钮单击事件