您的位置:首页 > 其它

RabbitMQ消息可靠送达

2017-07-11 11:24 309 查看
目前项目中采用RabbitMQ,在运行的过程当中,发现有一部分消息存在丢失的情况,结合自己对RabbitMQ的理解,一般分为两种情况,一种是客户端丢失,即消息没有成功送达到RabbitMQ Server,一种是消费端并有成功消费消息,针对这两种情况,我们分别做了相应的方案。

一、消息没有送达到RabbitMQ Server,导致丢失

由于看不了RabbitMQ Server的消息日志,我们自己做了一个消息日志表,操作步骤如下:

1、在业务操作成功之后,把消息持久化到DB中,这两步操作放在同一个事务当中,要么都成功,要么都失败。

2、事务提交成功之后,单独启动一个线程进行消息的发送

3、启用RabbitMQ的消息确认(message acknowledgement)机制,我们使用的是Spring,首先就要修改Spring-rabbitmq.xml中的配置。配置如下:

<rabbit:connection-factory  id="rabbitmqConnectionFactory"  virtual-host="${rabbitmq.vhost}" channel-cache-size="25"  host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"  password="${rabbitmq.password}" publisher-confirms="true"/>
<rabbit:admin connection-factory="rabbitmqConnectionFactory"/>
<rabbit:template id="amqpTemplate"  exchange="${exchangeName}" connection-factory="rabbitmqConnectionFactory" confirm-callback="confirmCallBackListener" return-callback="returnCallBackListener" mandatory="true"/>

关键参数已用红色字体标识。注意:mandatory必须设置为true,否则回调不会生效。

confirmCallBackListener类的代码如下:

@Service("confirmCallBackListener")
public class ConfirmCallBackListener implements ConfirmCallback{
private static Logger logger = LoggerFactory.getLogger(ConfirmCallBackListener.class);
@Autowired
private IMessageLogService messageLogService;

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
try {
if (!ack) {
logger.info("send message failed: " + cause + correlationData.toString());
} else {//发送成功,更新消息日志表数据
logger.info("send message successful !");
MessageLog vo =  new MessageLog();
vo.setToken(correlationData.getId());
vo.setSendStatus(ConstantUtils.SENT);
messageLogService.saveOrUpdateMessageLog(vo);
logger.info("update message send status ==="+ JSON.toJSONString(vo));
}

}catch (Exception e){
logger.error("message confirm exception :"+e);
}
}
}
returnCallBackListener类的代码如下:

@Service("returnCallBackListener")
public class ReturnCallBackListener implements ReturnCallback{
private static Logger logger = LoggerFactory.getLogger(ReturnCallBackListener.class);

@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
try {
System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
//做相应的业务操作
}catch (Exception e){
logger.error("message return callback exception:"+e);

}
}
}
4、如果出现RabbitMQ Server连接不上,我们做了一个定时任务,扫描message_log表,如果存在消息没有确认,则会向RabbitMQ Server一直重发,直到发送成功。

二、消费端消费消息失败或有其它异常

1、消费者在消费消息成功时给到生产者回执,这时发送端收到消费者回执之后,更改message-log表的发送状态。则整个过程结束。

2、采用RabbitMQ本身的机制进行消息回执。配置如下:

<rabbit:listener-container connection-factory="rabbitmqConnectionFactory" acknowledge="manual" >
<rabbit:listener queues="your_queue_name" ref="receiveConfirmListener" />
</rabbit:listener-container>


receiveCallBackListener类的代码如下:

@Service("receiveConfirmListener")
public class ReceiveConfirmListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
System.out.println("consumer--:" + message.getMessageProperties() + ":" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
//TODO 业务处理
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
}
这样基本就保证了消息的可靠送达

如果有别的同学有更好的方案,欢迎吐槽。

注:MessageLog对象是一张消息日志表,表结构大致如下:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: