kafka生产者与消费者java代码示例
2017-07-12 11:21
579 查看
kafka生产者
package kafkatest;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
/**
 * Minimal Kafka 0.8 (legacy Scala API) producer example: publishes an endless
 * stream of numbered string messages to a single topic on its own thread.
 */
public class KafkaProducerRebuild extends Thread {

    /** Topic this producer publishes to. */
    private String topic;

    public KafkaProducerRebuild(String topic) {
        this.topic = topic;
    }

    public static void main(String[] args) {
        // Uses topic "test"; if it does not exist the cluster auto-creates it
        // (assuming auto.create.topics.enable is on, the 0.8 default).
        new KafkaProducerRebuild("test").start();
    }

    /** Sends "key-N" / message-N pairs forever, logging each send. */
    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        int i = 0;
        while (true) {
            i++;
            KeyedMessage<String, String> km =
                    new KeyedMessage<String, String>(topic, "key-" + i, "我是生产信息" + i);
            producer.send(km);
            // BUG FIX: the original logged "++i", incrementing the counter a
            // second time, so every other message number was silently skipped.
            System.out.println("发送了: " + i);
        }
    }

    /**
     * Builds and configures the legacy producer.
     *
     * @return a producer keyed and valued by String, matching the
     *         {@code KeyedMessage<String, String>} sent in {@link #run()}
     */
    private Producer<String, String> createProducer() {
        Properties properties = new Properties();
        // ZooKeeper ensemble (legacy 0.8 setting).
        properties.put("zookeeper.connect", "IP地址:2181,IP地址:2181,IP地址:2181");
        properties.put("serializer.class", StringEncoder.class.getName());
        // Kafka broker list.
        properties.put("metadata.broker.list", "IP地址:9092,IP地址:9092,IP地址:9092");
        // FIX: was new Producer<Integer, String>, inconsistent with the
        // KeyedMessage<String, String> actually sent and the StringEncoder.
        return new Producer<String, String>(new ProducerConfig(properties));
    }
}
kafka消费者
package kafkatest;
import java.io.File;
import java.io.FileOutputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class KafkaConsumerRebuild extends Thread {
private String topic;
public KafkaConsumerRebuild(String topic){
this.topic = topic;
}
public static void main(String[] args) {
new KafkaConsumerRebuild("test").start();// 使用kafka集群中创建好的主题 test
}
@Override
public void run() {
System.out.println("--开始消费信息--");
ConsumerConnector consumer = createConsumer();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//设置消费线程数为1,如果是两个线程消费,则设置成2(注意如果是两个线程消费,但此处设置为1,那么只会有一个线程消费)
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topic);
for (KafkaStream<byte[], byte[]> kafkaStream : streams) {
ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
while(iterator.hasNext()){
String message = new String(iterator.next().message());
//将读取到的数据写入到文件中
writefile(message,"E:\\temp\\");
System.out.println("接收到: " + message);
}
}
}
//创建消费者
private ConsumerConnector createConsumer() {
Properties properties = new Properties();
//声明zookeeper
properties.put("zookeeper.connect", "IP地址:2181,IP地址:2181,IP地址:2181");
//设置超时时间
properties.put("zookeeper.session.timeout.ms", "1000");
//一个topic主题只能被一个组消费一次,要想重复消费,每次可以设置别的组名称,或在zk中移除此group
properties.put("group.id", "ee");
//初始化offset,largest表示从topic的的最新处开始处理,smallest表示从topic的头开始处理
properties.put("auto.offset.reset","smallest");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
//写文件
public static void writefile(String data, String path) {
try {
String format = new SimpleDateFormat("yyyy-MM-dd")
.format(new Date());
path = path + "kafka.log." + format;
File file = new File(path);
if (!file.exists())
file.createNewFile();
FileOutputStream out = new FileOutputStream(file, true); // 如果追加方式用true
StringBuffer sb = new StringBuffer();
sb.append(data + "\n");
out.write(sb.toString().getBytes("utf-8"));// 注意需要转换对应的字符集
out.close();
} catch (Exception ex) {
a165
System.out.println(ex.getStackTrace());
}
}
}
package kafkatest;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
/**
 * Minimal Kafka 0.8 (legacy Scala API) producer example: publishes an endless
 * stream of numbered string messages to a single topic on its own thread.
 */
public class KafkaProducerRebuild extends Thread {

    /** Topic this producer publishes to. */
    private String topic;

    public KafkaProducerRebuild(String topic) {
        this.topic = topic;
    }

    public static void main(String[] args) {
        // Uses topic "test"; if it does not exist the cluster auto-creates it
        // (assuming auto.create.topics.enable is on, the 0.8 default).
        new KafkaProducerRebuild("test").start();
    }

    /** Sends "key-N" / message-N pairs forever, logging each send. */
    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        int i = 0;
        while (true) {
            i++;
            KeyedMessage<String, String> km =
                    new KeyedMessage<String, String>(topic, "key-" + i, "我是生产信息" + i);
            producer.send(km);
            // BUG FIX: the original logged "++i", incrementing the counter a
            // second time, so every other message number was silently skipped.
            System.out.println("发送了: " + i);
        }
    }

    /**
     * Builds and configures the legacy producer.
     *
     * @return a producer keyed and valued by String, matching the
     *         {@code KeyedMessage<String, String>} sent in {@link #run()}
     */
    private Producer<String, String> createProducer() {
        Properties properties = new Properties();
        // ZooKeeper ensemble (legacy 0.8 setting).
        properties.put("zookeeper.connect", "IP地址:2181,IP地址:2181,IP地址:2181");
        properties.put("serializer.class", StringEncoder.class.getName());
        // Kafka broker list.
        properties.put("metadata.broker.list", "IP地址:9092,IP地址:9092,IP地址:9092");
        // FIX: was new Producer<Integer, String>, inconsistent with the
        // KeyedMessage<String, String> actually sent and the StringEncoder.
        return new Producer<String, String>(new ProducerConfig(properties));
    }
}
kafka消费者
package kafkatest;
import java.io.File;
import java.io.FileOutputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class KafkaConsumerRebuild extends Thread {
private String topic;
public KafkaConsumerRebuild(String topic){
this.topic = topic;
}
public static void main(String[] args) {
new KafkaConsumerRebuild("test").start();// 使用kafka集群中创建好的主题 test
}
@Override
public void run() {
System.out.println("--开始消费信息--");
ConsumerConnector consumer = createConsumer();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//设置消费线程数为1,如果是两个线程消费,则设置成2(注意如果是两个线程消费,但此处设置为1,那么只会有一个线程消费)
topicCountMap.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topic);
for (KafkaStream<byte[], byte[]> kafkaStream : streams) {
ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
while(iterator.hasNext()){
String message = new String(iterator.next().message());
//将读取到的数据写入到文件中
writefile(message,"E:\\temp\\");
System.out.println("接收到: " + message);
}
}
}
//创建消费者
private ConsumerConnector createConsumer() {
Properties properties = new Properties();
//声明zookeeper
properties.put("zookeeper.connect", "IP地址:2181,IP地址:2181,IP地址:2181");
//设置超时时间
properties.put("zookeeper.session.timeout.ms", "1000");
//一个topic主题只能被一个组消费一次,要想重复消费,每次可以设置别的组名称,或在zk中移除此group
properties.put("group.id", "ee");
//初始化offset,largest表示从topic的的最新处开始处理,smallest表示从topic的头开始处理
properties.put("auto.offset.reset","smallest");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
//写文件
public static void writefile(String data, String path) {
try {
String format = new SimpleDateFormat("yyyy-MM-dd")
.format(new Date());
path = path + "kafka.log." + format;
File file = new File(path);
if (!file.exists())
file.createNewFile();
FileOutputStream out = new FileOutputStream(file, true); // 如果追加方式用true
StringBuffer sb = new StringBuffer();
sb.append(data + "\n");
out.write(sb.toString().getBytes("utf-8"));// 注意需要转换对应的字符集
out.close();
} catch (Exception ex) {
a165
System.out.println(ex.getStackTrace());
}
}
}
相关文章推荐
- kafka生产者和消费者的javaAPI的示例代码
- kafka本地java示例生产者与消费者,非ZK版
- Java线程同步:生产者-消费者 模型(代码示例)
- Kafka java api-消费者代码与消费分析、生产者消费者配置文件详解
- kafka生产者消费者示例代码
- JAVA生产者消费者(线程同步)代码学习示例
- Java 多线程生产者和消费者代码示例
- kafka生产者、消费者java示例
- Java线程同步:生产者-消费者 模型(代码示例)
- Java线程同步:生产者-消费者 模型(代码示例)
- kafka生产者、消费者代码示例
- java写kafka的生产者与消费者代码
- JAVA,生产者消费者代码示例
- kafka生产者、消费者java示例
- Kafka生产者消费者java示例(包含Avro序列化)
- 【Java并发编程】之十三:生产者—消费者模型(含代码)
- java实现 生产者和消费者问题 多线程同步示例
- Java 生产者 消费者 代码
- java实现Kafka生产者示例
- Java阻塞队列(BlockingQueue)实现 生产者/消费者 示例