RabbitMQ入门-发布订阅模式
2018-02-26 11:37
435 查看
兔子的Publish/Subscribe是这样的:
有个生产者P,X代表交换机,交换机绑定队列,消费者从队列中取得消息。每次有消息,先发到交换机中,然后由交换机负责发送到它已知的队列中。
生产者代码:
消费者:
启动消费者和生产者,控制台打印
有个生产者P,X代表交换机,交换机绑定队列,消费者从队列中取得消息。每次有消息,先发到交换机中,然后由交换机负责发送到它已知的队列中。
生产者代码:
package com.example.demo; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * 4种交换类型 * There are a few exchange types available: direct, topic, headers and fanout. * 扇出交换:将收到的消息广播到它所知道的所有队列里 */ public class PSSend { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 连接工厂 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 获取连接 Channel channel = connection.createChannel(); // 当我们发送时,需要一个路由密钥,但是对于扇出交换,他的值将被忽略 // 第一个参数为交换的名字,第二个为交换的类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg = "发布订阅"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("PS-Send:" + msg); channel.close(); connection.close(); } }
消费者:
package com.example.demo; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * * * There are a few exchange types available: direct, topic, headers and fanout. * 扇出交换:将收到的消息广播到它所知道的所有队列里 */ public class PSReceive { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 连接工厂 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 获取连接 Channel channel = connection.createChannel(); // 当我们发送时,需要一个路由密钥,但是对于扇出交换,他的值将被忽略 // 第一个参数为交换的名字,第二个为交换的类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 当声明队列,不加任何参数,产生的将是一个临时队列,getQueue返回的是队列名称 String queueA = channel.queueDeclare().getQueue(); //String queueB = channel.queueDeclare().getQueue(); System.out.println("临时队列:" + queueA); // 下面绑定交换与队列 channel.queueBind(queueA, EXCHANGE_NAME, ""); //channel.queueBind(queueB, EXCHANGE_NAME, ""); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String recv = new String(body, "UTF-8"); System.out.println("PS-Receive:" + recv); } }; channel.basicConsume(queueA, true, consumer); //channel.basicConsume(queueB, true, consumer); } }
启动消费者和生产者,控制台打印
相关文章推荐
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- RabbitMQ下的生产消费者模式与订阅发布模式
- 设计模式入门--发布订阅模式
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ五种消息队列学习(四)--发布订阅模式
- Redis入门系列之队列和发布订阅模式
- 转载RabbitMQ入门(3)--发布和订阅
- RabbitMq六种使用模式(3)_订阅发布模式
- .Net下RabbitMQ发布订阅模式实践
- RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe)
- RabbitMQ 使用 | 第三篇:发布/订阅模式
- RabbitMQ下的生产消费者模式与订阅发布模式
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- 分布式服务框架学习笔记8 ActiveMQ入门2 发布/订阅模式
- RabbitMQ入门(3)——发布/订阅(Publish/Subscribe)
- RabbitMQ下的生产消费者模式与订阅发布模式
- RabbitMQ入门-消息订阅模式
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)