您的位置:首页 > 编程语言

漫游Kafka实战篇之客户端编程实例

2016-06-21 08:11 429 查看
原文地址:http://blog.csdn.net/honglei915/article/details/37697655

Kafka视频教程同步首发,欢迎观看!

Kafka Producer APIs

新版的Producer API提供了以下功能:

可以将多个消息缓存到本地队列里,然后异步的批量发送到broker,可以通过参数
producer.type=async做到。
缓存的大小可以通过一些参数指定:
queue.time
batch.size
。一个后台线程((
kafka.producer.async.ProducerSendThread
)从队列中取出数据并让
kafka.producer.EventHandler
将消息发送到broker,也可以通过参数
event.handler定制
handler,在producer端处理数据的不同的阶段注册处理器,比如可以对这一过程进行日志追踪,或进行一些监控。只需实现
kafka.producer.async.CallbackHandler
接口,并在
callback.handler
中配置。
自己编写Encoder来序列化消息,只需实现下面这个接口。默认的Encoder是
kafka.serializer.DefaultEncoder


[java] view
plain copy







interface Encoder<T> {

public Message toMessage(T data);

}

提供了基于Zookeeper的broker自动感知能力,可以通过参数
zk.connect
实现。如果不使用Zookeeper,也可以使用
broker.list
参数指定一个静态的brokers列表,这样消息将被随机的发送到一个broker上,一旦选中的broker失败了,消息发送也就失败了。
通过分区函数
kafka.producer.Partitioner类对消息分区


[java] view
plain copy







interface Partitioner<T> {

int partition(T key, int numPartitions);

}

分区函数有两个参数:key和可用的分区数量,从分区列表中选择一个分区并返回id。默认的分区策略是
hash(key)%numPartitions
.如果key是null,就随机的选择一个。可以通过参数
partitioner.class
定制分区函数。

新的api完整实例如下:

[java] view
plain copy







package com.cuicui.kafkademon;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

/**

* @author <a href="mailto:leicui001@126.com">崔磊</a>

* @date 2015年11月4日 上午11:44:15

*/

public class MyProducer {

public static void main(String[] args) throws InterruptedException {

Properties props = new Properties();

props.put("serializer.class", "kafka.serializer.StringEncoder");

props.put("metadata.broker.list", KafkaProperties.BROKER_CONNECT);

props.put("partitioner.class", "com.cuicui.kafkademon.MyPartitioner");

props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);

Producer<String, String> producer = new Producer<String, String>(config);

// 单个发送

for (int i = 0; i <= 1000000; i++) {

KeyedMessage<String, String> message =

new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + "", "Message" + i);

producer.send(message);

Thread.sleep(5000);

}

// 批量发送

List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String, String>>(100);

for (int i = 0; i <= 10000; i++) {

KeyedMessage<String, String> message =

new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + "", "Message" + i);

messages.add(message);

if (i % 100 == 0) {

producer.send(messages);

messages.clear();

}

}

producer.send(messages);

}

}

下面这个是用到的分区函数:

[java] view
plain copy







import kafka.producer.Partitioner;

import kafka.utils.VerifiableProperties;

public class MyPartitioner implements Partitioner {

public MyPartitioner(VerifiableProperties props) {

}

/*

* @see kafka.producer.Partitioner#partition(java.lang.Object, int)

*/

@Override

public int partition(Object key, int partitionCount) {

return Integer.valueOf((String) key) % partitionCount;

}

}

KafKa Consumer APIs

Consumer API有两个级别。低级别的和一个指定的broker保持连接,并在接收完消息后关闭连接,这个级别是无状态的,每次读取消息都带着offset。

高级别的API隐藏了和brokers连接的细节,在不必关心服务端架构的情况下和服务端通信。还可以自己维护消费状态,并可以通过一些条件指定订阅特定的topic,比如白名单黑名单或者正则表达式。

低级别的API

[java] view
plain copy







package com.cuicui.kafkademon;

import java.nio.ByteBuffer;

import java.util.Collections;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import kafka.api.FetchRequest;

import kafka.api.FetchRequestBuilder;

import kafka.api.PartitionOffsetRequestInfo;

import kafka.cluster.Broker;

import kafka.common.TopicAndPartition;

import kafka.javaapi.FetchResponse;

import kafka.javaapi.OffsetRequest;

import kafka.javaapi.OffsetResponse;

import kafka.javaapi.PartitionMetadata;

import kafka.javaapi.TopicMetadata;

import kafka.javaapi.TopicMetadataRequest;

import kafka.javaapi.TopicMetadataResponse;

import kafka.javaapi.consumer.SimpleConsumer;

import kafka.javaapi.message.ByteBufferMessageSet;

import kafka.message.Message;

import kafka.message.MessageAndOffset;

/**

* offset自己维护 目标topic、partition均由自己分配

*

* @author <a href="mailto:leicui001@126.com">崔磊</a>

* @date 2015年11月4日 上午11:44:15

*

*/

public class MySimpleConsumer {

public static void main(String[] args) {

new MySimpleConsumer().consume();

}

/**

* 消费消息

*/

public void consume() {

int partition = 0;

// 找到leader

Broker leaderBroker = findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);

// 从leader消费

SimpleConsumer simpleConsumer =

new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, "mySimpleConsumer");

long startOffet = 1;

int fetchSize = 1000;

while (true) {

long offset = startOffet;

// 添加fetch指定目标tipic,分区,起始offset及fetchSize(字节),可以添加多个fetch

FetchRequest req =

new FetchRequestBuilder().addFetch(KafkaProperties.TOPIC, 0, startOffet, fetchSize).build();

// 拉取消息

FetchResponse fetchResponse = simpleConsumer.fetch(req);

ByteBufferMessageSet messageSet = fetchResponse.messageSet(KafkaProperties.TOPIC, partition);

for (MessageAndOffset messageAndOffset : messageSet) {

Message mess = messageAndOffset.message();

ByteBuffer payload = mess.payload();

byte[] bytes = new byte[payload.limit()];

payload.get(bytes);

String msg = new String(bytes);

offset = messageAndOffset.offset();

System.out.println("partition : " + 3 + ", offset : " + offset + " mess : " + msg);

}

// 继续消费下一批

startOffet = offset + 1;

}

}

/**

* 找到制定分区的leader broker

*

* @param brokerHosts broker地址,格式为:“host1:port1,host2:port2,host3:port3”

* @param topic topic

* @param partition 分区

* @return

*/

public Broker findLeader(String brokerHosts, String topic, int partition) {

Broker leader = findPartitionMetadata(brokerHosts, topic, partition).leader();

System.out.println(String.format("Leader tor topic %s, partition %d is %s:%d", topic, partition, leader.host(),

leader.port()));

return leader;

}

/**

* 找到指定分区的元数据

*

* @param brokerHosts broker地址,格式为:“host1:port1,host2:port2,host3:port3”

* @param topic topic

* @param partition 分区

* @return 元数据

*/

private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {

PartitionMetadata returnMetaData = null;

for (String brokerHost : brokerHosts.split(",")) {

SimpleConsumer consumer = null;

String[] splits = brokerHost.split(":");

consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup");

List<String> topics = Collections.singletonList(topic);

TopicMetadataRequest request = new TopicMetadataRequest(topics);

TopicMetadataResponse response = consumer.send(request);

List<TopicMetadata> topicMetadatas = response.topicsMetadata();

for (TopicMetadata topicMetadata : topicMetadatas) {

for (PartitionMetadata PartitionMetadata : topicMetadata.partitionsMetadata()) {

if (PartitionMetadata.partitionId() == partition) {

returnMetaData = PartitionMetadata;

}

}

}

if (consumer != null)

consumer.close();

}

return returnMetaData;

}

/**

* 根据时间戳找到某个客户端消费的offset

*

* @param consumer SimpleConsumer

* @param topic topic

* @param partition 分区

* @param clientID 客户端的ID

* @param whichTime 时间戳

* @return offset

*/

public long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientID, long whichTime) {

TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);

Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =

new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));

OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientID);

OffsetResponse response = consumer.getOffsetsBefore(request);

long[] offsets = response.offsets(topic, partition);

return offsets[0];

}

}

低级别的API是高级别API实现的基础,也是为了一些对维持消费状态有特殊需求的场景,比如Hadoop consumer这样的离线consumer。

高级别的API

[java] view
plain copy







package com.cuicui.kafkademon;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;

/**

* offset在zookeeper中记录,以group.id为key 分区和customer的对应关系由Kafka维护

*

* @author <a href="mailto:leicui001@126.com">崔磊</a>

* @date 2015年11月4日 上午11:44:15

*/

public class MyHighLevelConsumer {

/**

* 该consumer所属的组ID

*/

private String groupid;

/**

* 该consumer的ID

*/

private String consumerid;

/**

* 每个topic开几个线程?

*/

private int threadPerTopic;

public MyHighLevelConsumer(String groupid, String consumerid, int threadPerTopic) {

super();

this.groupid = groupid;

this.consumerid = consumerid;

this.threadPerTopic = threadPerTopic;

}

public void consume() {

Properties props = new Properties();

props.put("group.id", groupid);

props.put("consumer.id", consumerid);

props.put("zookeeper.connect", KafkaProperties.ZK_CONNECT);

props.put("zookeeper.session.timeout.ms", "60000");

props.put("zookeeper.sync.time.ms", "2000");

// props.put("auto.commit.interval.ms", "1000");

ConsumerConfig config = new ConsumerConfig(props);

ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

// 设置每个topic开几个线程

topicCountMap.put(KafkaProperties.TOPIC, threadPerTopic);

// 获取stream

Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);

// 为每个stream启动一个线程消费消息

for (KafkaStream<byte[], byte[]> stream : streams.get(KafkaProperties.TOPIC)) {

new MyStreamThread(stream).start();

}

}

/**

* 每个consumer的内部线程

*

* @author cuilei05

*

*/

private class MyStreamThread extends Thread {

private KafkaStream<byte[], byte[]> stream;

public MyStreamThread(KafkaStream<byte[], byte[]> stream) {

super();

this.stream = stream;

}

@Override

public void run() {

ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();

// 逐条处理消息

while (streamIterator.hasNext()) {

MessageAndMetadata<byte[], byte[]> message = streamIterator.next();

String topic = message.topic();

int partition = message.partition();

long offset = message.offset();

String key = new String(message.key());

String msg = new String(message.message());

// 在这里处理消息,这里仅简单的输出

// 如果消息消费失败,可以将已上信息打印到日志中,活着发送到报警短信和邮件中,以便后续处理

System.out.println("consumerid:" + consumerid + ", thread : " + Thread.currentThread().getName()

+ ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : "

+ key + " , mess : " + msg);

}

}

}

public static void main(String[] args) {

String groupid = "myconsumergroup";

MyHighLevelConsumer consumer1 = new MyHighLevelConsumer(groupid, "myconsumer1", 3);

MyHighLevelConsumer consumer2 = new MyHighLevelConsumer(groupid, "myconsumer2", 3);

consumer1.consume();

consumer2.consume();

}

}

这个API围绕着由KafkaStream实现的迭代器展开,每个流代表一系列从一个或多个分区多和broker上汇聚来的消息,每个流由一个线程处理,所以客户端可以在创建的时候通过参数指定想要几个流。一个流是多个分区多个broker的合并,但是每个分区的消息只会流向一个流。

每调用一次createMessageStreams都会将consumer注册到topic上,这样consumer和brokers之间的负载均衡就会进行调整。API鼓励每次调用创建更多的topic流以减少这种调整。createMessageStreamsByFilter方法注册监听可以感知新的符合filter的tipic。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: