您的位置:首页 > 其它

NET下RabbitMQ实践[示例篇]

2016-06-01 00:45 363 查看
上一篇文章中,介绍了在window环境下安装erlang,rabbitmq-server,以免配置用户,权限,虚拟机等内容。

今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置。

首先,我们下载官方的.net客户端软件,链接:http://www.rabbitmq.com/dotnet.html

下载并安装之后,将安装目录下的这两个DLL文件复制到我们示例项目中,并添加引用:

RabbitMQ.Client.dll //基于的发布订阅消息的功能类
RabbitMQ.ServiceModel.dll //包括基于WCF方式发布订阅服务模型类

如下图:

public class ProducerMQ
{
public static void InitProducerMQ()
{
Uri uri = new Uri("amqp://10.0.4.85:5672/");
string exchange = "ex1";
string exchangeType = "direct";
string routingKey = "m1";
bool persistMode = true;
ConnectionFactory cf = new ConnectionFactory();

cf.UserName = "daizhj";
cf.Password = "617595";
cf.VirtualHost = "dnt_mq";
cf.RequestedHeartbeat = 0;
cf.Endpoint = new AmqpTcpEndpoint(uri);
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
if (exchangeType != null)
{
ch.ExchangeDeclare(exchange, exchangeType);//,true,true,false,false, true,null);
ch.QueueDeclare("q1", true);//true, true, true, false, false, null);
ch.QueueBind("q1", "ex1", "m1", false, null);
}
IMapMessageBuilder b = new MapMessageBuilder(ch);
IDictionary target = b.Headers;
target["header"] = "hello world";
IDictionary targetBody = b.Body;
targetBody["body"] = "daizhj";
if (persistMode)
{
((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;
}

ch.BasicPublish(exchange, routingKey,
(IBasicProperties)b.GetContentHeader(),
b.GetContentBody());

}
}
}
}

ConnectionFactory cf = new ConnectionFactory();
//使用前文的配置环境信息
cf.UserName = "daizhj";
cf.Password = "617595";
cf.VirtualHost = "dnt_mq";
cf.RequestedHeartbeat = 0;
cf.Endpoint = new AmqpTcpEndpoint(uri);

using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
if (exchangeType != null)
{
ch.ExchangeDeclare(exchange, exchangeType);//,true,true,false,false, true,null);
ch.QueueDeclare("q1", true);//true, true, true, false, false, null);
ch.QueueBind("q1", "ex1", "m1", false, null);
}
....

IMapMessageBuilder b = new MapMessageBuilder(ch);
IDictionary target = b.Headers;
target["header"] = "hello world";
IDictionary targetBody = b.Body;
targetBody["body"] = "daizhj";
if (persistMode)
{
((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;
}
//简单发布方式
ch.BasicPublish(exchange, routingKey,
(IBasicProperties)b.GetContentHeader(),
b.GetContentBody());

public class CustmerMq
{
public static int InitCustmerMq()
{
string exchange = "ex1";
string exchangeType = "direct";
string routingKey = "m1";

string serverAddress = "10.0.4.85:5672";
ConnectionFactory cf = new ConnectionFactory();
cf.Address = serverAddress;
cf.UserName = "daizhj";
cf.Password = "617595";
cf.VirtualHost = "dnt_mq";
cf.RequestedHeartbeat = 0;

using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
//普通使用方式BasicGet
//noAck = true,不需要回复,接收到消息后,queue上的消息就会清除
//noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,直到调用channel.basicAck(deliveryTag, false); queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息
BasicGetResult res = ch.BasicGet("q1", false/*noAck*/);
if (res != null)
{
bool t = res.Redelivered;
t = true;
Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
ch.BasicAck(res.DeliveryTag, false);
}
else
{
Console.WriteLine("No message!");
}

while (true)
{
BasicGetResult res = ch.BasicGet("q1", false/*noAck*/);
if (res != null)
{
try
{
bool t = res.Redelivered;
t = true;
Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
ch.BasicAck(res.DeliveryTag, false);
}
catch { }
}
else
break;
}

//第二种取法QueueingBasicConsumer基于订阅模式
QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
ch.BasicConsume("q1", false, null, consumer);
while (true)
{
try
{
BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
IBasicProperties props = e.BasicProperties;
byte[] body = e.Body;
Console.WriteLine(System.Text.Encoding.UTF8.GetString(body));
//ch.BasicAck(e.DeliveryTag, true);
ProcessRemainMessage();
}
catch (EndOfStreamException ex)
{
//The consumer was removed, either through channel or connection closure, or through the action of IModel.BasicCancel().
Console.WriteLine(ex.ToString());
break;
}
}

这样,就完成了一个简单的发布,消费消息的示例。在接下来的文章中,将会介绍如果基于WCF来发布RABBITMQ服务,敬请关注:)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: