RabbitMQ 发布订阅持久化及持久化方式
RabbitMQ是一种重要的消息队列中间件,在生产环境中,稳定是第一考虑。RabbitMQ厂家也深知开发者的声音,稳定、可靠是第一考虑,为了消息传输的可靠性传输,RabbitMQ提供了多种途径的消息持久化保证:Exchange持久化、Queue持久化及Message的持久化等。以保证RabbitMQ在重启或Crash等异常情况下,消息不会丢失。RabbitMQ提供了简单的参数配置来实现持久化操作。
简单说明一下各种持久化方式:(描述代码采用的是RabbitMQ.Client SDK, C#代码)
Queue持久化:队列是我们使用RabbitMQ进行数据传输的最多使用的方式,是进行点对点消息传递使用最多的方式。队列的持久化是通过durable=true 来实现。
var connFactory = new ConnectionFactory(); Conn = connFactory.CreateConnection(); Model = Conn.CreateModel(); Model.QueueDeclare(q, false, false, false, null);
其中,QueueDeclare的定义:
/// <summary>(Spec method) Declare a queue.</summary> [AmqpMethodDoNotImplement(null)] QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
参数说明:queue:队列名称。durable:设置是否执行持久化。如果设置为true,即durable=true,持久化实现的重要参数。
exclusive:指示队列是否是排他性。如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。需要注意:1. 排他队列是基于连接可见的,同一连接的不同信道Channel是可以同时访问同一连接创建的排他队列;2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
autoDelete:是否自动删除。如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于发布订阅方式创建的临时队列。
消息的持久化:如果将一个队列设置为持久化,那么会创建一个持久化的队列,但并不意味着队列中的消息也会持久化存储。因此如果要保证消息在RabbitMQ出现异常时不会丢失,需要设定消息的持久化。
简要说明一下消息持久化和队列持久化的联系:
队列设置为持久化,那么在RabbitMQ重启之后,持久化的队列也会存在,并会保持和重启前一致的队列参数。
消息设置为持久化,在RabbitMQ重启之后,持久化的消息也会存在。
那么就会出现一些矛盾的地方:
1、因为消息必须依附于队列存在才有意义,那么如果队列设置为非持久化,而消息设置为持久化。在RabbitMQ重启之后,持久化的消息是否还存在呢?因为非持久化的队列可能并不存在。
2、如果设置消息持久化为true,但队列设置成排他性队列,那么在RabbitMQ重启之后,消息是否仍然存在。请自行查找分析,下次分析该问题。
var sf = new ConnectionFactory(); using (IConnection conn = cf.CreateConnection()) { IModel ch = conn.CreateModel();
Model = Conn.CreateModel(); Model.QueueDeclare(queueName, true, false, false, null);
string message = "Hello C# SSL Client World"; byte[] msgBytes = System.Text.Encoding.UTF8.GetBytes(message);
//发送消息 ch.BasicPublish("", queueName, null, msgBytes); bool noAck = false; BasicGetResult result = ch.BasicGet(qName, noAck); byte[] body = result.Body; string resultMessage = System.Text.Encoding.UTF8.GetString(body); Assert.AreEqual(message, resultMessage); }
通过RabbitMQ SDK发送消息至MQ非常简单,通过BasicPublish即可。
BasicPublish 的定义:
/// <summary> /// (Spec method) Convenience overload of BasicPublish. /// </summary> /// <remarks> /// The publication occurs with mandatory=false /// </remarks> [AmqpMethodDoNotImplement(null)] void BasicPublish(string exchange, string routingKey, IBasicProperties basicProperties, byte[] body);
设置消息持久化,需要设置basicProperties的DeliveryMode=2 (Non-persistent (1) or persistent (2)).
设置了队列和消息持久化后,当服务重启之后,消息仍然存在。只设置队列持久化,不设置消息持久化,重启之后消息会丢失;只设置消息持久化,不设置队列持久化,在服务重启后,队列会消失,从而依附于队列的消息也会丢失。只设置消息持久化而不设置队列的持久化,毫无意义。
Exchange持久化:
为了实现一对多的消息发送,我们一般会采用发布订阅模式,通过一个发送端、多个订阅端来实现消息的分发。
MQ SDK新增接口: IMQSession新增方法: /// <summary> /// 创建消息消费者 /// </summary> /// <param name="topicName">主题名称</param> /// <param name="customTopicQueueName">自定义Topic关联队列名称</param> /// <param name="isPersistence">是否持久化</param> /// <returns>消息消费者</returns> IMessageConsumer CreateTopicConsumer(string topicName, string customTopicQueueName, bool isPersistence = false); 调用方式:消费端需要明确指定需要消费的发布订阅关联队列。例如配置中心热部署,每个配置中心实例都需要指定唯一的关联队列名。 这样就可以和正常的MAC队列消费一样,消费指定队列消息。 实现方式,四个步骤: 1.创建持久化Topic(即持久化Exchange): var service = MQServiceProvider.GetDefaultMQService(); var messageText = "abc"; ///创建Topic using (var connection = service.CreateConnection()) { var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge); var messageCreator = service.GetMessageCreator(); var message = messageCreator.CreateMessage(messageText); message.IsPersistent = true; var producer = session.CreateProducer(); var topic = session.DeclareTopic(topicName, true); } 2.定义消费者Consumer: List<string> queueList = new List<string>() { "guozhiqi1", "guozhiqi2", "guozhiqi3", "guozhiqi4", "guozhiqi5", "guozhiqi6", "guozhiqi7", "guozhiqi8", "guozhiqi9", }; //var service = MQServiceProvider.GetDefaultMQService(); //var messageText = "abc" + DateTime.Now.ToShortTimeString(); //定义消费者 using (var connection1 = service.CreateConnection()) { var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge); foreach (var item in queueList) { session1.DeclareQueue(item, true); var consumer = session1.CreateTopicConsumer(topicName, item, true); } } 3.发送消息到Topic //发送消息 for (int i = 0; i <= 100; i++) { using (var connection = service.CreateConnection()) { var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge); var messageCreator = service.GetMessageCreator(); var message = messageCreator.CreateMessage(messageText); message.IsPersistent = true;//设置持久化 message.TimeToLive = TimeSpan.FromSeconds(30);//设置过期时间 var producer = session.CreateProducer(); var topic = session.DeclareTopic(topicName, true); producer.Send(message, topic); } } 4.从队列接收消息 Parallel.ForEach(queueList, (item) => { while (true) { //接收消息 using (var connection1 = service.CreateConnection()) { var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge); session1.DeclareQueue(item, true); var consumer = session1.CreateTopicConsumer(topicName, item, true); var topic = session1.DeclareTopic(topicName, true); var receivedmessage = consumer.Receive(topic); var textMessage = receivedmessage as ITextMessage; Assert.AreEqual(messageText, textMessage.Body); consumer.Acknowledge(receivedmessage); } } });View Code
- RabbitMQ之消息发布订阅与信息持久化技术
- 3、RabbitMQ之消息发布订阅与信息持久化技术
- RabbitMQ之消息发布订阅与信息持久化技术
- RabbitMQ学习之三: 发布/订阅(广播方式fanout)
- RabbitMQ学习之四:发布/订阅(direct方式)
- 05-rabbitmq-发布订阅
- RabbitMQ学习总结 第四篇:发布/订阅 Publish/Subscribe
- SqlServer2008 数据库同步的两种方式 (发布、订阅)
- (转)SqlServer 数据库同步的两种方式 (发布、订阅),主从数据库之间的同步
- RabbitMQ (三) 发布/订阅
- SQL SERVER 2008 利用发布订阅方式实现数据库同步
- 采用SQLServer 发布,订阅方式实现数据库同步遇到问题总结
- RabbitMQ (三) 发布/订阅
- redis 消息的发布与订阅 持久化
- RabbitMQ-三、Java使用--2、发布/订阅
- SQL Server 2008 数据库同步的两种方式 (发布、订阅)
- Rabbit mq订阅方式获取消息并可设置持久化
- activemq的三种基本通信方式总结 点对点 发布订阅 请求应答
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- .Net下RabbitMQ的使用(4) -- 订阅和发布