1.1 Introduction






So, at a high level, producers send messages over the network to the Kafka cluster, which in turn serves them up to consumers, as shown in the figure:


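Communication between clients and servers is done with a simple, high-performance, language-agnostic TCP protocol. The project officially provides not only a Java client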




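As the figure above shows, the messages in each partition form an ordered, immutable sequence: once written they cannot be changed, and new messages can only be appended at the end of the partition. Each message in a partition is assigned a sequential number called the offset, which reflects the order in which the messages were written.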
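To make the offset idea concrete, here is a minimal in-memory sketch of a single partition. It assumes a partition can be modeled as an append-only list in which a message's offset is simply its position. PartitionLog and its methods are hypothetical names used for illustration, not Kafka classes; a real partition is stored as segment files on disk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one partition: an append-only list
// where each message's offset is its index in the list.
final class PartitionLog {
    private final List<String> messages = new ArrayList<>();

    // Appends at the tail and returns the offset assigned to the message.
    long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Reads the message at an offset; stored messages are never modified.
    String read(long offset) {
        if (offset < 0 || offset >= messages.size()) {
            throw new IndexOutOfBoundsException("no message at offset " + offset);
        }
        return messages.get((int) offset);
    }

    // The offset the next appended message will receive.
    long endOffset() {
        return messages.size();
    }
}
```

Appending "M1" and then "M2" yields offsets 0 and 1. This is the same ordering property stated in the guarantees further down.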



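In fact, the only metadata each consumer needs to keep is its offset, that is, its current position in the log. The offset is controlled by the consumer rather than by the Kafka cluster. Normally a consumer advances its offset linearly as it reads messages, so it may look as if the offset merely follows the reads. In fact the position is entirely under the consumer's control, and the consumer can read messages starting from any position it likes. For example, a consumer can reset its offset to an earlier value to reprocess data.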
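A minimal sketch of consumer-side position tracking, assuming the partition is an in-memory list. OffsetReader is a hypothetical class, not the Kafka client. In the actual Java consumer, the corresponding rewind call is KafkaConsumer.seek(TopicPartition, long).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader over an in-memory partition. Its only state is its own
// position; reading never changes the log or any other reader.
final class OffsetReader {
    private final List<String> log;
    private long position = 0;

    OffsetReader(List<String> log) {
        this.log = log;
    }

    // Returns up to maxMessages messages from the current position and
    // advances the position past them.
    List<String> poll(int maxMessages) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxMessages && position < log.size()) {
            batch.add(log.get((int) position));
            position++;
        }
        return batch;
    }

    // The reader may move anywhere in the log, e.g. back to reprocess data.
    void seek(long offset) {
        if (offset < 0 || offset > log.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        position = offset;
    }

    long position() {
        return position;
    }
}
```

Two readers over the same list keep independent positions. This is why a consumer that rewinds does not disturb anyone else.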

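Together, these properties mean that Kafka consumers are very cheap: they can re-read data as often as they like without affecting the cluster or other consumers. For example, you can use Kafka's command line tools to continuously read the latest messages of any topic without changing what any existing consumer consumes.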

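Splitting the log into partitions serves two purposes. First, it lets a topic hold more data than fits on a single server. Each partition must fit on the server that hosts it, because a partition's log is not split across servers. However, a topic can have many partitions, so in principle adding servers lets you add partitions. Second, partitions are the unit of parallelism, and the gain is substantial. Different consumers can consume different partitions in parallel. And because each consumer controls its own offset, consumers in different consumer groups can also read the same partition independently.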





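Each partition has one server that acts as the "leader" and zero or more servers that act as "followers". The leader handles all read and write requests for the partition, while the followers only replicate the leader. If the leader fails, one of the followers automatically becomes the new leader. Each server hosts many partitions, acting as the leader for some of them and as a follower for the rest, so load is well balanced within the cluster.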
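The balancing described above can be illustrated with a simplified round-robin placement. This sketch assumes that replica i of partition p goes to broker (p + i) mod n and that the first replica is the leader. Kafka's actual assignment logic also randomizes the starting broker, so treat ReplicaPlacement as a hypothetical illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified round-robin placement (an assumption for illustration; Kafka's
// real assignment also randomizes the starting broker).
final class ReplicaPlacement {
    // Returns, for each partition, its replica list; index 0 is the leader.
    static List<List<Integer>> assign(int partitions, int brokers, int replicationFactor) {
        if (replicationFactor > brokers) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int i = 0; i < replicationFactor; i++) {
                replicas.add((p + i) % brokers);
            }
            result.add(replicas);
        }
        return result;
    }

    // Counts how many partitions each broker leads.
    static int[] leaderCounts(List<List<Integer>> assignment, int brokers) {
        int[] counts = new int[brokers];
        for (List<Integer> replicas : assignment) {
            counts[replicas.get(0)]++;
        }
        return counts;
    }
}
```

With 6 partitions, 3 brokers and a replication factor of 2, each broker leads two partitions and follows two others.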




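Messaging traditionally has two models: queuing and publish-subscribe. In the queuing model, a pool of consumers may read from a server and each message goes to one of them. In the publish-subscribe model, each message is broadcast to all consumers. Kafka offers a single consumer abstraction that has the characteristics of both models: the consumer group.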





Kafka consumer group model:


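Consumers label themselves with a consumer group name. Each message published to a topic is delivered to every subscribing consumer group, but only to one consumer instance within each group. Consumer instances can run in separate processes or on separate machines.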
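A minimal sketch of the delivery rule. It assumes the message's partition is already known and that each group maps partition p to instance p mod (group size). That mapping and the name GroupDelivery are hypothetical stand-ins for a real partition assignment.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: every group receives each message, but only one
// instance per group does.
final class GroupDelivery {
    // For a message stored in `partition`, returns the receiving instance of
    // each group. Each group maps partition p to instance p % size, a stand-in
    // for the group's real partition assignment.
    static Map<String, String> receivers(int partition, Map<String, List<String>> groups) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> group : groups.entrySet()) {
            List<String> instances = group.getValue();
            if (instances.isEmpty()) {
                continue;  // a group with no live instances receives nothing
            }
            result.put(group.getKey(), instances.get(partition % instances.size()));
        }
        return result;
    }
}
```

Each group appears exactly once in the result. This gives publish-subscribe between groups and queue semantics within a group.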








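A traditional queue stores messages in order on the server. If multiple consumers consume from the queue, the server hands out messages in the order they are stored. However, even though the server hands out messages in order, the messages are delivered asynchronously to consumers, so they may not arrive in storage order. For example, suppose the server stores M1, M2, M3, M4 in that order and the consumers are C1, C2, C3, C4. If the messages are sent asynchronously as M1 to C2, M2 to C4, M3 to C1 and M4 to C3, they might arrive in the order M4, M2, M1, M3, and the consumers then process them in a different order from the one in which they were stored. This means the ordering of the messages is lost in the presence of parallel consumption. Messaging systems often work around this with the notion of an "exclusive consumer", which allows only one consumer to read from a queue. Of course, that means there is no parallelism in processing.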

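Kafka does this better. By having a notion of parallelism within topics, the partition, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. It achieves this by assigning the partitions of a topic to the consumers in a consumer group so that each partition is consumed by exactly one consumer in the group. That consumer is then the only reader of the partition, so it consumes the data in the order it was stored. Since there are many partitions, the load is still balanced over many consumer instances. Note, however, that a consumer group cannot usefully have more consumer instances than partitions: any extra instances receive no partitions and sit idle.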
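The assignment described above can be sketched as a range-style split. It is similar in spirit to Kafka's range assignor but simplified: a single topic, with consumers taken in the given order. RangeAssignment is a hypothetical helper. It also shows why extra consumers are wasted: when there are more consumers than partitions, some receive nothing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Range-style split (simplified): partitions 0..n-1 are cut into contiguous
// ranges, so each partition has exactly one owner within the group.
final class RangeAssignment {
    static Map<String, List<Integer>> assign(int partitions, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("group has no consumers");
        }
        int n = consumers.size();
        int base = partitions / n;   // every consumer gets at least this many
        int extra = partitions % n;  // the first `extra` consumers get one more
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < n; i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> mine = new ArrayList<>();
            for (int k = 0; k < count; k++) {
                mine.add(next++);
            }
            owned.put(consumers.get(i), mine);  // may be empty: an idle consumer
        }
        return owned;
    }
}
```

With 4 partitions and 3 consumers, the split is {0,1}, {2}, {3}. With 2 partitions and 3 consumers, the third consumer owns nothing.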




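-Messages sent by a producer to a particular topic partition are appended in the order they are sent. For example, if M1 and M2 are sent by the same producer and M1 is sent first, then M1 has a lower offset than M2 and appears earlier in the log.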


-For a topic with replication factor N (set with --replication-factor), Kafka tolerates up to N-1 server failures without losing any messages committed to the log.


1.2 Use Cases

Here is a description of a few of the popular use cases for Apache Kafka. For an overview of a number of these areas in action, see this blog post.


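Kafka works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons, such as decoupling data production from data consumption and buffering unprocessed messages. Compared with most messaging systems, Kafka has better throughput, built-in partitioning, replication, and fault tolerance. Together these make it a good solution for large-scale message processing applications.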




Website Activity Tracking






Log Aggregation


Stream Processing

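Kafka provides a lightweight but powerful stream processing library called Kafka Streams, which can perform the multi-stage processing described above. Apart from Kafka Streams, alternative open source stream processing tools include Apache Storm and Apache Samza.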

Event Sourcing

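Event sourcing is a style of application design in which state changes are logged as a time-ordered sequence of records. Because Kafka can store very large amounts of log data, it makes an excellent backend for applications built in this style.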
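A minimal event-sourcing sketch, assuming a hypothetical account whose only events are signed balance changes. The current state is never stored. Instead it is rebuilt by replaying the log in order, and replaying only a prefix gives the state as of an earlier offset.

```java
import java.util.List;

// Hypothetical event-sourced account: the balance is never stored directly,
// it is rebuilt by replaying the ordered log of changes.
final class AccountReplay {
    // Each event is a signed amount: deposits positive, withdrawals negative.
    static long rebuild(List<Long> events) {
        long balance = 0;
        for (long delta : events) {  // apply events in log order
            balance += delta;
        }
        return balance;
    }

    // State as of an earlier point: replay only events at offsets below upToOffset.
    static long rebuildAt(List<Long> events, int upToOffset) {
        return rebuild(events.subList(0, upToOffset));
    }
}
```

For the events 100, -30, 50, the current balance is 120. The balance as of offset 2 is 70.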

Commit Log

Kafka's log compaction feature helps support this usage. In this usage, Kafka is similar to the Apache BookKeeper project.
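A simplified model of what compaction retains, assuming an in-memory log of keyed records. For each key, only the record with the highest offset survives, and surviving records keep their original offsets. Real Kafka compacts segment files in the background, never touches the active segment, and treats null-value deletes specially. None of that is modeled in this hypothetical Compaction helper.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of what log compaction keeps: the latest record per key,
// each retaining its original offset. Input is assumed to be in offset order.
final class Compaction {
    static final class Rec {
        final long offset;
        final String key;
        final String value;

        Rec(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key);  // re-insert so survivors stay in offset order
            latest.put(r.key, r);
        }
        return new ArrayList<>(latest.values());
    }
}
```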

1.3 Quick Start



Step 1: Download the code

Download the release and un-tar it:

 > tar -xzf kafka_2.11-

 > cd kafka_2.11-    

Step 2: Start the server

Kafka uses ZooKeeper, so you need to start a ZooKeeper server first. You can start one conveniently with the script packaged with Kafka:

 > bin/zookeeper-server-start.sh config/zookeeper.properties

[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)



Now start the Kafka server:


> bin/kafka-server-start.sh config/server.properties

 [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)

 [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)


Step 3: Create a topic


> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

We can now see the topic by running the list command:

> bin/kafka-topics.sh --list --zookeeper localhost:2181




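Step 4: Send some messages

Kafka comes with a command line client that takes input from a file or from standard input and sends it to the Kafka cluster as messages. By default, each line is sent as a separate message.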

Run the producer and then type a few messages into the console to send to the server.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

Step 5: Start a consumer

Kafka also has a command line consumer that dumps messages to standard output.

 > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This is a message

This is another message

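If you run each of the above commands in a different terminal, you can type messages into the producer terminal and see them appear in the consumer terminal.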


Step 6: Setting up a multi-broker cluster

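So far we have been running against a single broker, but that's not very interesting. For Kafka, a single broker is just a cluster of size one, so nothing much changes until you start more broker instances. But just to get a feel for it, let's expand our cluster to three nodes. Don't worry about needing more machines: all three nodes run on the same local machine.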

First, we create a config file for each of the new brokers by copying config/server.properties and then editing the copies as needed:

> cp config/server.properties config/server-1.properties 

> cp config/server.properties config/server-2.properties










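The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running all the nodes on the same machine. Without these overrides, the brokers would all try to register on the same port or overwrite each other's data.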
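A minimal Java sketch of the per-broker overrides. It assumes a naming scheme of port 9092 + broker.id and a log directory of /tmp/kafka-logs-<id>; both are assumptions, not values taken from this guide. The sketch uses the real broker keys broker.id, listeners and log.dirs, and checks that no two brokers share any of them. It sets log.dirs because, when both keys are present, log.dirs takes precedence over log.dir.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Per-broker overrides for running several brokers on one machine.
// The port scheme (9092 + id) and the log directory naming are assumptions.
final class BrokerOverrides {
    static final String[] PER_BROKER_KEYS = {"broker.id", "listeners", "log.dirs"};

    static Properties forBroker(int id) {
        Properties p = new Properties();
        p.setProperty("broker.id", Integer.toString(id));          // unique, permanent node name
        p.setProperty("listeners", "PLAINTEXT://:" + (9092 + id)); // distinct port per broker
        p.setProperty("log.dirs", "/tmp/kafka-logs-" + id);        // distinct data directory
        return p;
    }

    // True if no two configs share a value for any of the per-broker keys.
    static boolean allDistinct(List<Properties> configs) {
        for (String key : PER_BROKER_KEYS) {
            Set<String> seen = new HashSet<>();
            for (Properties c : configs) {
                if (!seen.add(c.getProperty(key))) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

Layering these values over a loaded copy of server.properties and saving the result with Properties.store would give files equivalent to the two new broker configs.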

We already have ZooKeeper and our single node running, so we only need to start the two new nodes:

> bin/kafka-server-start.sh config/server-1.properties &


> bin/kafka-server-start.sh config/server-2.properties &


Now create a new topic with a replication factor of three:


> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Okay, now that we have a cluster, how can we know which broker is doing what? To see that, run the "describe topics" command:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

PartitionCount:1    ReplicationFactor:3    Configs:
      Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0


 “leader” is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.

 “replicas” is the list of nodes that replicate the log for this partition, regardless of whether they are the leader and even whether they are currently alive.

 “isr” is the set of “in-sync” replicas: the subset of the replicas list that is currently alive and caught up to the leader.

Note that in my example node 1 is the leader for the only partition of the topic.



> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0


No surprise there: the original topic has no replicas and is on server 0, the only server in our cluster when we created it.


> bin/kafka-console-producer.sh  --broker-list  localhost:9092   --topic  my-replicated-topic 


my test message 1

my test message 2



> bin/kafka-console-consumer.sh   --zookeeper  localhost:2181   --from-beginning  --topic  my-replicated-topic


my test message 1

my test message 2


Now let's test Kafka's fault tolerance. Broker 1 was acting as the leader, so let's kill it:

  > ps | grep server-1.properties

 7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...

> kill -9 7564

Leadership has switched to one of the followers, and node 1 is no longer in the in-sync replica set:



> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic


Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0
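The transition shown above (leader 1 killed, leader 2 elected, ISR shrinking to 2,0) can be mimicked with a small model. It is a simplification that assumes the new leader is the first surviving ISR member in replica-list order. FailoverSketch is hypothetical and ignores the controller, ZooKeeper and unclean leader election.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified leader failover for one partition: a dead broker leaves the ISR,
// and if it was the leader the first remaining ISR member takes over.
final class FailoverSketch {
    final List<Integer> replicas;  // assigned replicas; unchanged by failures
    final List<Integer> isr;       // in-sync subset, kept in replica order
    int leader;

    FailoverSketch(List<Integer> replicas) {
        this.replicas = new ArrayList<>(replicas);
        this.isr = new ArrayList<>(replicas);  // assume all start in sync
        this.leader = replicas.get(0);
    }

    void brokerDied(int broker) {
        isr.remove(Integer.valueOf(broker));  // remove by value, not by index
        if (broker == leader) {
            if (isr.isEmpty()) {
                throw new IllegalStateException("no in-sync replica left to lead");
            }
            leader = isr.get(0);
        }
    }
}
```

Starting from replicas 1,2,0 and killing broker 1 leaves leader 2 and ISR 2,0, while the replicas list stays 1,2,0, matching the describe output.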


> bin/kafka-console-consumer.sh  --zookeeper  localhost:2181   --from-beginning  --topic  my-replicated-topic


my test message 1

my test message 2


Step 7: Use Kafka Connect to import/export data

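When you start using Kafka, writing data from the console and writing it back to the console is a convenient way to test. Later, though, you will probably want to import data from other sources into Kafka or export data from Kafka to other systems. For many systems, you can use Kafka Connect to import or export data instead of writing custom integration code. Kafka Connect is a tool included with Kafka for importing and exporting data. It is extensible: it runs connectors, which implement the logic for interacting with an external system. In this quickstart we'll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file. First, create some seed data to test with: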

> echo -e "foo\nbar" > test.txt

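Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is the configuration for the Kafka Connect process, containing common settings such as the Kafka brokers to connect to and the serialization format for data. Each of the remaining configuration files specifies a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration the connector requires.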

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties










> cat test.sink.txt

Note that the data is stored in the Kafka topic connect-test, so we can also read it with a console consumer:



> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning


> echo "Another line" >> test.txt
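To show why the appended line travels through the pipeline exactly once, here is an in-memory simulation. It assumes lists stand in for test.txt, the connect-test topic and test.sink.txt. FilePipeline is a hypothetical class, not the Kafka Connect API. The point is that the source and the sink each remember their own position.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory simulation of file source -> topic -> file sink. Each side keeps
// its own position, so only newly appended lines are copied on the next round.
final class FilePipeline {
    final List<String> sourceFile = new ArrayList<>();  // stands in for test.txt
    final List<String> topic = new ArrayList<>();       // stands in for connect-test
    final List<String> sinkFile = new ArrayList<>();    // stands in for test.sink.txt
    private int sourcePosition = 0;  // lines of sourceFile already produced
    private int sinkOffset = 0;      // topic records already written out

    // One round of work for both connectors.
    void runOnce() {
        while (sourcePosition < sourceFile.size()) {
            topic.add(sourceFile.get(sourcePosition++));
        }
        while (sinkOffset < topic.size()) {
            sinkFile.add(topic.get(sinkOffset++));
        }
    }
}
```

After seeding "foo" and "bar", one round copies both lines. Appending "Another line" and running again copies only that line.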


Step 8: Use Kafka Streams to process data

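Kafka Streams is a Kafka client library for real-time stream processing and analysis of data stored in Kafka brokers. This quickstart example shows how to run a streaming application written with this library. Here is the gist of the WordCountDemo example code (converted to Java 8 lambda expressions for easier reading):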

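import java.util.Arrays;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;

// textLines: a KStream<String, String> over the input topic, declared elsewhere in the demo.
KTable<String, Long> wordCounts = textLines
    // Split each text line into lower-case words (splitting on runs of non-word characters).
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts");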

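This example implements the WordCount algorithm, which computes how often each word occurs in the input text. Unlike other WordCount examples you might have seen, which operate on a fixed, bounded set of data, this demo operates on an unbounded stream that never ends. Like the bounded variant, it keeps track of word counts and keeps updating them. However, because the input may never end, it cannot know when it has processed "all" the data. It therefore periodically outputs its current results while it continues to process more data.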

Now we can write input data to a Kafka topic, where it will be processed by the Kafka Streams application:

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt


> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-file-input

> cat file-input.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input

Next, run the WordCount demo application to process the input data:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo


Now we can inspect the output of the WordCount demo application:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer


all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1
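The output above is a changelog: each line is the new running count for one word. A plain-Java sketch without Kafka reproduces the same sequence for the three input lines, assuming one update is emitted per word occurrence. RunningWordCount is a hypothetical class, not part of Kafka Streams. It uses the same lower-casing and "\\W+" split as the demo.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of the demo's update stream: each time a word is seen,
// its new running count is emitted.
final class RunningWordCount {
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one input line and returns the (word, newCount) updates it causes.
    List<Map.Entry<String, Long>> process(String line) {
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) {
                continue;  // split yields "" when a line starts with punctuation
            }
            long updated = counts.merge(word, 1L, Long::sum);
            updates.add(new SimpleEntry<String, Long>(word, updated));
        }
        return updates;
    }
}
```

Feeding the lines of file-input.txt in order yields "kafka" three times, with counts 1, 2 and 3, exactly as in the console output.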




1.4 Ecosystem

Apart from the main distribution, many tools integrate with Kafka.


The ecosystem page lists many of them, including stream processing systems, Hadoop integration, monitoring, and deployment tools.

1.5 Upgrading From Previous Versions



Upgrading from 0.8.x or 0.9.x to 0.10.0

0.10.0 has potential breaking changes (please review before upgrading) and possible performance impact following the upgrade. By following the recommended rolling upgrade plan below, you guarantee no downtime and no performance impact during and following the upgrade.
Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.

Notes to clients with version Due to a bug introduced in, clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with the old consumer) will not work with 0.10.0.x brokers. Therefore,
clients should be upgraded to before brokers are upgraded to 0.10.0.x. This step is not necessary for 0.8.X or clients.

For a rolling upgrade:
Update server.properties file on all brokers and add the following properties:
inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0)
log.message.format.version=CURRENT_KAFKA_VERSION (See potential performance impact following the upgrade for the details on what this configuration does.)

Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.0. NOTE: You shouldn't touch log.message.format.version yet; this parameter should only change once all consumers have been upgraded to 0.10.0.
Restart the brokers one by one for the new protocol version to take effect.
Once all consumers have been upgraded to 0.10.0, change log.message.format.version to 0.10.0 on each broker and restart them one by one.

Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.

Potential performance impact following upgrade to 0.10.0

The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages. The on-disk message format can be configured through log.message.format.version in the server.properties file. The default on-disk message format is 0.10.0. If a consumer client is on a version before 0.10.0, it only understands message formats before 0.10.0. In this case, the broker is able to convert messages from the 0.10.0 format to an earlier format before sending the response to the consumer on an older version. However, the broker can't use zero-copy transfer in this case. Reports from the Kafka community on the performance impact have shown CPU utilization going from 20% before to 100% after an upgrade, which forced an immediate upgrade of all clients to bring performance back to normal. To avoid such message conversion before consumers are upgraded to 0.10.0, one can set log.message.format.version to 0.8.2 or 0.9.0 when upgrading the broker to 0.10.0. This way, the broker can still use zero-copy transfer to send the data to the old consumers. Once consumers are upgraded, one can change the message format to 0.10.0 on the broker and enjoy the new message format, which includes the new timestamp and improved compression. The conversion is supported to ensure compatibility and can be useful to support a few apps that have not updated to newer clients yet, but it is impractical to support all consumer traffic on even an overprovisioned cluster. Therefore, it is critical to avoid the message conversion as much as possible when brokers have been upgraded but the majority of clients have not.
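The decision the broker faces can be sketched as a version comparison. FormatCheck is a hypothetical helper. It assumes versions are dotted numbers and that a consumer understands every format up to its own version. Note that the comparison is numeric, because comparing the strings would order "0.10.0" before "0.9.0".

```java
// Hypothetical helper: must a broker whose log.message.format.version is
// brokerFormat down-convert (and so lose zero-copy) for a consumer that
// understands formats only up to consumerMax?
final class FormatCheck {
    // Numeric, component-wise comparison of dotted versions; missing parts count as 0.
    static int compareVersions(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    static boolean needsDownConversion(String brokerFormat, String consumerMax) {
        return compareVersions(brokerFormat, consumerMax) > 0;
    }
}
```

With the format left at 0.9.0, a 0.9.0 consumer needs no conversion. Moving the format to 0.10.0 while that consumer remains would trigger it.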

For clients that are upgraded to 0.10.0, there is no performance impact.

Note: By setting the message format version, one certifies that all existing messages are on or below that message format version. Otherwise consumers before 0.10.0 might break. In particular, after the message format is set to 0.10.0, one should not change it back to an earlier format, as it may break consumers on versions before 0.10.0.

Note: Due to the additional timestamp introduced in each message, producers sending small messages may see a message throughput degradation because of the increased overhead. Likewise, replication now transmits an additional 8 bytes per message.
If you're running close to the network capacity of your cluster, it's possible that you'll overwhelm the network cards and see failures and performance issues due to the overload.
Note: If you have enabled compression on producers, you may notice reduced producer throughput and/or lower compression rate on the broker in some cases. When receiving compressed messages, 0.10.0 brokers avoid recompressing the messages, which in general reduces the latency and improves the throughput. In certain cases, however, this may reduce the batching size on the producer, which could lead to worse throughput. If this happens, users can tune linger.ms and batch.size of the producer for better throughput. In addition, the producer buffer used for compressing messages with snappy is smaller than the one used by the broker, which may have a negative impact on the compression ratio for the messages on disk. We intend to make this configurable in a future Kafka release.

Potential breaking changes in 0.10.0

Starting from Kafka 0.10.0, the message format version in Kafka is represented as the Kafka version. For example, message format 0.9.0 refers to the highest message version supported by Kafka 0.9.0.
Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages.
ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0
FetchRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0
MessageFormatter interface was changed from writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) to def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)

MessageReader interface was changed from 
readMessage(): KeyedMessage[Array[Byte], Array[Byte]]
def readMessage(): ProducerRecord[Array[Byte],

MessageFormatter's package was changed from 

MessageReader's package was changed from 

MirrorMakerMessageHandler no longer exposes the 
MessageAndMetadata[Array[Byte], Array[Byte]])
 method as it was never called.
The 0.7 KafkaMigrationTool is no longer packaged with Kafka. If you need to migrate from 0.7 to 0.10.0, please migrate to 0.8 first and then follow the documented upgrade process to upgrade from 0.8 to 0.10.0.
The new consumer has standardized its APIs to accept 
the sequence type for method parameters. Existing code may have to be updated to work with the 0.10.0 client library.
LZ4-compressed message handling was changed to use an interoperable framing specification (LZ4f v1.5.1). To maintain compatibility with old clients, this change only applies to Message format 0.10.0 and later.
Clients that Produce/Fetch LZ4-compressed messages using v0/v1 (Message format 0.9.0) should continue to use the 0.9.0 framing implementation. Clients that use Produce/Fetch protocols v2 or later should use interoperable LZ4f framing. A list of interoperable
LZ4 libraries is available at http://www.lz4.org/
Notable changes in 0.10.0

Starting from Kafka 0.10.0, a new client library named Kafka Streams is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read this
The default value of the configuration parameter 
now 64K for the new consumer.
The new consumer now exposes the configuration parameter 
restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.
The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible.
The new consumer API has been marked stable.

Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0

0.9.0 has potential breaking changes (please review before upgrading) and an inter-broker protocol change from previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker, downstream clusters should be upgraded first as well.

For a rolling upgrade:
Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X
Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.
Restart the brokers one by one for the new protocol version to take effect.

Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.

Potential breaking changes in 0.9.0

Java 1.6 is no longer supported.
Scala 2.9 is no longer supported.
Broker IDs above 1000 are now reserved by default to automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase the reserved.broker.max.id broker
configuration property accordingly.
Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync.
Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from replica, but also to time since the replica last caught up. Replicas that are still fetching
messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync.
Compacted topics no longer accept messages without key and an exception is thrown by the producer if this is attempted. In 0.8.x, a message without key would cause the log compaction thread to subsequently
complain and quit (and stop compacting all compacted topics).
MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single --consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance
per source cluster, each with its own consumer configuration.
Tools packaged under org.apache.kafka.clients.tools.* have been moved to org.apache.kafka.tools.*. All included scripts will still function as usual, only custom code directly importing these classes will
be affected.
The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh.
The kafka-topics.sh script (kafka.admin.TopicCommand) now exits with non-zero exit code on failure.
The kafka-topics.sh script (kafka.admin.TopicCommand) will now print a warning when topic names risk metric collisions due to the use of a '.' or '_' in the topic name, and error in the case of an actual collision.
The kafka-console-producer.sh script (kafka.tools.ConsoleProducer) will use the new producer instead of the old producer by default, and users have to specify 'old-producer' to use the old producer.
By default all command line tools will print all logging messages to stderr instead of stdout.
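The replica.lag.time.max.ms rule listed above can be sketched as follows. The sketch assumes a follower records the last time its fetch reached the leader's end offset. FollowerState is a hypothetical class, and timestamps and offsets are plain numbers chosen for illustration.

```java
// Hypothetical follower bookkeeping for the 0.9.0 in-sync rule: only reaching
// the leader's end offset refreshes lastCaughtUpMs; merely fetching does not.
final class FollowerState {
    private long lastCaughtUpMs;

    FollowerState(long startMs) {
        this.lastCaughtUpMs = startMs;  // assume the follower starts caught up
    }

    void onFetch(long nowMs, long fetchedUpToOffset, long leaderEndOffset) {
        if (fetchedUpToOffset >= leaderEndOffset) {
            lastCaughtUpMs = nowMs;
        }
    }

    // In sync while the time since last catching up is within the limit.
    boolean inSync(long nowMs, long replicaLagTimeMaxMs) {
        return nowMs - lastCaughtUpMs <= replicaLagTimeMaxMs;
    }
}
```

For example, take a 10-second limit and a follower that last caught up at t=5s. It is still in sync at t=14s but not at t=16s, even if it fetched at t=12s without catching up.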

Notable changes in 0.9.0

The new broker id generation feature can be disabled by setting broker.id.generation.enable to false.
Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process
via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics.
Default value of configuration parameter fetch.min.bytes for the new consumer is now 1 by default.

Deprecations in 0.9.0

Altering topic configuration from the kafka-topics.sh script (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality.
The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality.
The kafka.tools.ProducerPerformance class has been deprecated. Going forward, please use org.apache.kafka.tools.ProducerPerformance for this functionality (kafka-producer-perf-test.sh will also be changed
to use the new class).
The producer config block.on.buffer.full has been deprecated and will be removed in future release. Currently its default value has been changed to false. The KafkaProducer will no longer throw BufferExhaustedException
but instead will use max.block.ms value to block, after which it will throw a TimeoutException. If block.on.buffer.full property is set to true explicitly, it will set the max.block.ms to Long.MAX_VALUE and metadata.fetch.timeout.ms will not be honoured

Upgrading from 0.8.1 to 0.8.2

0.8.2 is fully compatible with 0.8.1. The upgrade can be done one broker at a
time by simply bringing it down, updating the code, and restarting it.

Upgrading from 0.8.0 to 0.8.1

0.8.1 is fully compatible with 0.8. The upgrade can be done one broker at a time
by simply bringing it down, updating the code, and restarting it.

Upgrading from 0.7

Release 0.7 is incompatible with newer releases. Major changes were made to the
API, ZooKeeper data structures, and protocol, and configuration in order to add replication (Which was missing in 0.7). The upgrade from 0.7 to later versions requires a special
tool for migration. This migration can be done without downtime.
