您的位置:首页 > 其它

rabbitmq (三) 发布/订阅

2017-12-21 11:43 274 查看
rabbitmq的目的并不是让生产者把消息直接发到队列里面去,

这样不能实现解耦的目的,也不利于程序的扩展.

所以就有交换机(exchanges)的概念.

交换机有几种类型:direct, topic, headers 和fanout,

可以为交换机命名,还有一种没有命名的交换机,上几章的消息都是发布到没有命名的交换机.

channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);


感觉交换机主要控制消息的投递方式.

临时队列:

可以通过创建队列的方式对消息的存储等方式进行管理.

var queueName = channel.QueueDeclare().QueueName;


绑定:

最后通过绑定的方式把交换机和队列进行关联.

channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");


publish:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RMQ_Publish
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");

var message = GetMessage(args);
int i = 100;
while (true)
{
var body = Encoding.UTF8.GetBytes(message + ":" + i.ToString());
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);

if (i-- == 0)
i = 100;
}
}

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}

private static string GetMessage(string[] args)
{
return ((args.Length > 0)
? string.Join(" ", args)
: "info: Hello World!");
}
}
}


subscribe:

var factory = new ConnectionFactory() { HostName = "your host name", UserName = "wc", Password = "wc" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");

var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");

Console.WriteLine(" [*] Waiting for logs.");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);

Console.WriteLine(" [x] {0}", message);
Thread.Sleep(100);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: