kafka消费API理解
2016-05-12 17:29
344 查看
主要消费部分API:
这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。
每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的topic。
代码举例:
上面的例子中,申请了6个线程来处理kafka stream中的消息, 因为假设了过滤器中的两个topic,每个topic3个分区。
注:每个kafka stream汇集了多个分区的消息,但每个分区的消息只能汇入一个stream,即上面的消费线程是对应stream的,而不是对应分区。
ConsumerConnector connector = Consumer.create(consumerConfig); interface ConsumerConnector { public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap); public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams); public commitOffsets(); public shutdown(); }
这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。
每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的topic。
代码举例:
int NUMBER_OF_STREAMS = 6; Properties consumerConfig = new Properties(); consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181" ); consumerConfig.put("backoff.increment.ms", "100"); consumerConfig.put("autooffset.reset", "largest"); consumerConfig.put("groupid", "java-consumer-example"); consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig)); TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic"); List<KafkaStream<Message>> streams = consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_STREAMS); ExecutorService executor = Executors.newFixedThreadPool(streams.size()); for(final KafkaStream<Message> stream: streams){ executor.submit(new Runnable() { public void run() { for (MessageAndMetadata<Message> msgAndMetadata: stream) { ByteBuffer buffer = msgAndMetadata.message().payload(); byte [] bytes = new byte[buffer.remaining()]; buffer.get(bytes); //Do something with the bytes you just got off Kafka. } } }); }
上面的例子中,申请了6个线程来处理kafka stream中的消息, 因为假设了过滤器中的两个topic,每个topic3个分区。
注:每个kafka stream汇集了多个分区的消息,但每个分区的消息只能汇入一个stream,即上面的消费线程是对应stream的,而不是对应分区。
相关文章推荐
- Kafka 之 中级
- Linux下Kafka单机安装配置方法(图文)
- Kafka使用入门教程第1/2页
- Logstash 与Elasticsearch整合使用示例
- 大数据实验室(大数据基础培训)——Kafka的安装、配置及基础使用
- 大数据实验室(大数据基础培训)——概要
- kafka-manager 的编译和使用(附安装包)
- Kafka+Log4j实现日志集中管理
- Kafka深度解析
- Kafka设计解析(三)- Kafka High Availability (下)
- kafka+storm初探
- storm集群 + kafka单机性能测试
- flume、kafka、storm常用命令
- kafka集群的安装
- kafka 一些基本知识
- Kafka入门经典教程
- Kafka初步学习总结
- 自研轻量级分布式实时计算框架light_drtc
- note of kafka learning (first part, before replication)