您的位置:首页 > 其它

kafka 0.10.2 消息生产者(producer)

2017-03-22 23:13 274 查看
package cn.xiaojf.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 消息生产者
* @author xiaojf 2017/3/22 14:27
*/
public class MsgProducer extends Thread {

private final KafkaProducer<String, String> producer;
private final String topic;
private final boolean isAsync;

public MsgProducer(String topic, boolean isAsync) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.59.130:9092");//broker 集群地址
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "MsgProducer");//自定义客户端id
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//key 序列号方式
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//value 序列号方式
//        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数

//        properties.load("properties配置文件");

this.producer = new KafkaProducer<String, String>(properties);
this.topic = topic;
this.isAsync = isAsync;
}

@Override
public void run() {
int msgNo = 0;

while (true) {
String msg = "Msg: " + msgNo;
String key = msgNo + "";
if (isAsync) {//异步
producer.send(new ProducerRecord<String, String>(this.topic,msg));
//                producer.send(new ProducerRecord<String, String>(this.topic, key, msg));
} else {//同步
producer.send(new ProducerRecord<String, String>(this.topic, key, msg),
new MsgProducerCallback(System.currentTimeMillis(), key, msg));
}
}
}

/**
* 消息发送后的回调函数
*/
class MsgProducerCallback implements Callback {

private final long startTime;
private final String key;
private final String msg;

public MsgProducerCallback(long startTime, String key, String msg) {
this.startTime = startTime;
this.key = key;
this.msg = msg;
}

public void onCompletion(RecordMetadata recordMetadata, Exception e) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (recordMetadata != null) {
System.out.println(msg + " be sended to partition no : " + recordMetadata.partition());
}
}
}

public static void main(String args[]) {
new MsgProducer("my-replicated-topic",true).start();//开始发送消息
}
}


<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: