RabbitMQ消息可靠送达
2017-07-11 11:24
309 查看
目前项目中采用RabbitMQ,在运行的过程当中,发现有一部分消息存在丢失的情况,结合自己对RabbitMQ的理解,一般分为两种情况,一种是客户端丢失,即消息没有成功送达到RabbitMQ Server,一种是消费端并有成功消费消息,针对这两种情况,我们分别做了相应的方案。
一、消息没有送达到RabbitMQ Server,导致丢失
由于看不了RabbitMQ Server的消息日志,我们自己做了一个消息日志表,操作步骤如下:
1、在业务操作成功之后,把消息持久化到DB中,这两步操作放在同一个事务当中,要么都成功,要么都失败。
2、事务提交成功之后,单独启动一个线程进行消息的发送
3、启用RabbitMQ的消息确认(message acknowledgement)机制,我们使用的是Spring,首先就要修改Spring-rabbitmq.xml中的配置。配置如下:
<rabbit:connection-factory id="rabbitmqConnectionFactory" virtual-host="${rabbitmq.vhost}" channel-cache-size="25" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" publisher-confirms="true"/> <rabbit:admin connection-factory="rabbitmqConnectionFactory"/> <rabbit:template id="amqpTemplate" exchange="${exchangeName}" connection-factory="rabbitmqConnectionFactory" confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true"/>
关键参数已用红色字体标识。注意:mandatory必须设置为true,否则回调不会生效。
confirmCallBackListener类的代码如下:
@Service("confirmCallBackListener") public class ConfirmCallBackListener implements ConfirmCallback{ private static Logger logger = LoggerFactory.getLogger(ConfirmCallBackListener.class); @Autowired private IMessageLogService messageLogService; @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause); try { if (!ack) { logger.info("send message failed: " + cause + correlationData.toString()); } else {//发送成功,更新消息日志表数据 logger.info("send message successful !"); MessageLog vo = new MessageLog(); vo.setToken(correlationData.getId()); vo.setSendStatus(ConstantUtils.SENT); messageLogService.saveOrUpdateMessageLog(vo); logger.info("update message send status ==="+ JSON.toJSONString(vo)); } }catch (Exception e){ logger.error("message confirm exception :"+e); } } }returnCallBackListener类的代码如下:
@Service("returnCallBackListener") public class ReturnCallBackListener implements ReturnCallback{ private static Logger logger = LoggerFactory.getLogger(ReturnCallBackListener.class); @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { try { System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey); //做相应的业务操作 }catch (Exception e){ logger.error("message return callback exception:"+e); } } }4、如果出现RabbitMQ Server连接不上,我们做了一个定时任务,扫描message_log表,如果存在消息没有确认,则会向RabbitMQ Server一直重发,直到发送成功。
二、消费端消费消息失败或有其它异常
1、消费者在消费消息成功时给到生产者回执,这时发送端收到消费者回执之后,更改message-log表的发送状态。则整个过程结束。
2、采用RabbitMQ本身的机制进行消息回执。配置如下:
<rabbit:listener-container connection-factory="rabbitmqConnectionFactory" acknowledge="manual" > <rabbit:listener queues="your_queue_name" ref="receiveConfirmListener" /> </rabbit:listener-container>
receiveCallBackListener类的代码如下:
@Service("receiveConfirmListener") public class ReceiveConfirmListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { try { System.out.println("consumer--:" + message.getMessageProperties() + ":" + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace();
//TODO 业务处理 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } }这样基本就保证了消息的可靠送达
如果有别的同学有更好的方案,欢迎吐槽。
注:MessageLog对象是一张消息日志表,表结构大致如下:
相关文章推荐
- RabbitMQ—如何实现高可靠消息消费逻辑
- rabbitmq队列的exclusive,durability,auto-delete属性以及消息可靠传输设计
- RabbitMQ如何保证发送端消息的可靠投递-发生镜像队列发生故障转移时
- IM消息送达保证机制实现(二):保证离线消息的可靠投递
- RabbitMQ如何保证发送端消息的可靠投递
- IM消息送达保证机制实现(二):保证离线消息的可靠投递
- IM消息送达保证机制实现(二):保证离线消息的可靠投递
- 柯南君:看大数据时代下的IT架构(7)消息队列之RabbitMQ--案例(routing 起航)
- spring整合消息队列rabbitmq
- 消息中间件ActiveMQ、RabbitMQ、RocketMQ、ZeroMQ、Kafka如何选型?
- python 监控rabbitmq 消息队列消息数量
- rabbitmq plugins rabbitmq_delayed_message_exchange消息队列延迟消息插件
- RabbitMQ消息的传输控制
- java 操作 RabbitMQ 发送、接受消息
- RabbitMQ消息队列(二):"Hello, World"[转]
- (八)RabbitMQ消息队列-通过Topic主题模式分发消息
- RabbitMQ消息队列(二):”Hello, World“
- WebAPi的可视化输出模式(RabbitMQ、消息补偿相关)所有webapi似乎都缺失的一个功能
- (十一)RabbitMQ消息队列-如何实现高可用
- java-rabbitmq-实例pull模式拉取消息