您的位置:首页 > 其它

使用原生方法从kafka消费消息

2016-02-12 09:24 288 查看
Kafka 最早是 LinkedIn 开发的一套高性能的类队列系统,具有发布—订阅功能,现在是 Apache 的项目之一。它支持多种客户端从中消费(consume)消息,网上也有许多第三方客户端(注1),但下面我们只使用 Kafka 自带包中的方法来进行消费。这个例子是在一个 servlet 中调用 Kafka 的 Consumer 相关类,读取远端 Kafka 中的消息(message)。

代码如下:

/**
 * Handles a GET request by draining the {@code "test"} topic once and writing the
 * message payloads to the response as a bracketed, comma-separated list.
 *
 * <p>Payloads are written verbatim (no quoting or escaping), so the output is only
 * valid JSON if every payload is itself a JSON value. An empty result is rendered
 * as {@code []}.
 *
 * @param request  the incoming HTTP request (not inspected)
 * @param response the HTTP response whose writer receives the list
 * @throws ServletException declared by the servlet contract
 * @throws IOException      if the response writer cannot be obtained or written to
 */
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
    String topic = "test";
    KafkaHttpConsumer consumer = new KafkaHttpConsumer();
    List<Message> list = consumer.consume(topic);

    StringBuilder builder = new StringBuilder();
    builder.append("[");
    for (int i = 0; i < list.size(); i++) {
        // Emit the separator before every element except the first; this avoids
        // the trailing-comma cleanup that used to delete the "[" for an empty list.
        if (i > 0) {
            builder.append(",");
        }
        builder.append(list.get(i).message);
    }
    builder.append("]");
    response.getWriter().append(builder.toString());
}



import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import com.fasterxml.jackson.annotation.JsonInclude;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * One-shot consumer built on Kafka's own consumer classes
 * ({@code kafka.consumer.*} / {@code kafka.javaapi.consumer.*}).
 *
 * <p>Each call to {@link #consume(String)} creates a fresh connector from the
 * classpath resource {@code /kafka-http.properties}, reads from a single stream of
 * the requested topic until the stream iterator times out, commits offsets and
 * shuts the connector down again.
 */
public class KafkaHttpConsumer {

    /** Classpath location of the consumer configuration. */
    private static final String CONFIG_RESOURCE = "/kafka-http.properties";

    /** Charset used to decode message keys and payloads; looked up once. */
    private static final Charset UTF_8 = Charset.forName("utf-8");

    /**
     * Reads every message currently available on {@code topic}.
     *
     * @param topic the topic to read from
     * @return the messages read, in iteration order; empty if none were available
     * @throws IllegalStateException if the configuration resource is missing or unreadable
     */
    public List<Message> consume(String topic) {
        ConsumerConfig config = new ConsumerConfig(loadProperties());
        ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);

        List<Message> messages = new ArrayList<>();
        try {
            // Ask for exactly one stream for the topic and read from it.
            Map<String, Integer> streamCounts = Collections.singletonMap(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(streamCounts);
            KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);

            try {
                for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                    messages.add(new Message(messageAndMetadata));
                }
            } catch (ConsumerTimeoutException ignored) {
                // Expected end-of-data signal: with consumer.timeout.ms configured,
                // the stream iterator throws once no further message arrives in time.
            } finally {
                connector.commitOffsets();
            }
        } finally {
            // Always release the connector, even if stream creation or the
            // offset commit above failed.
            connector.shutdown();
        }
        return messages;
    }

    /**
     * Manual check: prints the value of every entry in the configuration resource.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Properties prop = loadProperties();
        for (Object key : prop.keySet()) {
            System.out.println("value:" + prop.getProperty((String) key));
        }
    }

    /**
     * Loads the consumer configuration from the classpath.
     *
     * @return the loaded properties
     * @throws IllegalStateException if the resource is not on the classpath or cannot be read
     */
    private static Properties loadProperties() {
        Properties prop = new Properties();
        // try-with-resources closes the stream; a null resource is tolerated by the
        // construct itself, so the explicit check below supplies a clear error.
        try (InputStream in = KafkaHttpConsumer.class.getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException(
                        "Configuration resource not found on classpath: " + CONFIG_RESOURCE);
            }
            prop.load(in);
        } catch (IOException e) {
            throw new IllegalStateException(
                    "Failed to read configuration resource: " + CONFIG_RESOURCE, e);
        }
        return prop;
    }

    /**
     * Plain data holder for one consumed message, with key and payload decoded
     * as UTF-8 text.
     */
    public static class Message {
        /** Topic the message was read from. */
        public String topic;

        /** Decoded message key, or {@code null} if the message had no key. */
        @JsonInclude(JsonInclude.Include.NON_NULL)
        public String key;
        /** Decoded message payload, or {@code null} if the message had no payload. */
        public String message;

        /** Partition the message was read from. */
        public int partition;
        /** Offset of the message within its partition. */
        public long offset;

        /**
         * Copies topic, key, payload, partition and offset out of a consumed record.
         *
         * @param message the record returned by the stream iterator
         */
        public Message(MessageAndMetadata<byte[], byte[]> message) {
            this.topic = message.topic();

            byte[] rawKey = message.key();
            byte[] rawPayload = message.message();
            this.key = rawKey != null ? new String(rawKey, UTF_8) : null;
            // Guard the payload the same way as the key so a null payload does not
            // abort the whole consume() call with a NullPointerException.
            this.message = rawPayload != null ? new String(rawPayload, UTF_8) : null;

            this.partition = message.partition();
            this.offset = message.offset();
        }
    }
}


kafka-http.properties

#for read from kafka
zookeeper.connect=192.20.34.144:2181
group.id=group
auto.offset.reset=smallest
consumer.timeout.ms=500


注1:https://cwiki.apache.org/confluence/display/KAFKA/Clients
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: