RocketMQ原理解析-consumer 5.push消费-顺序消费消息
2017-07-13 15:42
861 查看
顺序消费服务ConsumeMessageConcurrentlyService构建的时候
构建一个线程池来接收消费请求ConsumeRequest
构建一个单线程的本地线程,用来稍后定时重新消费ConsumeRequest, 用来执行定时周期性(一秒)钟锁队列任务
周期性锁队列lockMQPeriodically
获取正在消费队列列表ProcessQueueTable所有MesssageQueue, 构建根据broker归类成MessageQueue集合Map<brokername,Set<MessageQueue>>
遍历Map<brokername,Set<MessageQueue>>的brokername, 获取broker的master机器地址,将brokerName的Set<MessageQueue>发送到broker请求锁定这些队列。 在broker端锁定队列,其实就是在broker的queue中标记一下消费端,表示这个queue被某个client锁定。 Broker会返回成功锁定队列的集合, 根据成功锁定的MessageQueue,设置对应的正在处理队列ProccessQueue的locked属性为true没有锁定设置为false
通过长轮询拉取到消息后会提交到消息服务ConsumeMessageOrderlyService,
ConsumeMessageOrderlyService的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。ConsumeRequest是由ProcessQueue和Messagequeue组成。
ConsumeRequest任务的run方法
判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息
每个messagequeue都会生成一个队列锁来保证在当前consumer内,同一个队列串行消费,
判断processQueue的lock属性是否为true,lock属性是否过期,如果为false或者过期,放到本地线程稍后锁定在消费。 如果lock为true且没有过期,开始消费消息
计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将processqueue, messagequeue重新构建ConsumeRequest加到线程池10ms后在消费,这样防止个别队列被饿死
获取客户端的消费批次个数,默认一批次为一条
从proccessqueue获取批次消息, processqueue.takeMessags(batchSize), 从msgTreeMap中移除消息放到临时map中msgTreeMapTemp,这个临时map用来回滚消息和commit消息来实现事物消费
调回调接口消费消息,返回状态对象ConsumeOrderlyStatus
根据消费状态,处理结果
1) 非事物方式,自动提交
消息消息状态为success:调用processQueue.commit方法
获取msgTreeMapTemp的最后一个key,表示提交的 offset
清空msgTreeMapTemp的消息,已经成功消费
2) 事物提交,由用户来控制提交回滚(精卫专用)
更新消费进度, 这里的更新只是一个内存offsetTable的更新,后面有定时任务定时更新到broker上去
构建一个线程池来接收消费请求ConsumeRequest
构建一个单线程的本地线程,用来稍后定时重新消费ConsumeRequest, 用来执行定时周期性(一秒)钟锁队列任务
周期性锁队列lockMQPeriodically
获取正在消费队列列表ProcessQueueTable所有MesssageQueue, 构建根据broker归类成MessageQueue集合Map<brokername,Set<MessageQueue>>
遍历Map<brokername,Set<MessageQueue>>的brokername, 获取broker的master机器地址,将brokerName的Set<MessageQueue>发送到broker请求锁定这些队列。 在broker端锁定队列,其实就是在broker的queue中标记一下消费端,表示这个queue被某个client锁定。 Broker会返回成功锁定队列的集合, 根据成功锁定的MessageQueue,设置对应的正在处理队列ProccessQueue的locked属性为true没有锁定设置为false
通过长轮询拉取到消息后会提交到消息服务ConsumeMessageOrderlyService,
ConsumeMessageOrderlyService的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。ConsumeRequest是由ProcessQueue和Messagequeue组成。
ConsumeRequest任务的run方法
判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息
每个messagequeue都会生成一个队列锁来保证在当前consumer内,同一个队列串行消费,
判断processQueue的lock属性是否为true,lock属性是否过期,如果为false或者过期,放到本地线程稍后锁定在消费。 如果lock为true且没有过期,开始消费消息
计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将processqueue, messagequeue重新构建ConsumeRequest加到线程池10ms后在消费,这样防止个别队列被饿死
获取客户端的消费批次个数,默认一批次为一条
从proccessqueue获取批次消息, processqueue.takeMessags(batchSize), 从msgTreeMap中移除消息放到临时map中msgTreeMapTemp,这个临时map用来回滚消息和commit消息来实现事物消费
调回调接口消费消息,返回状态对象ConsumeOrderlyStatus
根据消费状态,处理结果
1) 非事物方式,自动提交
消息消息状态为success:调用processQueue.commit方法
获取msgTreeMapTemp的最后一个key,表示提交的 offset
清空msgTreeMapTemp的消息,已经成功消费
2) 事物提交,由用户来控制提交回滚(精卫专用)
更新消费进度, 这里的更新只是一个内存offsetTable的更新,后面有定时任务定时更新到broker上去
相关文章推荐
- RocketMQ原理解析-consumer 5.push消费-顺序消费消息
- RocketMQ原理解析-consumer 5.push消费-顺序消费消息
- RocketMQ原理解析-consumer 4.长轮询push消息—并发消费消息
- RocketMQ原理解析-consumer 4.长轮询push消息—并发消费消息
- RocketMQ原理解析-producer 3.如何发送顺序消息
- 消息中间件 RocketMQ源码解析:Message顺序发送与消费
- RocketMQ原理解析-producer 3.如何发送顺序消息
- RocketMQ原理解析-producer 6.消息在broker落地之事物消息
- RocketMQ原理解析-broker 2.消息存储
- 消息顺序和消息事务 - RocketMQ及分布式消息系统的原理以及重要问题解读
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- 分布式开放消息系统RocketMQ的原理与实践(消息的顺序问题、重复问题、可靠消息/事务消息)
- RocketMQ原理解析-producer 5.消息在broker落地之普通消息
- RocketMQ原理解析-Consumer
- RocketMQ原理解析-consumer 7.shutdown
- RocketMQ原理解析-broker 2.消息存储
- 消息中间件 RocketMQ源码解析:Message拉取&消费(下)
- 分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”-- 一个看似简单的复杂问题
- RocketMQ原理解析-consumer 7.shutdown
- RocketMQ原理解析-consumer 1.启动