您的位置:首页 > 其它

kafka消费API理解

2016-05-12 17:29 344 查看
主要消费部分API:

ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {

public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);

public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams);

public commitOffsets();

public shutdown();
}


这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。

每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的topic。

代码举例:

int NUMBER_OF_STREAMS = 6;
Properties consumerConfig = new Properties();
consumerConfig.put("zk.connect", "zookeeper.mydomain.com:2181" );
consumerConfig.put("backoff.increment.ms", "100");
consumerConfig.put("autooffset.reset", "largest");
consumerConfig.put("groupid", "java-consumer-example");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));

TopicFilter sourceTopicFilter = new Whitelist("mytopic|myothertopic");
List<KafkaStream<Message>> streams = consumer.createMessageStreamsByFilter(sourceTopicFilter, NUMBER_OF_STREAMS);
ExecutorService executor = Executors.newFixedThreadPool(streams.size());
for(final KafkaStream<Message> stream: streams){
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata<Message> msgAndMetadata: stream) {
ByteBuffer buffer = msgAndMetadata.message().payload();
byte [] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
//Do something with the bytes you just got off Kafka.
}
}
});
}


上面的例子中,申请了6个线程来处理kafka stream中的消息, 因为假设了过滤器中的两个topic,每个topic3个分区。

注:每个kafka stream汇集了多个分区的消息,但每个分区的消息只能汇入一个stream,即上面的消费线程是对应stream的,而不是对应分区。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  kafka