您的位置:首页 > 其它

Kafka总结系列(四)

2016-06-26 15:46 447 查看

Replication

        kafka中一个topic有若干分区,以分区为单位进行备份。创建topic时可以指定该topic的分区个数以及副本数m,一个partition有一个leader broker以及(m-1)个follower broker。follower像普通的consumer那样从leader同步消息,producer和consumer只和leader进行通信交互。所以leader均匀地散布在所有服务器中,以均衡负载。

        如何判断一个server是否“alive”?

        1、与zookeeper保持会话,通过zookeeper的心跳机制;

        2、作为一个follower需要和leader保持消息同步,不落后太多(可以配置)

        同时满足这两个条件的follower被保存在一个“in-sync-replication”ISR列表中,当leader crash时,便从中选择一个作为新的leader,而不是采用zookeeper的多数投票机制。

        kafka中默认,一条消息只有被该partition对应的leader以及所有follower追加至其log之后才是committed,才可以被消费者消费。所以只要有一个broker is alive,那么该消息就不会丢失,consumer也不用担心消费到的消息由于leader crash而不能重新消费。producer可以通过配置在延迟和高可靠性之间进行tradeoff。

Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)

        kafka中,partition本质是一个replicated log,一个消息追加在末尾的日志文件。消息有id,即offset,递增。

        Quorums:可以实现为多数投票算法,zookeeper采用的,2n+1台机器,可以允许n台crash。

        ISR:维护一个列表f,列表中的任一个均可以成为新的leader,可以允许f-1台crash。该列表的信息保存在zookeeper中;如果一个broker由于消息落后太多导致out of the isr-list,当他从leader处消费到足够多的消息后,还可以重回ISR。

Unclean leader election: What if they all die?

        如果所有的broker都crash了,怎么办?

等待ISR列表中的某一台机器come back to life,选择他作为leader;
选择第一个苏醒来的broker为leader,他不一定在ISR列表中;
        选择哪一种就需要在可用性和一致性之间进行tradeoff。0.8.2版本中使用第二种方案,

Availability and Durability Guarantees

        即使producer生产消息时设置:request.required.acks=-1,要求ISR列表中所有broker接收到消息时才确认为发送成功,消息仍可能丢失。设想:如果ISR列表中的所有服务器均crash,这就导致durability不好。提供了两种设置,自己选择prefer durability or availability。

Disable unclean leader election:如果ISR中所有的broker均宕机了,那么该partition变为unavailable,直到有一个新的leader;

Specify a minimum ISR size:定义最小的ISR size,只有当alive broker的个数超过该值时,partition才可用,可以写消息。该值越大, 意味着稳定性越高,但是可用性降低。

Replica Management

        kafka集群中存在多个topic,多个partition,要尽可能地让leader broker均匀地散布在brokers中。所以从ISR列表中选择新的leader时,要考虑他们已经承载的partition leader角色的个数。如果一个broker crash,需要为其承担的所有partition一一选择新的leader。我们选择一个broker作为“controller”,他监控其他所有broker的运行状况,发现failure,并为殃及的所有partition重新选leader。这样就可以保证election
cheap and fast,当controller fail,还得从alive broker中选一个作为新的controller。

Log Compaction

kafka broker的配置文件中可以设置日志的清理策略:

log.cleanup.policy:日志的清理策略:delete或者compact。delete:当日志文件超过指定大小或者时间限制,就会被删除;compact:对日志进行压缩,启动cleaner线程;

log.retention.{ms,minutes,hours}:log segment日志文件保存的最长时间;

log.retention.bytes:每个partition对应的log segment可以达到的最大字节数,topic的总大小=分区个数*该值,-1表示无大小限制;如果同时设置了时间和bytes,则只要有一个满足条件,就会执行清理策略,压缩或者直接删除;
log.cleaner.enable:是否启动日志压缩功能,若log.cleanup.policy设置为compact,则该项必须为true;

        日志压缩应用的场景:日志消息之间有关联关系,不希望丢掉所有的旧数据,一旦发生故障,可以从最初的状态开始进行恢复。由于在项目中并没有应用日志压缩,所以对其使用场景理解不够深刻。

Log Compaction Basics

        压缩是在后台由cleaner thread进行的。为了不影响producer和consumer的正常工作,可以在配置文件中指定压缩线程占用的IO等指标。



        日志压缩的方式:疑惑:图中的key是什么?(用户自定义分区策略时可以将消息中的某一个字段指定为key,依据他进行hash,确保同一个key对应的消息在一个分区)压缩方法:将同一个key对应的所有消息压缩为一条,压缩后消息的offset是原所有消息中最大的offset。

What guarantees does log compaction provide?

最新的消息不会被压缩,其offset依然是顺序递增;
压缩并不会对消息进行重排序,只是把若干条压缩为一条,所以相对顺序不变;
压缩后消息的offset值也不会改变,他是一条消息永久的唯一标识;
Any read progressing from offset 0 will see at least the final state of all records in the order they were written. All delete markers for deleted records will be seen provided the reader reaches the head of the log in a time
period less than the topic's delete.retention.ms setting (the default is 24 hours). This is important as delete marker removal happens concurrently with read (and thus it is important that we not remove any delete marker prior to the reader seeing it).这个不太理解。

Any consumer progressing from the start of the log, will see at least the final state of all records in the order they were written. All delete markers for deleted records will be seen provided the consumer reaches the head
of the log in a time period less than the topic's delete.retention.ms setting (the default is 24 hours). This is important as delete marker removal happens concurrently with read, and thus it is important that we do not remove any delete marker prior to the
4000
consumer seeing it.

Log Compaction Details

日志压缩由一些后台线程来完成,他们删除一部分消息, 重新拷贝日志文件;

broker配置文件中可以设置压缩ratio:log.cleaner.min.cleanable.ratio,选择highest ratio进行压缩;
统计每个key对应所有消息的最大offset值;
遍历日志文件,进行拷贝。如果该key在后面还有出现,则舍弃本条消息,即比较当前消息的offset和他的key对应的max-offset,小于则舍弃,等于则copy。也就是将原log segment进行有选择地拷贝,形成一份新的file,原来的直接删除即可。
The summary of the log head is essentially just a space-compact hash table. It uses exactly 24 bytes per entry. As a result with 8GB of cleaner buffer one cleaner iteration can clean around 366GB of log head (assuming 1k messages)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: