
Kafka: Producer Send Logic - Source Code

2017-02-27 19:50

1. Overview

Note: the Kafka version covered here is 0.10.1.0.

This article walks through the Producer's send path at a high level; corrections are welcome if anything is off.

Basic usage of the producer client looks like this:

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
producer.send(record, cb);
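
A slightly fuller, self-contained sketch for context (the topic name, address, and callback body are illustrative, not from the original article):

import java.util.Properties;
import org.apache.kafka.clients.producer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // illustrative address
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("demo-topic", "hello".getBytes());

// send() is asynchronous: it only appends the record to the RecordAccumulator and returns;
// the Callback fires after the Sender thread gets the broker's response
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            exception.printStackTrace();
    }
});

producer.close();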


Key classes:

KafkaProducer: the Kafka producer client, used to send messages

RecordAccumulator: buffers the messages to be sent and groups them into batches

NetworkClient: the underlying network communication client

Sender: the thread that sends the messages buffered in the RecordAccumulator

2. Source Code - KafkaProducer

Let's start with the KafkaProducer class created above. Its constructor does the following:

Initialize: read the configuration, set up metrics, and so on

Create the RecordAccumulator buffer

Fetch the cluster metadata (Metadata)

Create the underlying network client (NetworkClient)

Create the data-sending thread (Sender)

Finish starting the producer

The code:

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
log.trace("Starting the Kafka producer");
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = new SystemTime();

// ...
// some basic initialization work omitted
// ...

// the record accumulator (buffer)
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time);

// update (bootstrap) the metadata
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

// create the network client
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs, time);

// create the sender thread (runs as a daemon)
// when MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 1, guaranteeMessageOrder is true
this.sender = new Sender(client,
this.metadata,
this.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
new SystemTime(),
clientId,
this.requestTimeoutMs);
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

this.errors = this.metrics.sensor("errors");

config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
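
For reference, the configuration keys read by this constructor map onto the components like this; a hedged sketch (the values are illustrative and happen to match the usual defaults, check ProducerConfig for the authoritative list):

Properties props = new Properties();               // java.util.Properties
// consumed by the RecordAccumulator
props.put("batch.size", "16384");                  // ProducerConfig.BATCH_SIZE_CONFIG
props.put("linger.ms", "0");                       // ProducerConfig.LINGER_MS_CONFIG
// consumed by the NetworkClient
props.put("connections.max.idle.ms", "540000");    // ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG
props.put("send.buffer.bytes", "131072");          // ProducerConfig.SEND_BUFFER_CONFIG
props.put("receive.buffer.bytes", "32768");        // ProducerConfig.RECEIVE_BUFFER_CONFIG
// consumed by the Sender
props.put("acks", "1");                            // ProducerConfig.ACKS_CONFIG
props.put("retries", "0");                         // ProducerConfig.RETRIES_CONFIG
props.put("max.request.size", "1048576");          // ProducerConfig.MAX_REQUEST_SIZE_CONFIG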


3. Source Code - Sender

Let's start with Sender. It implements the Runnable interface and runs on its own thread, so below is Sender's run() method:

The running flag controls the loop: while it is true, the thread keeps doing its work;

when it is set to false, the loop ends and shutdown begins;

if the close is not forced, the unsent messages still in the accumulator are sent out before closing;

if the close is forced, unsent data is simply abandoned.

The code:

public void run() {
log.debug("Starting Kafka producer I/O thread.");

// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

// once close has been requested, perform the shutdown work
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
// if this is not a forced close, and the accumulator still has unsent messages or the client still has in-flight requests
while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
try {
// keep calling the parameterized run() method to drain them
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}

// if this is a forced close, call accumulator.abortIncompleteBatches() to abandon the unfinished batches
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
this.accumulator.abortIncompleteBatches();
}

// close the network client
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}

log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
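
From the caller's side, the two shutdown paths above map onto the public close() variants; a minimal sketch (a zero timeout is simply the easiest way to hit the force-close path):

// graceful: wait for buffered records and in-flight requests to finish (forceClose stays false)
producer.close();

// forced: with a zero timeout (or when called from the sender/callback thread),
// pending batches are aborted via accumulator.abortIncompleteBatches() (forceClose becomes true)
producer.close(0, java.util.concurrent.TimeUnit.MILLISECONDS);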


Now let's look at the parameterized run(long now) method in detail:

1. Fetch the cluster information from the Metadata.

2. Get the list of nodes whose batches are ready to send (see the RecordAccumulator.ready() function below).

3. If any topic's leader is unknown, force a Metadata update.

4. Go through the nodes from step 2 and remove any whose connection is not ready (checked via NetworkClient.ready()).

5. Walk the batches already held in the RecordAccumulator: for each node left after step 4, iterate over every topic partition whose leader is on that node, peek at the first batch in its queue, and take it if it qualifies. The result is a set of RecordBatches per node (see the RecordAccumulator.drain() function below).

6. If the configuration requires strict message ordering (i.e. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 1), add all the topic partitions drained in step 5 to the RecordAccumulator's muted set. The next call to drain() will then skip these partitions, so at most one request per partition is ever unacknowledged, which keeps the messages arriving at each partition in order (see the drain() function below, and the configuration sketch after this list).

7. Abort the batches that have exceeded the request timeout.

8. Update the metrics.

9. Wrap the data from step 5 into one ClientRequest per node.

10. Send the requests through the NetworkClient.
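
The ordering guarantee in step 6 is driven purely by configuration; a hedged sketch (values illustrative):

// with this setting the Sender is constructed with guaranteeMessageOrder = true
props.put("max.in.flight.requests.per.connection", "1");
// retries only preserve per-partition order together with the line above;
// with more requests in flight, a retried batch can land behind a newer one
props.put("retries", "3");

Capping in-flight requests at one trades throughput for ordering, which is why the code mutes only the drained partitions instead of serializing all sends globally.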

The code:

void run(long now) {
// fetch the cluster information
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
// get the list of nodes with batches ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

// force a metadata update
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}

// remove nodes whose connections are not ready
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}

// create produce requests
// build produce requests: walk the first batch of each partition on every ready node
// Integer - broker id
// batches - the batches to send to that broker
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);

// when MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 1,
// add the partitions of the drained batches to the RecordAccumulator's muted set
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}

// abort batches that have exceeded the request timeout, then update metrics
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

sensors.updateProduceRequestMetrics(batches);

// build the request to send for each node
List<ClientRequest> requests = createProduceRequests(batches, now);

// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
log.trace("Created {} produce requests: {}", requests.size(), requests);
pollTimeout = 0;
}
for (ClientRequest request : requests)
client.send(request, now);

// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
this.client.poll(pollTimeout, now);
}


4. Source Code - RecordAccumulator

4.1 The ready() function:

Purpose: return the set of nodes that are ready to be sent to, plus the set of topics whose leader is unknown.

A broker is considered ready when:

1. At least one topic partition whose leader is on that node is not in a retry backoff period,

2. and those topic partitions are not in the muted set, i.e. they have no request currently in flight (so ordering cannot be broken),

3. and at least one of the following holds:

the batch is full

or the batch has been sitting in the accumulator for at least lingerMs

or the buffer pool is exhausted and writers are blocked waiting for memory

or the accumulator is being closed

or a flush is in progress
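
A worked example of the timing side of these conditions, with illustrative settings linger.ms = 5 and retry.backoff.ms = 100:

// lingerMs = 5, retryBackoffMs = 100 (illustrative values)
// t = 0 ms  : a half-full batch is appended (attempts = 0, lastAttemptMs = 0)
// t = 3 ms  : backingOff = false, timeToWaitMs = lingerMs = 5, waitedTimeMs = 3  -> not sendable yet
// t = 5 ms  : waitedTimeMs = 5 >= timeToWaitMs                                   -> sendable
// t = 50 ms : the send fails and the batch is re-enqueued (attempts = 1, lastAttemptMs = 50)
// t < 150 ms: backingOff = true (50 + 100 > now), so its leader is not marked ready
// t = 150 ms: backingOff = false and waitedTimeMs = 100 >= retryBackoffMs        -> sendable again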

The code:

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();

// whether the buffer pool is exhausted (threads are waiting for memory)
boolean exhausted = this.free.queued() > 0;

// iterate over every TopicPartition's batch deque
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<RecordBatch> deque = entry.getValue();

// get this TopicPartition's leader broker
Node leader = cluster.leaderFor(part);
synchronized (deque) {
// if the leader is unknown, report it in the result; the Sender will then request a Metadata update
if (leader == null && !deque.isEmpty()) {
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());

// note: TopicPartitions already in muted are skipped to preserve ordering (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = 1); a partition with an unacknowledged in-flight request is not considered ready
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {
// peek at the first batch, without removing it
RecordBatch batch = deque.peekFirst();
if (batch != null) {
// whether the batch is in retry backoff, i.e. still waiting for the next retry
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;

// time already waited since the last attempt
long waitedTimeMs = nowMs - batch.lastAttemptMs;

// time that must be waited:
//    if backing off, retry.backoff.ms from the config (default 100 ms)
//    otherwise, linger.ms from the config (default 0 ms)
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;

// time still left to wait
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
// whether the batch is full
boolean full = deque.size() > 1 || batch.records.isFull();
// whether it has waited long enough (waited time >= required wait time)
boolean expired = waitedTimeMs >= timeToWaitMs;

// so the batch is sendable if:
//        the batch is full
//    or  it has waited long enough
//    or  the buffer pool is exhausted
//    or  the accumulator is closed
//    or  a flush has been requested
boolean sendable = full || expired || exhausted || closed || flushInProgress();

// sendable and not backing off -> its leader is a ready node
if (sendable && !backingOff) {
readyNodes.add(leader);
} else {
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}

return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}


4.2 The drain() function:

Purpose: for the given set of nodes, pull from the accumulator the batches of the topic partitions whose leader is on each node (grouped per broker id; see the sketch after this list).

Iterate over the given nodes.

For each node, get every topic partition whose leader is that node (excluding partitions in muted).

Look up that topic partition's batch queue in the accumulator.

Peek at the first batch: if it is non-null, not in a retry backoff period, and fits within the maximum request size, remove it from the queue and collect it for sending. (Special case: a single batch that on its own exceeds the maximum request size is still drained.)
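
The return value is keyed by broker id; a minimal sketch of how the caller (Sender.run(long), shown in section 3) walks it, with illustrative variable names:

Map<Integer, List<RecordBatch>> batches = accumulator.drain(cluster, readyNodes, maxRequestSize, now);
for (Map.Entry<Integer, List<RecordBatch>> entry : batches.entrySet()) {
    Integer brokerId = entry.getKey();                   // destination broker (node id)
    List<RecordBatch> forThisBroker = entry.getValue();  // at most one batch per partition led by that broker
    // Sender.createProduceRequests() turns each such list into one produce request per broker
}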

The code:

public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
Set<Node> nodes,
int maxSize,
long now) {
if (nodes.isEmpty())
return Collections.emptyMap();

Map<Integer, List<RecordBatch>> batches = new HashMap<>();

// iterate over each node
for (Node node : nodes) {
int size = 0;

// all partitions whose leader is this node
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
// batches drained for this node
List<RecordBatch> ready = new ArrayList<>();
/* to make starvation less likely this loop doesn't start at 0 */
int start = drainIndex = drainIndex % parts.size();

// iterate over all partitions
do {
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());

// Only proceed if the partition has no in-flight batches.
// only handle topic partitions with no in-flight (muted) batches
if (!muted.contains(tp)) {
// find this topic partition's deque in the accumulator
Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
if (deque != null) {
synchronized (deque) {
// peek at the first batch in the deque
RecordBatch first = deque.peekFirst();
if (first != null) {
boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
// Only drain the batch if it is not during backoff period.
if (!backoff) {
// note: if adding this batch would exceed the max request size but nothing has been drained for this node yet, the batch is still taken;
// i.e. a single batch larger than the max request size is sent on its own
if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
// there is a rare case that a single batch size is larger than the request size due
// to compression; in this case we will still eventually send this batch in a single
// request
break;
} else {
RecordBatch batch = deque.pollFirst();
batch.records.close();
size += batch.records.sizeInBytes();
ready.add(batch);
batch.drainedMs = now;
}
}
}
}
}
}
this.drainIndex = (this.drainIndex + 1) % parts.size();
} while (start != drainIndex);
batches.put(node.id(), ready);
}
return batches;
}