rabbitmq (三) 发布/订阅
2017-12-21 11:43
274 查看
rabbitmq的目的并不是让生产者把消息直接发到队列里面去,
这样不能实现解耦的目的,也不利于程序的扩展.
所以就有交换机(exchanges)的概念.
交换机有几种类型:direct, topic, headers 和fanout,
可以为交换机命名,还有一种没有命名的交换机,上几章的消息都是发布到没有命名的交换机.
感觉交换机主要控制消息的投递方式.
临时队列:
可以通过创建队列的方式对消息的存储等方式进行管理.
绑定:
最后通过绑定的方式把交换机和队列进行关联.
publish:
subscribe:
这样不能实现解耦的目的,也不利于程序的扩展.
所以就有交换机(exchanges)的概念.
交换机有几种类型:direct, topic, headers 和fanout,
可以为交换机命名,还有一种没有命名的交换机,上几章的消息都是发布到没有命名的交换机.
channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
感觉交换机主要控制消息的投递方式.
临时队列:
可以通过创建队列的方式对消息的存储等方式进行管理.
var queueName = channel.QueueDeclare().QueueName;
绑定:
最后通过绑定的方式把交换机和队列进行关联.
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
publish:
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace RMQ_Publish { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); int i = 100; while (true) { var body = Encoding.UTF8.GetBytes(message + ":" + i.ToString()); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); if (i-- == 0) i = 100; } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!"); } } }
subscribe:
var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
Thread.Sleep(100);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
相关文章推荐
- RabbitMQ下的生产消费者模式与订阅发布模式
- rabbitmq 发布/订阅
- RabbitMQ入门-发布订阅模式
- RabbitMQ 基础教程 Publish/Subscribe(发布/订阅)
- RabbitMQ指南(7)-发布/订阅消息
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ之发布订阅
- [置顶] RabbitMQ服务之发布/订阅篇
- RabbitMQ (三)消息的发布于订阅
- RabbitMQ 教程3 发布与订阅
- RabbitMQ (三) 发布/订阅
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- 3、RabbitMQ之消息发布订阅与信息持久化技术
- RabbitMQ (三) 发布/订阅
- RabbitMQ (三) 发布/订阅
- RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)(转载)
- RabbitMQ之消息发布订阅与信息持久化技术
- RabbitMQ-三、Java使用--2、发布/订阅
- 轻松搞定RabbitMQ(四)——发布/订阅