五.消息的交换:fanout类型的Exchange(广播消息)
2017-07-10 22:56
811 查看
在前面的例子中,每个消息都只对应一个消费者,即使有多个消费者在线,也只会有一个消费者接收并处理一条消息,这是消息中间件的一种常用方式。
另外一种方式,生产者生产一条消息,广播给一个或多个队列,所有订阅了这个队列的消费者,都可以消费这条消息,这就是广播消息订阅。
在RabbitMQ中,生产者不会直接把消息发送给队列,实际上,生产者甚至不知道一条消息会不会被发送到队列上。
生产者会把消息发送给RabbitMQ的交换中心(Exchange),Exchange的一侧是生产者,另一侧则是一个或多个队列,由Exchange决定一条消息的生命周期–发送给某些队列,或者直接丢弃掉。
RabbitMQ中,有4种类型的Exchange:
direct 通过消息的routing key比较queue的key,相等则发给该queue,常用于相同应用多实例之间的任务分发
默认类型 本身是一个direct类型的exchange,routing key自动设置为queue name。注意,direct不等于默认类型,默认类型是在queue没有指定exchange时的默认处理方式,发消息时,exchange字段也要相应的填成空字符串“”
topic 话题,通过可配置的规则分发给绑定在该exchange上的队列,通过地理位置推送等场景适用
headers 当分发规则很复杂,用routing key不好表达时适用,忽略routing key,用header取代之,header可以为非字符串,例如Integer或者String
fanout 分发给所有绑定到该exchange上的队列,忽略routing key,适用于MMO游戏、广播、群聊等场景
临时队列:
在消费者消费消息时,要指明消费哪个队列的消息(下面的queue),这样就可以让多个消费者同时分享一个队列
所有消费者都需要监听所有的日志消息,因此每个消费者都需要一个单独的队列,不需要和别人分享
消费者只关心最新的消息,连接到RabbitMQ之前的消息不需要关心,因此,每次连接时需要创建一个队列,绑定到相应的exchange上,连接断开后,删除该队列
自己声明队列是比较麻烦的,因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁
String queueName = channel.queueDeclare().getQueue();
生产者和消费者的pom.xml和上一章一样
一.生产者:Procedure
1.Exchange为fanout类型的消息发送类:LogSenderFanout.java
2.测试发送消息的Main:ExchangeFanoutMain.java
二.提供者1Consumner(将消息写进硬盘)
1.接收Exchange类型为fanout类型消息的类:LogReceiveFanout.java
测试接收消息的Main方法:ExchangeFanoutMain.java
三消费者2,Consumer(将消息打印日志)
1.接收Exchange类型为fanout类型的消息的类:
2.测试的main方法:
这时,生产者发送一条消息,消费者1和消费者2都能接收到消息并且消费
另外一种方式,生产者生产一条消息,广播给一个或多个队列,所有订阅了这个队列的消费者,都可以消费这条消息,这就是广播消息订阅。
在RabbitMQ中,生产者不会直接把消息发送给队列,实际上,生产者甚至不知道一条消息会不会被发送到队列上。
生产者会把消息发送给RabbitMQ的交换中心(Exchange),Exchange的一侧是生产者,另一侧则是一个或多个队列,由Exchange决定一条消息的生命周期–发送给某些队列,或者直接丢弃掉。
RabbitMQ中,有4种类型的Exchange:
direct 通过消息的routing key比较queue的key,相等则发给该queue,常用于相同应用多实例之间的任务分发
默认类型 本身是一个direct类型的exchange,routing key自动设置为queue name。注意,direct不等于默认类型,默认类型是在queue没有指定exchange时的默认处理方式,发消息时,exchange字段也要相应的填成空字符串“”
topic 话题,通过可配置的规则分发给绑定在该exchange上的队列,通过地理位置推送等场景适用
headers 当分发规则很复杂,用routing key不好表达时适用,忽略routing key,用header取代之,header可以为非字符串,例如Integer或者String
fanout 分发给所有绑定到该exchange上的队列,忽略routing key,适用于MMO游戏、广播、群聊等场景
临时队列:
在消费者消费消息时,要指明消费哪个队列的消息(下面的queue),这样就可以让多个消费者同时分享一个队列
所有消费者都需要监听所有的日志消息,因此每个消费者都需要一个单独的队列,不需要和别人分享
消费者只关心最新的消息,连接到RabbitMQ之前的消息不需要关心,因此,每次连接时需要创建一个队列,绑定到相应的exchange上,连接断开后,删除该队列
自己声明队列是比较麻烦的,因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁
String queueName = channel.queueDeclare().getQueue();
生产者和消费者的pom.xml和上一章一样
一.生产者:Procedure
1.Exchange为fanout类型的消息发送类:LogSenderFanout.java
package com.rabbit.exchange; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; //fanout public class LogSenderFanout { private Logger logger = LoggerFactory.getLogger(LogSenderFanout.class); //ConnectionFactory和Connection在正式开发时需要设置成单例 private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; /** * 在构造函数中获取连接 */ public LogSenderFanout(){ super(); try { connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connection = connectionFactory.newConnection(); channel = connection.createChannel(); } catch (Exception e) { logger.error("获取连接时出错..."); } } /** * 关闭连接的方法 */ public boolean closeAll(){ try { this.channel.close(); this.connection.close(); } catch (Exception e) { logger.error("关闭连接时异常..."); return false; } return true; } /** * 发送消息到交换中心 */ public void sendMessage(String message){ try { //声明一个exchange,名字为logs,类型为fanout channel.exchangeDeclare("logs", "fanout"); //发布消息到exchange上 /** * 1.指定exchange的名字 * 2.为空,表示忽略routingKey(奖消息发送到所有与此Exchange绑定的列队上) * 3.null... * 3.发送的消息 */ channel.basicPublish("logs", "", null, message.getBytes()); logger.debug("发送fanout类型的消息到exchange交换中心."); } catch (Exception e) { logger.error("消息发送失败"); } } }
2.测试发送消息的Main:ExchangeFanoutMain.java
package com.rabbit.main; import com.rabbit.exchange.LogSenderFanout; public class ExchangeFanoutMain { public static void main(String[] args) throws InterruptedException { LogSenderFanout logSender = new LogSenderFanout(); //每三秒发送一个消息 while (true) { logSender.sendMessage("hello tiglle"); Thread.sleep(3000); } } }
二.提供者1Consumner(将消息写进硬盘)
1.接收Exchange类型为fanout类型消息的类:LogReceiveFanout.java
package com.rabbit.exchange; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class LogReceiveFanout { private Logger logger = LoggerFactory.getLogger(LogReceiveFanout.class); //正式开发ConnectionFactory和Connection应该设置为单例 private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; /** * 在构造函数中获取连接 */ public LogReceiveFanout(){ super();//Objece为其父类... try { connectionFactory = new ConnectionFactory(); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //声明exchange,防止生成者没启动的时候,exchange不存在(生成者和消费者总有一个要先声明) channel.exchangeDeclare("logs", "fanout"); } catch (Exception e) { // TODO: handle exception } } /** * 关闭连接的方法 */ public boolean closeAll(){ try { this.channel.close(); this.connection.close(); } catch (Exception e) { logger.error("关闭连接异常:"+e); return false; } return true; } /** * 消费消息 */ public void messageReceive(){ try { //获取临时列队:自己声明队列是比较麻烦的, //因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁 String queueName = channel.queueDeclare().getQueue(); //把获取的临时列队绑定到logs这个exchange交换中心 /** * 1.列队名称 * 2.交换中心的名称 * 3.fanout忽略routingKey,所有为空 */ channel.queueBind(queueName, "logs", ""); //定义一个Consumer消费logs的消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body,"UTF-8"); logger.debug("我是写硬盘的消费者:"+message); } }; //自动确认为true,接收到消息后该消息就销毁了 channel.basicConsume(queueName, true, consumer); } catch (Exception e) { logger.error("消费消息时异常:"+e); } } }
测试接收消息的Main方法:ExchangeFanoutMain.java
package com.rabbit.main; import com.rabbit.exchange.LogReceiveFanout; public class ExchangeFanoutMain { public static void main(String[] args) { LogReceiveFanout logReceive = new LogReceiveFanout(); logReceive.messageReceive(); } }
三消费者2,Consumer(将消息打印日志)
1.接收Exchange类型为fanout类型的消息的类:
package com.rabbit.exchange; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class LogReceiveFanout { private Logger logger = LoggerFactory.getLogger(LogReceiveFanout.class); //正式开发ConnectionFactory和Connection应该设置为单例 private ConnectionFactory connectionFactory; private Connection connection; private Channel channel; /** * 在构造函数中获取连接 */ public LogReceiveFanout(){ super();//Objece为其父类... try { connectionFactory = new ConnectionFactory(); connection = connectionFactory.newConnection(); channel = connection.createChannel(); //声明exchange,防止生成者没启动的时候,exchange不存在(生成者和消费者总有一个要先声明) channel.exchangeDeclare("logs", "fanout"); } catch (Exception e) { // TODO: handle exception } } /** * 关闭连接的方法 */ public boolean closeAll(){ try { this.channel.close(); this.connection.close(); } catch (Exception e) { logger.error("关闭连接异常:"+e); return false; } return true; } /** * 消费消息 */ public void messageReceive(){ try { //获取临时列队:自己声明队列是比较麻烦的, //因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁 String queueName = channel.queueDeclare().getQueue(); //把获取的临时列队绑定到logs这个exchange交换中心 /** * 1.列队名称 * 2.交换中心的名称 * 3.fanout忽略routingKey,所有为空 */ channel.queueBind(queueName, "logs", ""); //定义一个Consumer消费logs的消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body,"UTF-8"); logger.debug("我是打印日志的消费者:"+message); } }; //自动确认为true,接收到消息后该消息就销毁了 channel.basicConsume(queueName, true, consumer); } catch (Exception e) { logger.error("消费消息时异常:"+e); } } }
2.测试的main方法:
package com.rabbit.main; import com.rabbit.exchange.LogReceiveFanout; public class ExchangeFanoutMain { public static void main(String[] args) { LogReceiveFanout logReceive = new LogReceiveFanout(); logReceive.messageReceive(); } }
这时,生产者发送一条消息,消费者1和消费者2都能接收到消息并且消费
相关文章推荐
- RabbitMQ四种Exchange类型之Fanout (Java)
- Exchange-fanout 广播模式
- 六.消息的交换:direct类型的Exchange(通过消息的routing key比较queue的key)
- RabbitMQ Exchange中的fanout类型
- RabbitMQ四种Exchange类型之Fanout (Erlang)
- 七.消息的交换:topic类型的Exchange(消费通过#或者*通配提供者的消息)
- RabbitMQ的Exchange 模式之Fanout(广播模式)
- Android动作广播类别消息类型
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
- rabbitmq 交换模式-Fanout
- rabbitmq direct、fanout、topic 三种Exchange java 代码比较
- RabbitMQ三种Exchange模式(fanout,direct,topic)
- exchange 2013 delayed FAN-OUT 的优点
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
- rabbitmq direct、fanout、topic 三种Exchange java 代码比较
- RabbitMQ消息处理机制fanout,direct,topic,header
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
- RabbitMQ三种Exchange模式(fanout,direct,topic)的特性 -摘自网络
- Android动作广播类别消息类型