rabbitmq使用说明--good
2013-12-16 13:32
453 查看
使用说明
1. 概述
rabbitMQ 是 AMQP 用 Erlang 实现的 MQ AMQP 主要是由金融领域的软件专家们贡献的创意,而联合了通讯和软件方面的力量,一起打造出来的规范。只要遵循 AMQP 的协议,任何一种语言都可以开发消息组件乃至中间件本身。我们之前使用的activeMQ是实现了jms接口,只能在java环境使用。我们目前的需求是java和c#互通。符合rabbitmq的使用场景。
2. 基本概念
如上图所示rabbitmq主要包括四部分组成
P代表生产者,C 代表消费者,X exchange 交换器,Q红色的表示队列
P和C都是在客户端,X和Q在服务器端。
发送接收的大概流程是,P发送消息至交换器。
C先声明一个队列,然后将队列和交换器绑定,接着接受消息。
换句话说,P和C互相不知道对方存在。
1. 概述
rabbitMQ 是 AMQP 用 Erlang 实现的 MQ AMQP 主要是由金融领域的软件专家们贡献的创意,而联合了通讯和软件方面的力量,一起打造出来的规范。只要遵循 AMQP 的协议,任何一种语言都可以开发消息组件乃至中间件本身。我们之前使用的activeMQ是实现了jms接口,只能在java环境使用。我们目前的需求是java和c#互通。符合rabbitmq的使用场景。
2. 基本概念
如上图所示rabbitmq主要包括四部分组成
P代表生产者,C 代表消费者,X exchange 交换器,Q红色的表示队列
P和C都是在客户端,X和Q在服务器端。
发送接收的大概流程是,P发送消息至交换器。
C先声明一个队列,然后将队列和交换器绑定,接着接受消息。
换句话说,P和C互相不知道对方存在。
3. 使用说明 3.1 参数定义 private static final String EXCHANGE_NAME = "test_topic"; private static final String HOST = "localhost"; private static final String USER_NAME = "user"; private static final String USER_PWD = "pwd"; 3.2 定义connection和channel ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setUsername(USER_NAME); factory.setPassword(USER_PWD); // factory.setVirtualHost(virtualHost); // factory.setPort(portNumber); connection = factory.newConnection(); channel = connection.createChannel(); 3.3 声明交换器,队列,并绑定 //告诉RabbitMQ同一时间给一个消息给消费者 channel.basicQos(1); //三个参数1.交换器名2.交换器类型 3.是否持久 channel.exchangeDeclare(EXCHANGE_NAME, "topic",true); String queueName = "banggoutest"; //queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, java.util.Map<java.lang.String,java.lang.Object> arguments) //产生一个持久化,非独占,非自动删除的队列 channel.queueDeclare(queueName, true, false, false, null); //绑定队列,交换器,和topickey //channel.queueBind(queueName, EXCHANGE_NAME, "*.error"); //channel.queueBind(queueName, EXCHANGE_NAME, "test.warn"); channel.queueBind(queueName, EXCHANGE_NAME, "mberp"); 3.4 定义消费者 //定义消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //绑定消费者和队列,并设置非自动确认,这样当消息收下后,处理出项问题的时候,msg还在对列里,不会消失。 channel.basicConsume(queueName, false, consumer); 3.5 异步接受消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); //此处可以添加处理消息代码 System.out.println(" 处理完成"); //提交消息处理完成标记给服务器,从队列中删除服务器,并准备接受下一条 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } 3.6 退出程序时 不要忘记关掉connetion以及channel,释放资源 finally { if(channel != null) { try { channel.close(); } catch (Exception ignore) {} } if (connection != null) { try { connection.close(); } catch (Exception ignore) {} } }
相关文章推荐
- 中南大学oj:1352: New Sorting Algorithm
- goroutine背后的系统知识
- RHEL 6.4(i386)编译安装GoldenDict
- 算法之旅,直奔<algorithm>之十 count_if
- 算法之旅,直奔<algorithm>之十一 equal
- 修改hosts翻墙访问Google Drive
- Django:快速搭建简单的Blog
- JW Player6.7Pro专业版(网页视频播放器),自定义Logo和右键菜单,支持MP4、FLV等,支持直播
- Intent的隐藏跳转
- 为什么 Google 大量收购机器人公司
- Sunday algorithm
- Horspool algorithm
- Boyer-Moore algorithm
- Google Interview Preparation
- Knuth-Morris-Pratt algorithm
- 算法之旅,直奔<algorithm>之十 count_if
- goagent新功能个人配置文件proxy.user.ini使用简介
- 文本数据导入HBASE库找不到类com/google/common/collect/Multimap
- 集成libevent,google protobuf的RPC框架
- GoLang之Concurrency多任务独立模式