Kafka源码分析Producer读取Metadata的数据结构及Metadata两种更新机制介绍
2016-09-29 00:00
387 查看
问题导读:
1.什么是多线程异步发送模型?
2.Metadata的线程安全性如何实现?
3.Metadata的数据结构是什么?
4.producer如何读取Metadata?
5.Sender的如何创建?
6.Senderpoll()如何更新Metadata?
7.Metadata有哪2种更新机制?
8.什么是Metadata失效检测?
9.Metadata有哪些其他的更新策略?
解决方案:
多线程异步发送模型
下图是经过源码分析之后,整理出来的Producer端的架构图:
在上一篇我们讲过,Producer有同步发送和异步发送2种策略。在以前的Kafkaclientapi实现中,同步和异步是分开实现的。而在0.9中,同步发送其实是通过异步发送间接实现,其接口如下:
要实现同步发送,只要在拿到返回的Future对象之后,直接调用get()就可以了。
基本思路
从上图我们可以看出,异步发送的基本思路就是:send的时候,KafkaProducer把消息放到本地的消息队列RecordAccumulator,然后一个后台线程Sender不断循环,把消息发给Kafka集群。
要实现这个,还得有一个前提条件:就是KafkaProducer/Sender都需要获取集群的配置信息Metadata。所谓Metadata,也就是在上一篇所讲的,Topic/Partion与broker的映射关系:每一个Topic的每一个Partion,得知道其对应的broker列表是什么,其中leader是谁,follower是谁。
2个数据流
所以在上图中,有2个数据流:
Metadata流(A1,A2,A3):Sender从集群获取信息,然后更新Metadata;KafkaProducer先读取Metadata,然后把消息放入队列。
消息流(B1,B2,B3):这个很好理解,不再详述。
本篇着重讲述Metadata流,消息流,将在后续详细讲述。
Metadata的线程安全性
从上图可以看出,Metadata是多个producer线程读,一个sender线程更新,因此它必须是线程安全的。
Kafka的官方文档上也有说明,KafkaProducer是线程安全的,可以在多线程中调用:
Theproduceristhreadsafeandsharingasingleproducerinstanceacrossthreadswillgenerallybefasterthanhavingmultipleinstances.
从下面代码也可以看出,它的所有public方法都是synchronized:
Metadata的数据结构
下面代码列举了Metadata的主要数据结构:一个Cluster对象+1堆状态变量。前者记录了集群的配置信息,后者用于控制Metadata的更新策略。
producer读取Metadata
下面是send函数的源码,可以看到,在send之前,会先读取metadata。如果metadata读不到,会一直阻塞在那,直到超时,抛出TimeoutException
总结:从上面代码可以看出,producerwaitmetadata的时候,有2个条件:
(1)while(metadata.fetch().partitionsForTopic(topic)==null)
(2)while(this.version<=lastVersion)
有wait就会有notify,notify在Sender更新Metadata的时候发出。
Sender的创建
下面是KafkaProducer的构造函数,从代码可以看出,Sender就是KafkaProducer中创建的一个Thread.
Senderpoll()更新Metadata
1.什么是多线程异步发送模型?
2.Metadata的线程安全性如何实现?
3.Metadata的数据结构是什么?
4.producer如何读取Metadata?
5.Sender的如何创建?
6.Senderpoll()如何更新Metadata?
7.Metadata有哪2种更新机制?
8.什么是Metadata失效检测?
9.Metadata有哪些其他的更新策略?
解决方案:
多线程异步发送模型
下图是经过源码分析之后,整理出来的Producer端的架构图:
在上一篇我们讲过,Producer有同步发送和异步发送2种策略。在以前的Kafkaclientapi实现中,同步和异步是分开实现的。而在0.9中,同步发送其实是通过异步发送间接实现,其接口如下:
1 2 3 4 5 6 7 |
基本思路
从上图我们可以看出,异步发送的基本思路就是:send的时候,KafkaProducer把消息放到本地的消息队列RecordAccumulator,然后一个后台线程Sender不断循环,把消息发给Kafka集群。
要实现这个,还得有一个前提条件:就是KafkaProducer/Sender都需要获取集群的配置信息Metadata。所谓Metadata,也就是在上一篇所讲的,Topic/Partion与broker的映射关系:每一个Topic的每一个Partion,得知道其对应的broker列表是什么,其中leader是谁,follower是谁。
2个数据流
所以在上图中,有2个数据流:
Metadata流(A1,A2,A3):Sender从集群获取信息,然后更新Metadata;KafkaProducer先读取Metadata,然后把消息放入队列。
消息流(B1,B2,B3):这个很好理解,不再详述。
本篇着重讲述Metadata流,消息流,将在后续详细讲述。
Metadata的线程安全性
从上图可以看出,Metadata是多个producer线程读,一个sender线程更新,因此它必须是线程安全的。
Kafka的官方文档上也有说明,KafkaProducer是线程安全的,可以在多线程中调用:
Theproduceristhreadsafeandsharingasingleproducerinstanceacrossthreadswillgenerallybefasterthanhavingmultipleinstances.
从下面代码也可以看出,它的所有public方法都是synchronized:
01 02 03 04 05 06 07 08 09 10 11 12 13 |
下面代码列举了Metadata的主要数据结构:一个Cluster对象+1堆状态变量。前者记录了集群的配置信息,后者用于控制Metadata的更新策略。
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
下面是send函数的源码,可以看到,在send之前,会先读取metadata。如果metadata读不到,会一直阻塞在那,直到超时,抛出TimeoutException
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
(1)while(metadata.fetch().partitionsForTopic(topic)==null)
(2)while(this.version<=lastVersion)
有wait就会有notify,notify在Sender更新Metadata的时候发出。
Sender的创建
下面是KafkaProducer的构造函数,从代码可以看出,Sender就是KafkaProducer中创建的一个Thread.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |