您的位置:首页 > 其它

RabbitMQ .NET Client 实战实验

2016-05-24 11:57 281 查看
  由于公司业务需求,最近想上RabbitMQ,之前我研究了一段时间微软的MSMQ。开源队列有很多,各有优劣。就先拿RabbitMQ练练手吧。本篇着重代码部分,至于怎么安装,怎么配置不在赘述。而且代码是在RabbitMQ.NET Client 类库基础上实现。

  假设阅读本文的人已经安装好RabbitMQ并且做了相应的用户配置。而且项目中已经从nuget安装了rabbitmq.client.dll.我们开始做一个简单的队列发送和接收消息。

将需要配置的东西放在配置文件里,例如主机地址,端口,用户名,密码等。

实现消息发送端:Product

实现消息接收端:Customer

Demo测试

  将以下内容作为可配置部分放在配置文件中

<appSettings>
<!--RabbitMQ-->
<add key="RabbitMQ_HostUri" value="amqp://192.168.1.119:5672/"/>
<add key="RabbitMQ_HostName" value="192.168.1.119"/>
<add key="RabbitMQ_UserName" value="test_user"/>
<add key="RabbitMQ_Password" value="123456"/>
<add key="RabbitMQ_VirtualHost" value="ms_mq"/>
</appSettings>


  由于只是对RabbitMQ.Client.dll中的又一次封装,所以代码不过多解释,其中要注意的就是某些配置问题,例如是否持久化,消息处理模式是怎么样的等等。

  首先我们创建一个连接工厂:

public ConnectionFactory CreateFactory()
{
if (_factory == null) {

const ushort heartbeat = 0;
//主机地址
Uri uri = new Uri(RabbitMQConfig.HostUri);

_factory = new ConnectionFactory();
//_factory.HostName = RabbitMQConfig.HostName;
//用户名
_factory.UserName = RabbitMQConfig.UserName;
//密码
_factory.Password = RabbitMQConfig.PassWord;
//虚拟主机名
_factory.VirtualHost = RabbitMQConfig.VirtualHost;
//连接终端
_factory.Endpoint = new AmqpTcpEndpoint(uri);

_factory.RequestedHeartbeat = heartbeat;
//自动重连
_factory.AutomaticRecoveryEnabled = true;
}
return _factory;
}


  一个简单的消息发布:(对代码研究不够透彻,只能一切从简~~)

public void Publish(string message, string queueName=null)
{
if (queueName == null) {
queueName = _queueName;
}

var factory = RabbitMQFactory.Instance.CreateFactory();
using (var connection = factory.CreateConnection())
{
using (var model = connection.CreateModel())
{
//消息持久化,防止丢失
model.QueueDeclare(queueName, RabbitMQConfig.IsDurable, false, false, null);
var properties = model.CreateBasicProperties();
properties.Persistent = RabbitMQConfig.IsDurable;
properties.DeliveryMode = 2;

//消息转换为二进制
var msgBody = Encoding.UTF8.GetBytes(message);
//消息发出到队列
model.BasicPublish("", queueName, properties, msgBody);
}
}
}


  消息接收:

public void Consume() {
var factory = RabbitMQFactory.Instance.CreateFactory();

var connection = factory.CreateConnection();

connection.ConnectionShutdown += Connection_ConnectionShutdown;

ListenChannel = connection.CreateModel();

bool autoDeleteMessage = false;
var queue = ListenChannel.QueueDeclare(_queueName, RabbitMQConfig.IsDurable, false, false, null);

//公平分发,不要同一时间给一个工作者发送多于一个消息
ListenChannel.BasicQos(0, 1, false);
//创建事件驱动的消费者类型,不要用下边的死循环来消费消息
var consumer = new EventingBasicConsumer(ListenChannel);
consumer.Received += Consumer_Received;
//消费消息
ListenChannel.BasicConsume(_queueName, autoDeleteMessage, consumer);
}


  我在Customer中定义了一个 ReceiveMessageCallback Func回调,这里就是当客户端从队列接收到消息之后,怎么处理由客户端来决定

public Func<string, bool> ReceiveMessageCallback { get; set; }


  处理消息:

private void Consumer_Received(object sender, BasicDeliverEventArgs args) {
try {
var body = args.Body;
var message = Encoding.UTF8.GetString(body);
//将消息业务处理交给外部业务
bool result = ReceiveMessageCallback(message);
if (result) {
if (ListenChannel != null && !ListenChannel.IsClosed) {
ListenChannel.BasicAck(args.DeliveryTag, false);
}
}
else {

}

}
catch (Exception ex) {
throw ex;
}
}


  基本代码已经完成,我们写一个测试,消息发送端:

    static void Main(string[] args)
{
var testQueueName = "test";
IMessageProduct product = new MessageProduct(testQueueName);
for (int i = 0; i < 10000; i++)
{
Console.WriteLine("正在发送第" + i + "条消息...");
product.Publish("消息体" + i);
}

Console.Read();
}


  消息接收端:(开多个口接收)

    static void Main(string[] args)
{

Parallel.For(0, RabbitMQConfig.ThreadCount, i =>
{
IMessageCustomer customer = new MessageCustomer("test");
         //开始监听
customer.StartListening();
customer.ReceiveMessageCallback = message =>
{
            //客户端处理消息(打印)
Console.WriteLine("接收到消息:" + message);
return true;
};
});
Console.Read();
}


  打开发送消息端:



  打开消息接收端:



  到此为止,RabbitMQ队列的简单测试就完成了,没有介绍什么新知识,基本就是套DLL中的方法,不过也有很多不合理的地方,如果真正应用到项目中,还需要多加测试和修改。

  DEMO地址:https://github.com/fanpan26/RabbitMQ.NETClient
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: