
Kafka Source Code Analysis 3: Producer

2018-08-13 14:37
Producer

Producer is the interface that defines a producer.

Its commonly used methods are:

public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
public void flush();
public void close();

KafkaProducer is asynchronous. Calling send does not transmit the record to the broker right away: the record is placed in a buffer pool and send returns immediately, while a background IO thread converts the buffered records into requests and sends them to the Kafka cluster.

The batch buffer size is specified by the batch.size config; the producer keeps one buffer of unsent records per partition.

By default a buffer can be sent even when it is not full. linger.ms sets a wait time during which more records can accumulate into one request, trading a little latency for fewer requests, much like Nagle's algorithm in TCP.

The producer's total buffer memory is capped by buffer.memory. If records are produced faster than they can be delivered and the cap is exceeded, send blocks; the maximum blocking time is max.block.ms, after which a TimeoutException is thrown.

key.serializer and value.serializer control how Java objects are converted into the byte arrays that are transmitted to the Kafka cluster.

acks controls when the producer considers a write successful; the value is the number of acknowledgements the leader must collect. With acks=0 the producer treats the record as sent as soon as it is placed in the socket buffer. With acks=1 the leader only has to write the record to its local log; it does not wait for acknowledgements from followers. With acks=all, every in-sync replica must acknowledge the write before the send counts as successful, so the record is not lost as long as at least one in-sync replica stays alive.
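These knobs map directly onto producer configs. As a minimal sketch, assuming a local broker at localhost:9092 and String keys and values (the concrete values below are illustrative, not tuning advice):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");               // wait for all in-sync replicas
props.put("batch.size", 16384);         // per-partition batch buffer, in bytes
props.put("linger.ms", 5);              // wait up to 5 ms for a batch to fill up
props.put("buffer.memory", 33554432);   // total buffer pool: 32 MB
props.put("max.block.ms", 60000);       // block send() at most 60 s, then TimeoutException

Producer<String, String> producer = new KafkaProducer<>(props);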

The Partitioner decides which partition each record is written to. When specific keys need to land on specific partitions, you can plug in your own Partitioner implementation (a sketch follows the default implementation below).

The default Partitioner (DefaultPartitioner) hashes the key when one is present; for keyless records it spreads the load by round-robining over the available partitions, starting from a random offset:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
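For the custom case mentioned above, an implementation is registered through the partitioner.class config. A minimal sketch of a Partitioner that pins one specific key to partition 0 and hashes everything else (the class name and routing rule are illustrative assumptions, not from the source):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class PinnedKeyPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // Illustrative rule: all records keyed "audit" land on partition 0 ...
        if (key != null && "audit".equals(key.toString()))
            return 0;
        // ... keyless records go there too; everything else is hashed
        // the same way the default partitioner does it.
        if (keyBytes == null)
            return 0;
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }
}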

ProducerRecord

ProducerRecord holds everything needed to send a record to the broker:

class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}
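Only topic and value are mandatory; partition, key, headers and timestamp may stay null, in which case the partitioner chooses the partition and doSend fills in the timestamp (see below). A quick illustration with a made-up topic and payload:

// Let the partitioner derive the partition from the key.
ProducerRecord<String, String> keyed =
        new ProducerRecord<>("demo-topic", "user-42", "{\"action\":\"login\"}");

// Or pin the record to partition 0 explicitly.
ProducerRecord<String, String> pinned =
        new ProducerRecord<>("demo-topic", 0, "user-42", "{\"action\":\"login\"}");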

KafkaProducer construction

// create the partitioner
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
// configure the key/value serializers
if (keySerializer == null) {
    this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            Serializer.class));
    this.keySerializer.configure(config.originals(), true);
} else {
    config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
    this.keySerializer = ensureExtended(keySerializer);
}
if (valueSerializer == null) {
    this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            Serializer.class));
    this.valueSerializer.configure(config.originals(), false);
} else {
    config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
    this.valueSerializer = ensureExtended(valueSerializer);
}

// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
        true, true, clusterResourceListeners);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.transactionManager = configureTransactionState(config);
int retries = configureRetries(config, transactionManager != null);
int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
short acks = configureAcks(config, transactionManager != null);
this.apiVersions = new ApiVersions();
// the RecordAccumulator implements the batching and waiting logic
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
        this.totalMemorySize,
        this.compressionType,
        config.getLong(ProducerConfig.LINGER_MS_CONFIG),
        retryBackoffMs,
        metrics,
        time,
        apiVersions,
        transactionManager);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
// high-level network layer, wrapping send, poll, and the other IO primitives
NetworkClient client = new NetworkClient(
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                this.metrics, time, "producer", channelBuilder),
        this.metadata,
        clientId,
        maxInflightRequests,
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
        this.requestTimeoutMs,
        time,
        true,
        apiVersions,
        throttleTimeSensor);
// the background thread that actually sends requests to the Kafka cluster
this.sender = new Sender(client,
        this.metadata,
        this.accumulator,
        maxInflightRequests == 1,
        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
        acks,
        retries,
        this.metrics,
        Time.SYSTEM,
        this.requestTimeoutMs,
        config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
        this.transactionManager,
        apiVersions);
String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");

KafkaProducer#send

The entry point is doSend(interceptedRecord, callback):

// wait for cluster metadata so we know the cluster nodes for this topic
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in key.serializer");
}
byte[] serializedValue;
try {
    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in value.serializer");
}
// pick the target partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
        compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);
// append the record to the RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
return result.future;
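From the caller's perspective this means send returns as soon as the record sits in the accumulator; completion is observed through the returned Future or through the Callback, which the IO thread invokes once the broker has responded. A short sketch, reusing the producer from the configuration example above (note that get() also throws checked InterruptedException and ExecutionException that real code must handle):

ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");

// Fully asynchronous: the callback fires on the IO thread once the
// batch containing this record has been acknowledged by the broker.
producer.send(record, (metadata, exception) -> {
    if (exception != null)
        exception.printStackTrace();
    else
        System.out.println("acked at offset " + metadata.offset());
});

// Blocking on the returned Future makes the send effectively synchronous.
RecordMetadata md = producer.send(record).get();
System.out.printf("written to %s-%d @ offset %d%n", md.topic(), md.partition(), md.offset());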

RecordAccumulator

It stores the ProducerBatches for each partition in a double-ended queue (Deque). The core of append:

// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
    // check if we have an in-progress batch
    // get or create the deque for this TopicPartition
    Deque<ProducerBatch> dq = getOrCreateDeque(tp);
    synchronized (dq) {
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        // if the record fits into the last batch, append it and return
        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
        if (appendResult != null)
            return appendResult;
    }

    // it didn't fit, so a new batch has to be allocated
    // we don't have an in-progress record batch try to allocate a new batch
    byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
    int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
    log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
    buffer = free.allocate(size, maxTimeToBlock);
    synchronized (dq) {
        // Need to check if producer is closed again after grabbing the dequeue lock.
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        // between the two synchronized blocks another thread may already have created the next batch
        RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
        if (appendResult != null) {
            // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
            return appendResult;
        }

        MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
        ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
        FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

        dq.addLast(batch);
        incomplete.add(batch);

        // Don't deallocate this buffer in the finally block as it's being used in the record batch
        buffer = null;
        return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
    }
} finally {
    if (buffer != null)
        free.deallocate(buffer);
    appendsInProgress.decrementAndGet();
}
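The pattern worth noting is the double-checked append: tryAppend is attempted under the deque lock, the potentially blocking buffer allocation happens outside the lock, and tryAppend is retried after re-locking, because another thread may have created a batch in the meantime. Stripped of Kafka's types, the shape is roughly the following simplified sketch (all names and the string-based "batch" are illustrative):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MiniAccumulator {
    static class Batch {
        private final StringBuilder buf = new StringBuilder();
        private final int capacity;
        Batch(int capacity) { this.capacity = capacity; }
        // returns false when the batch has no room left
        boolean tryAppend(String record) {
            if (buf.length() + record.length() > capacity) return false;
            buf.append(record);
            return true;
        }
    }

    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchSize = 16 * 1024;

    void append(String partition, String record) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            // first attempt: append to the last in-progress batch
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record)) return;
        }
        // allocate outside the lock (in Kafka, free.allocate may block)
        Batch fresh = new Batch(batchSize);
        synchronized (dq) {
            // second attempt: another thread may have added a batch meanwhile
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(record)) return;
            fresh.tryAppend(record);
            dq.addLast(fresh);
        }
    }
}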

Sender

Sender is a background thread. Leaving transactions aside, each iteration of its loop boils down to sendProducerData plus poll, where poll waits for and processes the broker's responses (see the sketch after the code below).

void run(long now) {
    if (transactionManager != null) {
        if (!transactionManager.isTransactional()) {
            // this is an idempotent producer, so make sure we have a producer id
            maybeWaitForProducerId();
        } else if (transactionManager.hasInFlightRequest() || maybeSendTransactionalRequest(now)) {
            // as long as there are outstanding transactional requests, we simply wait for them to return
            client.poll(retryBackoffMs, now);
            return;
        }

        // do not continue sending if the transaction manager is in a failed state or if there
        // is no producer id (for the idempotent case).
        if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
            RuntimeException lastError = transactionManager.lastError();
            if (lastError != null)
                maybeAbortBatches(lastError);
            client.poll(retryBackoffMs, now);
            return;
        } else if (transactionManager.hasAbortableError()) {
            accumulator.abortUndrainedBatches(transactionManager.lastError());
        }
    }

    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}
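Transactions aside, every iteration is therefore "drain and send, then poll". Reduced to a hedged sketch with illustrative names (not Kafka's actual classes), the loop shape is:

class MiniSender implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            long now = System.currentTimeMillis();
            // 1) drain batches that are full or whose linger.ms has expired and
            //    turn them into produce requests queued on the network client
            long pollTimeout = sendProducerData(now);
            // 2) do the actual socket IO and hand responses to the callbacks,
            //    waiting at most pollTimeout ms (wakeup() cuts the wait short)
            poll(pollTimeout, now);
        }
    }

    private long sendProducerData(long now) {
        // ... returns how long the loop may sleep until the next batch is due
        return 100L;
    }

    private void poll(long timeoutMs, long now) {
        // ... selector-based read/write; completes the per-record futures
    }

    void shutdown() { running = false; }
}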
Tags: Kafka