kafka 0.10.1.0 生产者消费者代码
2018-02-07 17:49
489 查看
package cn.bmkp.KafkaDemo;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka helper: one shared {@link KafkaProducer} (initialized when the class is
 * loaded) plus factory methods for auto-commit and manual-commit
 * {@link KafkaConsumer}s.
 *
 * <p>Consumers are created on demand and are owned by the caller, who must
 * close them. The producer is a single shared instance used by the
 * {@code sendAsync}/{@code sendSync} helpers.
 *
 * @author jiangqiang
 * @date 2017-08-23
 */
public class KafkaUtil {

    private static final Logger logger = LoggerFactory.getLogger(KafkaUtil.class);

    /** Shared producer, created once by the static initializer below. */
    private static KafkaProducer<String, String> producer;

    static {
        getProducer();
    }

    /**
     * Creates a consumer that commits offsets automatically (every second) and
     * is already subscribed to {@code topic}.
     *
     * @param groupId consumer group id
     * @param topic   topic to subscribe to
     * @return a subscribed consumer; the caller must close it when done
     */
    public static KafkaConsumer<String, String> getConsumer(String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstants.BOOTSTRAP_SERVERS);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        // A group with no committed offset starts from the oldest message.
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * Creates a consumer with auto-commit disabled; the caller is responsible
     * for committing offsets itself (e.g. via {@code commitSync()}).
     *
     * @param groupId consumer group id
     * @param topic   topic to subscribe to
     * @return a subscribed consumer; the caller must close it when done
     */
    public static KafkaConsumer<String, String> getManualOffsetConsumer(String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstants.BOOTSTRAP_SERVERS);
        props.put("group.id", groupId);
        // Offsets are committed manually by the caller.
        props.put("enable.auto.commit", "false");
        // "earliest": a new group.id starts from the oldest retained message.
        // With "latest" the consumer would only see messages produced after it
        // started.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * (Re-)initializes the shared producer. Invoked once from the static
     * initializer; calling it again replaces the instance without closing the
     * previous one, so avoid calling it repeatedly.
     */
    public static void getProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstants.BOOTSTRAP_SERVERS);
        // Wait for the full in-sync replica set to acknowledge each record.
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    /**
     * Sends a message asynchronously; the outcome is reported through the
     * completion callback.
     *
     * @param topic target topic
     * @param value message payload
     */
    public static void sendAsync(final String topic, final String value) {
        producer.send(new ProducerRecord<String, String>(topic, value), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Per the Kafka Callback contract, exception == null iff the
                // send succeeded. The original tested metadata != null, which
                // is not the documented success signal.
                if (exception == null) {
                    logger.info("向kafka写入消息成功 {}", value);
                } else {
                    // Pass the throwable itself so the stack trace is kept
                    // (the original logged only getMessage()).
                    logger.error("向kafka写入消息失败 {}", value, exception);
                }
            }
        });
    }

    /**
     * Sends a message and blocks until the broker acknowledges it, then logs
     * the assigned offset and partition.
     *
     * @param topic target topic
     * @param value message payload
     */
    public static void sendSync(final String topic, final String value) {
        try {
            RecordMetadata recordMetadata =
                    producer.send(new ProducerRecord<String, String>(topic, value)).get();
            logger.info("同步发送成功");
            logger.info("offset: " + recordMetadata.offset() + " ;partition: " + recordMetadata.partition());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the
            // interruption (the original swallowed it via printStackTrace()).
            Thread.currentThread().interrupt();
            logger.error("同步发送失败", e);
        } catch (ExecutionException e) {
            // Log through SLF4J with the cause attached instead of
            // printStackTrace().
            logger.error("同步发送失败", e);
        }
    }

    /**
     * Demo entry point: consumes the {@code order_process} topic forever and
     * prints each message value.
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer("test_0023", "order_process");
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.0</version> </dependency>
相关文章推荐
- Kafka java api-消费者代码与消费分析、生产者消费者配置文件详解
- kafka生产者与消费者java代码示例
- Kafka的生产者和消费者代码解析
- kafka生产者消费者示例代码
- java写kafka的生产者与消费者代码
- kafka生产者和消费者的javaAPI的示例代码
- kafka生产者、消费者代码示例
- Kafka的生产者和消费者代码解析
- Kafka常用操作命令及生产者与消费者的代码实现
- 用 wait-notify 写一段代码来解决生产者-消费者问题?
- 用多线程实现“生产者-消费者问题”(代码+实验报告)
- 多线程生产者与消费者问题代码模型
- SpringBoot整合SpringKafka实现消费者史上最简代码实现
- 线程经典代码二,(同步情况下的生产者/消费者关系)
- Ruby实现生产者和消费者代码分享
- 多线程-生产者消费者问题代码1
- kafka 生产者给消费者发送消息报 class kafka.common.LeaderNotAvailableException
- 创建Kafka0.8.2生产者与消费者
- 多线程_生产者消费者之等待唤醒机制代码分析
- Ruby中用线程实现经典的生产者消费者问题代码实例