
Kafka 0.10.1.0 producer and consumer code

2018-02-07 17:49
package cn.bmkp.KafkaDemo;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka utility class.
 * The producer is initialized automatically (in the static block);
 * consumers must be created explicitly via the factory methods.
 *
 * @author jiangqiang
 * @date 2017-08-23 11:57:53
 */
public class KafkaUtil {

    private static final Logger logger = LoggerFactory.getLogger(KafkaUtil.class);

    private static KafkaProducer<String, String> producer;

    static {
        initProducer();
    }

    /**
     * Creates a consumer that commits offsets automatically.
     *
     * @param groupId consumer group id
     * @param topic   topic to subscribe to
     * @return a consumer subscribed to the given topic
     * @user jiangqiang
     * @date 2017-08-23 11:58:17
     */
    public static KafkaConsumer<String, String> getConsumer(String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstants.BOOTSTRAP_SERVERS);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // props.put("request.timeout.ms", "60000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    /**
     * Creates a consumer that commits offsets manually.
     *
     * @param groupId consumer group id
     * @param topic   topic to subscribe to
     * @return a consumer subscribed to the given topic
     * @user jiangqiang
     * @date 2017-11-14 10:41:37
     */
    public static KafkaConsumer<String, String> getManualOffsetConsumer(String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstants.BOOTSTRAP_SERVERS);
        props.put("group.id", groupId);
        // Commit offsets manually.
        props.put("enable.auto.commit", "false");

        // When this group.id has no committed offset, start from the earliest
        // offset in the topic. With "latest" (the default) the consumer would
        // only see messages produced after it starts.
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }
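
    /**
     * Illustrative only, not in the original post: the manual-commit consumer
     * above is never shown actually committing. A minimal poll-and-commit loop
     * could look like this sketch; the "processing" step is just a log call.
     */
    public static void consumeWithManualCommit(String groupId, String topic) {
        KafkaConsumer<String, String> consumer = getManualOffsetConsumer(groupId, topic);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("offset={} value={}", record.offset(), record.value());
                }
                // Commit only after the whole batch has been processed, so a
                // crash before this line replays the batch instead of losing it.
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }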

    /**
     * Initializes the shared Kafka producer.
     *
     * @user jiangqiang
     * @date 2017-08-23 11:58:39
     */
    public static void initProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstants.BOOTSTRAP_SERVERS);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<>(props);
    }

    /**
     * Sends a message asynchronously.
     *
     * @param topic target topic
     * @param value message value
     * @user jiangqiang
     * @date 2017-09-27 16:37:23
     */
    public static void sendAsync(final String topic, final String value) {
        producer.send(new ProducerRecord<String, String>(topic, value), new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    logger.info("Message written to Kafka: {}", value);
                } else {
                    // Log the full stack trace, not just the message.
                    logger.error("Failed to write message to Kafka: " + value, exception);
                }
            }
        });
    }

    /**
     * Sends a message synchronously.
     *
     * @param topic target topic
     * @param value message value
     * @user jiangqiang
     * @date 2017-10-30 17:21:12
     */
    public static void sendSync(final String topic, final String value) {
        try {
            RecordMetadata recordMetadata = producer.send(new ProducerRecord<String, String>(topic, value)).get();
            logger.info("Synchronous send succeeded");
            logger.info("offset: " + recordMetadata.offset() + "; partition: " + recordMetadata.partition());
        } catch (InterruptedException | ExecutionException e) {
            logger.error("Synchronous send failed", e);
        }
    }
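
    /**
     * Illustrative addition, not in the original post: the shared producer is
     * never flushed or closed, so records still buffered in memory can be lost
     * when a short-lived JVM exits. A shutdown helper might look like this.
     */
    public static void closeProducer() {
        if (producer != null) {
            // close() waits for in-flight sends to complete before releasing
            // resources; after this the producer can no longer be used.
            producer.close();
        }
    }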

    public static void main(String[] args) throws InterruptedException {
        // for (int i = 1; i < 50000; i++) {
        //     KafkaUtil.sendAsync("logstash_test", "message_" + i);
        // }

        KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer("test_0023", "order_process");

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }

}
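
The class references a KafkaConstants helper that the post does not include. A minimal sketch, assuming it only needs to carry the broker list (the address below is a placeholder, not from the original):

package cn.bmkp.KafkaDemo;

/**
 * Shared Kafka configuration. The broker address is a placeholder;
 * substitute your own cluster's host:port list.
 */
public class KafkaConstants {

    public static final String BOOTSTRAP_SERVERS = "localhost:9092";
}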

Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0</version>
</dependency>
Tags: kafka, producer, consumer