六.消息的交换:direct类型的Exchange(通过消息的routing key比较queue的key)
2017-07-10 23:09
295 查看
根据routingKey匹配消息到符合的消费者消费消息
例:消费者1只消费info级别的日志,消费者2即消费info级别也消费error级别
生产者和消费者的pom.xml和上一章一样
一.生产者Producer
1.发送Exchange类型为direct的消息的类:LogReceiveDirect.java
2.启动测试的main方法:ExchangeDirectMain.java
二.只消费info级别的Consumer
1.消费消息的类:LogReceiveDirect.java
2.启动监听的main方法:ExchangeDirectMain.java
三.同时消费info和error级别消息的消费者
1.消费消息的类:LogReceiveDirect.java
2.启动监听的main方法:ExchangeDirectMain.java
这时,提供者的info级别日志会被消费者1和消费者2同时消费,error级别的日志只会被消费者2消费
例:消费者1只消费info级别的日志,消费者2即消费info级别也消费error级别
生产者和消费者的pom.xml和上一章一样
一.生产者Producer
1.发送Exchange类型为direct的消息的类:LogReceiveDirect.java
package com.rabbit.exchange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class LogSenderDirect { private Logger logger = LoggerFactory.getLogger(LogSenderFanout.class); //ConnectionFactory和Connection在正式开发时需要设置成单例 private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; /** * 在构造函数中获取连接 */ public LogSenderDirect(){ super(); try { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connection = connectionFactory.newConnection(); channel = connection.createChannel(); } catch (Exception e) { logger.error("获取连接时出错..."); } } /** * 关闭连接的方法 */ public boolean closeAll(){ try { this.channel.close(); this.connection.close(); } catch (Exception e) { logger.error("关闭连接时异常..."); return false; } return true; } /** * 发送消息到交换中心 */ public void sendMessage(String message,String routingKey){ try { //声明一个exchange,名字为logs,类型为direct channel.exchangeDeclare("logs", "direct"); //发布消息到exchange上 /** * 1.指定exchange的名字 * 2.direct类型 * 3.null... * 3.发送的消息 */ channel.basicPublish("logs", routingKey, null, message.getBytes()); logger.debug("发送direct类型的消息"+message+"到exchange交换中心."); } catch (Exception e) { logger.error("消息发送失败:"+e); } } }
2.启动测试的main方法:ExchangeDirectMain.java
package com.rabbit.main; import com.rabbit.exchange.LogSenderDirect; public class ExchangeDirectMain { public static void main(String[] args) throws InterruptedException { LogSenderDirect logSender = new LogSenderDirect(); //轮流每一秒发送info和error的消息(让消费者1接受info和error级别消息,消费者2只接受info级别消息) int i = 0; while (true) { if(i%2 == 0){ logSender.sendMessage("hello tiglle"+i+":info","info"); }else{ logSender.sendMessage("hello tiglle"+i+":error","error"); } Thread.sleep(1000); i++; } } }
二.只消费info级别的Consumer
1.消费消息的类:LogReceiveDirect.java
package com.rabbit.exchange; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class LogReceiveDirect { private Logger logger = LoggerFactory.getLogger(LogReceiveDirect.class); //正式开发ConnectionFactory和Connection应该设置为单例 private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; /** * 在构造函数中获取连接 */ public LogReceiveDirect(){ super();//Objece为其父类... try { connectionFactory = new ConnectionFactory(); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //声明exchange,防止生成者没启动的时候,exchange不存在(生成者和消费者总有一个要先声明) channel.exchangeDeclare("logs", "direct"); } catch (Exception e) { // TODO: handle exception } } /** * 关闭连接的方法 */ public boolean closeAll(){ try { this.channel.close(); this.connection.close(); } catch (Exception e) { logger.error("关闭连接异常:"+e); return false; } return true; } /** * 消费消息 */ public void messageReceive(){ try { //获取临时列队:自己声明队列是比较麻烦的, //因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁 String queueName = channel.queueDeclare().getQueue(); //把获取的临时列队绑定到logs这个exchange交换中心,只接受info级别日志 /** * 1.列队名称 * 2.交换中心的名称 * 3.routingKey和生产者发布消息的时候指定的一样 */ channel.queueBind(queueName, "logs", "info"); //定义一个Consumer消费logs的消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body,"UTF-8"); logger.debug("我是打印日志的消费者:"+message); } }; //自动确认为true,接收到消息后该消息就销毁了 channel.basicConsume(queueName, true, consumer); } catch (Exception e) { logger.error("消费消息时异常:"+e); } } }
2.启动监听的main方法:ExchangeDirectMain.java
package com.rabbit.main; import com.rabbit.exchange.LogReceiveDirect; public class ExchangeDirectMain { public static void main(String[] args) { LogReceiveDirect logReceive = new LogReceiveDirect(); logReceive.messageReceive(); } }
三.同时消费info和error级别消息的消费者
1.消费消息的类:LogReceiveDirect.java
package com.rabbit.exchange; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class LogReceiveDirect { private Logger logger = LoggerFactory.getLogger(LogReceiveDirect.class); //正式开发ConnectionFactory和Connection应该设置为单例 private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; /** * 在构造函数中获取连接 */ public LogReceiveDirect(){ super();//Objece为其父类... try { connectionFactory = new ConnectionFactory(); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //声明exchange,防止生成者没启动的时候,exchange不存在(生成者和消费者总有一个要先声明) channel.exchangeDeclare("logs", "direct"); } catch (Exception e) { // TODO: handle exception } } /** * 关闭连接的方法 */ public boolean closeAll(){ try { this.channel.close(); this.connection.close(); } catch (Exception e) { logger.error("关闭连接异常:"+e); return false; } return true; } /** * 消费消息 */ public void messageReceive(){ try { //获取临时列队:自己声明队列是比较麻烦的, //因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁 String queueName = channel.queueDeclare().getQueue(); //把获取的临时列队绑定到logs这个exchange交换中心,绑定两个routingKey(同时接受info和error级别日志),不会覆盖 /** * 1.列队名称 * 2.交换中心的名称 * 3.routingKey和生产者发布消息的时候指定的一样 */ channel.queueBind(queueName, "logs", "info"); channel.queueBind(queueName, "logs", "error"); //定义一个Consumer消费logs的消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body,"UTF-8"); logger.debug("我是写硬盘的消费者:"+message); } }; //自动确认为true,接收到消息后该消息就销毁了 channel.basicConsume(queueName, true, consumer); } catch (Exception e) { logger.error("消费消息时异常:"+e); } } }
2.启动监听的main方法:ExchangeDirectMain.java
package com.rabbit.main; import com.rabbit.exchange.LogReceiveDirect; public class ExchangeDirectMain { public static void main(String[] args) { LogReceiveDirect logReceive = new LogReceiveDirect(); logReceive.messageReceive(); } }
这时,提供者的info级别日志会被消费者1和消费者2同时消费,error级别的日志只会被消费者2消费
相关文章推荐
- 七.消息的交换:topic类型的Exchange(消费通过#或者*通配提供者的消息)
- 五.消息的交换:fanout类型的Exchange(广播消息)
- 通过实例分析WCF Duplex消息交换
- 如何通过发送到Soft Key Bar上的消息捕获一些硬按键
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
- RabbitMQ四种Exchange类型之Direct (Java)
- 怎样在Redis通过StackExchange.Redis 存储集合类型List
- 当您尝试通过使用 FrontPage 2003 或当您尝试导出 Web 部件编辑门户收到是类型 Microsoft.SharePoint.SoapServer.SoapServerException " Soap:Server 异常 " 消息
- [转]RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
- rabbitmq direct、fanout、topic 三种Exchange java 代码比较
- 关于值类型、引用类型和字符串类型的比较问题!通过实例来说明!
- RabbitMQ Exchange Queue RoutingKey BindingKey解析
- 通过实例分析WCF Duplex消息交换(转载)
- 通过void指针实现跨类型变量交换函数
- JMS消息类型模型[queue和topic的区别]
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
- 通过实例分析WCF Duplex消息交换 - Bruce Zhang's Blog - 博客园
- KeyInputQueue:取消息(到消息队列) InputDispatcherThread:分发消息
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
- DH密钥交换(Diffie–Hellman key exchange)算法笔记