您的位置:首页 > 其它

.Net下RabbitMQ的使用(2) -- 发送接收消息

2017-05-07 21:22 302 查看
在安装和配置完成RabbitMQ之后,我们就可以尝试做一个最简单的例子:发送和接收消息。

 

我们先来看客户端也就是发送者的代码:

 

public class RabbitClient

{

//定义连接工厂

ConnectionFactory factory = new ConnectionFactory();

 

public RabbitClient()

{

//指定要连接的RabbitMQ服务地址

factory.HostName = "localhost";

}

 

public void Send()

{

//定义要发送的数据

RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "this is a 请求。" };

 

//创建一个 AMQP 连接

using (IConnection connection = factory.CreateConnection())

{

using (IModel channel = connection.CreateModel())

{

//在MQ上定义一个队列

channel.QueueDeclare("esbtest.rmq.consoleserver", false, false, false, null);

 

//序列化消息对象,RabbitMQ并不支持复杂对象的序列化,所以对于自定义的类型需要自己序列化

XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));

using (MemoryStream ms = new MemoryStream())

{

xs.Serialize(ms, message);

byte[] bytes = ms.ToArray();

//指定发送的路由,通过默认的exchange直接发送到指定的队列中。

channel.BasicPublish("", "esbtest.rmq.consoleserver", null, bytes);

}

 

Console.WriteLine(string.Format("Request Message Sent, Id:{0}, Message:{1}", message.MessageId, message.Message));

}

}

}

}


 

在方法

channel.BasicPublish("", "esbtest.rmq.consoleserver", null, bytes);


中的第一个参数是需要输入一个exchange。在RabbitMQ中,所有的消息都必须要通过exchange发送到各个queue里面去。发送者发送消息,其实也就是把消息放到exchange中去。而exchange知道应该把消息放到哪里去。在这个方法中,我们没有输入exchange的名称,只是定义了一个空的echange,而在第二个参数routeKey中输入了我们目标队列的名称。RabbitMQ会帮我定义一个默认的exchange,这个exchange会把消息直接投递到

我们输入的队列中,这样服务端只需要直接去这个定义了的队列中获取消息就可以了。

 

服务端的代码:

 

public class RabbitServer

{

ConnectionFactory factory = null;

 

public void Listen()

{

factory = new ConnectionFactory();

factory.HostName = "localhost";

 

using (IConnection connection = factory.CreateConnection())

{

using (IModel channel = connection.CreateModel())

{

//在MQ上定义一个队列,如果名称相同不会重复创建

channel.QueueDeclare("esbtest.rmq.consoleserver", false, false, false, null);

 

Console.WriteLine("Listening...");

 

//在队列上定义一个消费者

QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);

channel.BasicConsume("esbtest.rmq.consoleserver", true, consumer);

 

while (true)

{

//阻塞函数,获取队列中的消息

BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();

 

byte[] body = ea.Body;

 

    XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));

using (MemoryStream ms = new MemoryStream(body))

{

RequestMessage message = (RequestMessage)xs.Deserialize(ms);

Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);

}

}

}

}

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: