kafka 0.10.2 消息生产者(producer)
2017-03-22 23:13
274 查看
package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** * 消息生产者 * @author xiaojf 2017/3/22 14:27 */ public class MsgProducer extends Thread { private final KafkaProducer<String, String> producer; private final String topic; private final boolean isAsync; public MsgProducer(String topic, boolean isAsync) { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.59.130:9092");//broker 集群地址 properties.put(ProducerConfig.CLIENT_ID_CONFIG, "MsgProducer");//自定义客户端id properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//key 序列号方式 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//value 序列号方式 // properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数 // properties.load("properties配置文件"); this.producer = new KafkaProducer<String, String>(properties); this.topic = topic; this.isAsync = isAsync; } @Override public void run() { int msgNo = 0; while (true) { String msg = "Msg: " + msgNo; String key = msgNo + ""; if (isAsync) {//异步 producer.send(new ProducerRecord<String, String>(this.topic,msg)); // producer.send(new ProducerRecord<String, String>(this.topic, key, msg)); } else {//同步 producer.send(new ProducerRecord<String, String>(this.topic, key, msg), new MsgProducerCallback(System.currentTimeMillis(), key, msg)); } } } /** * 消息发送后的回调函数 */ class MsgProducerCallback implements Callback { private final long startTime; private final String key; private final String msg; public MsgProducerCallback(long startTime, String key, String msg) { this.startTime = startTime; this.key = key; this.msg = msg; } public void onCompletion(RecordMetadata recordMetadata, Exception e) { long elapsedTime = System.currentTimeMillis() - startTime; if (recordMetadata != null) { System.out.println(msg + " be sended to partition no : " + recordMetadata.partition()); } } } public static void main(String args[]) { new MsgProducer("my-replicated-topic",true).start();//开始发送消息 } }
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency>
相关文章推荐
- kafka 0.8.2 消息生产者 producer
- kafka 0.8.2 消息生产者 KafkaProducer
- 基于Confluent.Kafka实现的KafkaConsumer消费者类和KafkaProducer消息生产者类型
- kafka 0.10.2 消息生产者
- kafka 0.10.2 消息消费者
- kafka生产者producer的Java实现。
- Kafka:Producer生产者发送逻辑 - 源码
- storm bolt作为kafka消息队列生产者
- kafka消息服务的producer、broker、consumer的配置
- 实践kafka生产者消息
- Kafka Producer端自定义消息
- kafka-clients 0.10 消息生产者
- kafka单节点搭建(创建topic让producer为consumer提供消息)
- kafka producer 发送消息
- 基于kafka-net实现的可以长链接的消息生产者
- kafka入门1-集群生产消息 报:ERROR Producer connection to localhost:9092 unsuccessful
- kafka_2.11-0.8.2.1生产者producer的Java实现
- Kafka producer无法发送消息解决办法
- (更新)Kafka-可靠的生产者Producer(Java)。
- Kafka Producer机制优化-提高发送消息可靠性