rocketmq原理:name server ,broker, producer, consumer之间通信
2017-01-08 20:20
911 查看
原文链接:http://www.cnblogs.com/xiaodf/p/5075167.html
参考链接:rocketmq与kafkahttp://blog.csdn.net/chunlongyu/article/category/6638499
在原文的基础上,进行了部分修改,但是感觉博客还是有部分错误,有时间会更新。
简介
官方简介:RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:
[align=left] 能够保证严格的消息顺序[/align]
[align=left] 提供丰富的消息拉取模式[/align]
[align=left] 高效的订阅者水平扩展能力[/align]
[align=left] 实时的消息订阅机制[/align]
[align=left] 亿级消息堆积能力[/align]
二、网络架构
三、特性
1. nameserver
相对来说,nameserver的稳定性非常高。原因有二:1 、nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver,即使全部挂掉,也不影响业务系统使用。无状态
2 、nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。
2. broker
与nameserver关系
[align=left]连接[/align][align=left] 单个broker和所有nameserver保持长连接[/align]
[align=left]心跳[/align]
[align=left] 心跳间隔:每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。[/align]
[align=left] 心跳超时:nameserver每隔10秒钟(此时间无法更改),扫描所有还存活的broker连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则断开连接。[/align]
[align=left] 断开[/align]
[align=left] 时机:broker挂掉;心跳超时导致nameserver主动关闭连接[/align]
[align=left] 动作:一旦连接断开,nameserver会立即感知,更新topc与队列的对应关系,但不会通知生产者和消费者[/align]
负载均衡
[align=left] 一个topic分布在多个broker上,一个broker可以配置多个topic,它们是多对多的关系。 [/align][align=left] 如果某个topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同broker上,减轻某个broker的压力。[/align]
[align=left] topic消息量都比较均匀的情况下,如果某个broker上的队列越多,则该broker压力越大。[/align]
可用性
由于消息分布在各个broker上,一旦某个broker宕机,则该broker上的消息读写都会受到影响。所以rocketmq提供了master/slave的结构,salve定时从master同步数据,如果master宕机,则slave提供消费服务,但是不能写入消息,此过程对应用透明,由rocketmq内部解决。这里有两个关键点:
[align=left]一旦某个broker master宕机,生产者和消费者多久才能发现?受限于rocketmq的网络连接机制,默认情况下,最多需要30秒,但这个时间可由应用设定参数来缩短时间。这个时间段内,发往该broker的消息都是失败的,而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。[/align]
[align=left] 消费者得到master宕机通知后,转向slave消费(重定向,对于2次开发者透明),但是slave不能保证master的消息100%都同步过来了,因此会有少量的消息丢失。但是消息最终不会丢的,一旦master恢复,未同步过去的消息会被消费掉。[/align]
可靠性
[align=left] 所有发往broker的消息,有同步刷盘和异步刷盘机制,总的来说,可靠性非常高[/align][align=left] 同步刷盘时,消息写入物理文件才会返回成功,因此非常可靠[/align]
[align=left] 异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电[/align]
消息清理
[align=left]扫描间隔[/align][align=left] 默认10秒,由broker配置参数cleanResourceInterval决定[/align]
[align=left] 空间阈值[/align]
[align=left] 物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85%[/align]
[align=left] 清理时机[/align]
[align=left] 默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值[/align]
[align=left] 文件保留时长[/align]
[align=left] 默认72小时,由broker配置参数fileReservedTime决定[/align]
读写性能
[align=left] 文件内存映射方式操作文件,避免read/write系统调用和实时文件读写,性能非常高[/align][align=left]永远一个文件在写,其他文件在读[/align]
[align=left] 顺序写,随机读[/align]
[align=left] 利用linux的sendfile???mmap+write吧机制,将消息内容直接输出到sokect管道,避免系统调用[/align]
系统特性
[align=left] 大内存,内存越大性能越高,否则系统swap会成为性能瓶颈[/align][align=left] IO密集[/align]
[align=left] cpu load高,使用率低,因为cpu占用后,大部分时间在IO WAIT[/align]
[align=left] 磁盘可靠性要求高,为了兼顾安全和性能,采用RAID10阵列[/align]
[align=left] 磁盘读取速度要求快,要求高转速大容量磁盘[/align]
3. 消费者
与nameserver关系
[align=left] 连接[/align][align=left] 单个消费者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。[/align]
[align=left]心跳[/align]
[align=left]与nameserver没有心跳[/align]
[align=left] 轮询时间[/align]
[align=left]默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。该时间由DefaultMQPushConsumer的pollNameServerInteval参数决定,可手动配置。[/align]
与broker关系
[align=left] 连接[/align][align=left]单个消费者和该消费者关联的所有broker保持长连接。[/align]
[align=left]心跳[/align]
[align=left]默认情况下,消费者每隔30秒向所有broker发送心跳,该时间由DefaultMQPushConsumer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费[/align]
[align=left] 断开[/align]
[align=left]时机:消费者挂掉;心跳超时导致broker主动关闭连接[/align]
[align=left]动作:一旦连接断开,broker会立即感知到,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费[/align]
负载均衡
集群消费模式下,一个消费者集群多台机器共同消费一个topic的多个队列,一个队列只会被一个消费者消费。如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。消费机制
[align=left] 本地队列[/align][align=left] 消费者不间断的从broker拉取消息,消息拉取到本地队列,然后本地消费线程消费本地消息队列,只是一个异步过程,拉取线程不会等待本地消费线程,这种模式实时性非常高(本地消息队列达到解耦的效果,响应时间减少)。对消费者对本地队列有一个保护,因此本地消息队列不能无限大,否则可能会占用大量内存,本地队列大小由DefaultMQPushConsumer的pullThresholdForQueue属性控制,默认1000,可手动设置。[/align]
[align=left] 轮询间隔[/align]
[align=left] 消息拉取线程每隔多久拉取一次?间隔时间由DefaultMQPushConsumer的pullInterval属性控制,默认为0,可手动设置。[/align]
[align=left]消息消费数量[/align]
[align=left] 监听器每次接受本地队列的消息是多少条?这个参数由DefaultMQPushConsumer的consumeMessageBatchMaxSize属性控制,默认为1,可手动设置。[/align]
消费进度存储
每隔一段时间将各个队列的消费进度存储到对应的broker上,该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒,可手动设置。如果一个topic在某broker上有3个队列,一个消费者消费这3个队列,那么该消费者和这个broker有几个连接?
一个连接,消费单位与队列相关,消费连接只跟broker相关,事实上,消费者将所有队列的消息拉取任务放到本地的队列,挨个拉取,拉取完毕后,又将拉取任务放到队尾,然后执行下一个拉取任务4. 生产者
与nameserver关系
[align=left]连接[/align][align=left] 单个生产者者和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。[/align]
[align=left]轮询时间[/align]
[align=left]默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。该时间由DefaultMQProducer的pollNameServerInteval参数决定,可手动配置。[/align]
[align=left] 心跳[/align]
[align=left]与nameserver没有心跳[/align]
与broker关系
[align=left] 连接[/align][align=left]单个生产者和该生产者关联的所有broker保持长连接。[/align]
[align=left]心跳[/align]
[align=left]默认情况下,生产者每隔30秒向所有broker发送心跳,该时间由DefaultMQProducer的heartbeatBrokerInterval参数决定,可手动配置。broker每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接。[/align]
[align=left]连接断开[/align]
[align=left]移除broker上的生产者信息[/align]
负载均衡
生产者时间没有关系,每个生产者向队列轮流发送消息四、Broker集群配置方式及优缺点
1. 单个 Master
这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用。2. 多 Master 模式
一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由与 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。
### 先启动 NameServer,例如机器 IP 为:172.16.8.106:9876
| nohup sh mqnamesrv & |
| nohup sh mqbroker -n 172.16.8.106 : 9876 -c$ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties & |
| nohup sh mqbroker -n 172.16.8.106 : 9876 -c$ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties & |
3. 多 Master 多 Slave 模式,异步复制
每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。
### 先启动 NameServer,例如机器 IP 为:172.16.8.106:9876
| nohup sh mqnamesrv & |
| nohup sh mqbroker -n 172.16.8.106 : 9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties & |
| nohup sh mqbroker -n 172.16.8.106 : 9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties & |
| nohup sh mqbroker -n 172.16.8.106 : 9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties & |
| nohup sh mqbroker -n 172.16.8.106 : 9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties & |
4. 多 Master 多 Slave 模式,同步双写
每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用同步双写方式,主备都写成功,向应用返回成功。优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
### 先启动 NameServer,例如机器 IP 为:172.16.8.106:9876
| nohup sh mqnamesrv & |
| nohup sh mqbroker -n 172.16.8.106 : 9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties & |
| nohup sh mqbroker -n 172.16.8.106 : 9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties & |
| nohup sh mqbroker -n 172.16.8.106 : 9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties & |
| nohup sh mqbroker -n 172.16.8.106 : 9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties & |
参数信息,仅供参考
2. 客户端的公共配置类:ClientConfig
参数名 | 默认值 | 说明 |
NamesrvAddr | | NameServer地址列表,多个nameServer地址用分号隔开 |
clientIP | 本机IP | 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定 |
instanceName | DEFAULT | 客户端实例名称,客户端创建的多个Producer,Consumer实际是共用一个内部实例(这个实例包含网络连接,线程资源等) |
clientCallbackExecutorThreads | 4 | 通信层异步回调线程数 |
pollNameServerInteval | 30000 | 轮训Name Server 间隔时间,单位毫秒 |
heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
3. Producer配置
参数名 | 默认值 | 说明 |
producerGroup | DEFAULT_PRODUCER | Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。 |
createTopicKey | TBW102 | 在发送消息时,自动创建服务器不存在的topic,需要指定key |
defaultTopicQueueNums | 4 | 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 |
sendMsgTimeout | 10000 | 发送消息超时时间,单位毫秒 |
compressMsgBodyOverHowmuch | 4096 | 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节 |
retryAnotherBrokerWhenNotStoreOK | FALSE | 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送 |
maxMessageSize | 131072 | 客户端限制的消息大小,超过报错,同时服务端也会限制(默认128K) |
transactionCheckListener | | 事物消息回查监听器,如果发送事务消息,必须设置 |
checkThreadPoolMinSize | 1 | Broker回查Producer事务状态时,线程池大小 |
checkThreadPoolMaxSize | 1 | Broker回查Producer事务状态时,线程池大小 |
checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
4. PushConsumer配置
参数名 | 默认值 | 说明 |
consumerGroup | DEFAULT_CONSUMER | Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应将它们归为同一组 |
messageModel | CLUSTERING | 消息模型,支持以下两种1.集群消费2.广播消费 |
consumeFromWhere | CONSUME_FROM_LAST_OFFSET | Consumer启动后,默认从什么位置开始消费 |
allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
Subscription | {} | 订阅关系 |
messageListener | | 消息监听器 |
offsetStore | | 消费进度存储 |
consumeThreadMin | 10 | 消费线程池数量 |
consumeThreadMax | 20 | 消费线程池数量 |
consumeConcurrentlyMaxSpan | 2000 | 单队列并行消费允许的最大跨度 |
pullThresholdForQueue | 1000 | 拉消息本地队列缓存消息最大数 |
Pullinterval | 0 | 拉消息间隔,由于是长轮询,所以为0,但是如果应用了流控,也可以设置大于0的值,单位毫秒 |
consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
5. PullConsumer配置
参数名 | 默认值 | 说明 |
consumerGroup | | Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
brokerSuspendMaxTimeMillis | 20000 | 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒 |
consumerPullTimeoutMillis | 10000 | 非长轮询,拉消息超时时间,单位毫秒 |
consumerTimeoutMillisWhenSuspend | 30000 | 长轮询,Consumer拉消息请求咋broker挂起超过指定时间,客户端认为超时,单位毫秒 |
messageModel | BROADCASTING | 消息模型,支持以下两种:1集群消费 2广播模式 |
messageQueueListener | | 监听队列变化 |
offsetStore | | 消费进度存储 |
registerTopics | | 注册的topic集合 |
allocateMessageQueueStrategy | | Rebalance算法实现策略 |
6. Broker配置参数
查看Broker默认配置sh mqbroker -m
参数名 | 默认值 | 说明 |
consumerGroup | | Conusmer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组 |
listenPort | 10911 | Broker对外服务的监听端口 |
namesrvAddr | Null | Name Server地址 |
brokerIP1 | 本机IP | 本机IP地址,默认系统自动识别,但是某些多网卡机器会存在识别错误的情况,这种情况下可以人工配置。 |
brokerName | 本机主机名 | |
brokerClusterName | DefaultCluster | Broker所属哪个集群 |
brokerId | 0 | BrokerId,必须是大等于0的整数,0表示Master,>0表示Slave,一个Master可以挂多个Slave,Master和Slave通过BrokerName来配对 |
storePathCommitLog | $HOME/store/commitlog | commitLog存储路径 |
storePathConsumeQueue | $HOME/store/consumequeue | 消费队列存储路径 |
storePathIndex | $HOME/store/index | 消息索引存储队列 |
deleteWhen | 4 | 删除时间时间点,默认凌晨4点 |
fileReservedTime | 48 | 文件保留时间,默认48小时 |
maxTransferBytesOnMessageInMemory | 262144 | 单次pull消息(内存)传输的最大字节数 |
maxTransferCountOnMessageInMemory | 32 | 单次pull消息(内存)传输的最大条数 |
maxTransferBytesOnMessageInMemory | 65535 | 单次pull消息(磁盘)传输的最大字节数 |
maxTransferCountOnMessageInDisk | 8 | 单次pull消息(磁盘)传输的最大条数 |
messageIndexEnable | TRUE | 是否开启消息索引功能 |
messageIndexSafe | FALSE | 是否提供安全的消息索引机制,索引保证不丢 |
brokerRole | ASYNC_MASTER | Broker的角色 -ASYNC_MASTER异步复制Master -SYNC_MASTER同步双写Master -SLAVE |
flushDiskType | ASYNC_FLUSH | 刷盘方式 -ASYNC_FLUSH异步刷盘 -SYNC_FLUSH同步刷盘 |
cleanFileForciblyEnable | TRUE | 磁盘满,且无过期文件情况下TRUE表示强制删除文件,优先保证服务可用 FALSE标记服务不可用,文件不删除 |
相关文章推荐
- 我的2016书单以及为2017年准备的书单
- redis3.0.7源码阅读(一)源码文件
- jdk1.8更新
- Python 库大全
- Spring对象生命周期控制
- Good Bye 2016D. New Year and Fireworks(dfs)
- jquery 全选、反选、即点即改
- 【CSS基础】css三角提示框
- 再续js打印图形
- 多项式相乘问题(模拟)
- Cooja 中自定义 Java Mote 使用 Collect-View
- QT_校园导航Update
- 【CSS基础】css三角提示框
- 初识 Eclipse
- 写标流程学习总结
- 数据结构------------线性表
- Java多线程——Executors和线程池
- Razor视图引擎和HtmlHelper的使用意义
- struct sock_addr 和struct sock_addr_in的区别与联系
- 机器学习算法比较