您的位置:首页 > 编程语言

kafka 官方示例代码--消费者

2016-05-12 10:44 706 查看
kafka 0.9.0添加了一套新的Java 消费者API,用以替换之前的high-level API (基于ZK) 和low-level API。新的Java消费者API目前为测试版。另外kafka 0.9暂时还支持0.8的Client。

1、High Level Consumer(0.8)

public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;

public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(10000);
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}

// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}

Then in a separate thread, the consumer can be shutdown by setting the closed flag and waking up the consumer.
closed.set(true);
consumer.wakeup();


View Code
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: