kafka-clients 0.10 消息消费者
2017-06-01 11:31
281 查看
package tuyou.kafka.consumer; import java.util.Arrays; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; /** * 消息消费者 * * @author:涂有 * @date 2017年6月1日 上午11:27:10 */ public class MsgConsumer { private String group = "MsgConsumer"; private final int timeSection = 3000; private boolean autoCommit; private List<String> topics = Arrays.asList("test"); public MsgConsumer(String topic, String group, boolean autoCommit) { this.topics = Arrays.asList(topic); this.group = group; this.autoCommit = autoCommit; } public void consumer() { Properties properties = new Properties(); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());// key反序列化方式 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());// value反系列化方式 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);// 提交方式 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.100.115:2092,192.168.100.115:2093,192.168.100.115:2094");// 指定broker地址,来找到group的coordinator properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);// 指定用户组 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); consumer.subscribe(topics);// 指定topic消费 while (true) { System.out.println("开始poll---------------"); ConsumerRecords<String, String> records = consumer.poll(timeSection);// 拉取一次数据 for (ConsumerRecord<String, String> record : records) { System.out.println("topic: " + record.topic() + " key: " + record.key() + " value: " + record.value() + " partition: " + record.partition()); } if (!autoCommit) { consumer.commitAsync();// 手动commit } System.out.println("完成一次poll---------------"); } } public static void main(String[] args) { new MsgConsumer("test", "MsgConsumer", true).consumer(); } }
相关文章推荐
- kafka-clients 0.10 消息生产者
- 4-kafka0.10 新消费者使用
- kafka消费者客户端启动之后消费不到消息的原因分析
- 消息中间件kafka(0.9以及0.10版本)学习及实践
- java版kafka消费者取不到消息
- java 实现kafka消息生产者和消费者
- kafka中消费者消费消息是阻塞的
- kafka(java客户端)消费者取不到消息,生产者消息也没发送成功
- 2-kafka0.10 消费者详解
- 多线程消息监听容器配置[ 消费者spring-kafka配置文件]
- 基于Confluent.Kafka实现的KafkaConsumer消费者类和KafkaProducer消息生产者类型
- 基于Kafka的生产者消费者消息处理本地调试
- 基于Kafka的生产者消费者消息处理本地调试
- kafka 0.10.2 消息消费者
- 关于SpringKafka消费者的几个监听器:[一次处理单条消息和一次处理一批消息]以及[自动提交offset和手动提交offset]
- 分布式消息中间件(三)——Kafka生产者消费者模型
- kafka 0.8.2 消息消费者 consumer
- 消息管道kafka入门了解
- 分布式消息系统Kafka
- kafka消息中间件技术安装教程