您的位置:首页 > 其它

RabbitMQ入门-发布订阅模式

2018-02-26 11:37 435 查看
兔子的Publish/Subscribe是这样的:



有个生产者PX代表交换机,交换机绑定队列,消费者从队列中取得消息。每次有消息,先发到交换机中,然后由交换机负责发送到它已知的队列中。

生产者代码:

package com.example.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
*
* 4种交换类型
* There are a few exchange types available: direct, topic, headers and fanout.
* 扇出交换:将收到的消息广播到它所知道的所有队列里
*/
public class PSSend {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
factory.setHost("localhost");
Connection connection = factory.newConnection();        // 获取连接
Channel channel = connection.createChannel();

// 当我们发送时,需要一个路由密钥,但是对于扇出交换,他的值将被忽略
// 第一个参数为交换的名字,第二个为交换的类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

String msg = "发布订阅";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("PS-Send:" + msg);

channel.close();
connection.close();

}

}


消费者:

package com.example.demo;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
*
*
* There are a few exchange types available: direct, topic, headers and fanout.
* 扇出交换:将收到的消息广播到它所知道的所有队列里
*/
public class PSReceive {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();    // 连接工厂
factory.setHost("localhost");
Connection connection = factory.newConnection();        // 获取连接
Channel channel = connection.createChannel();

// 当我们发送时,需要一个路由密钥,但是对于扇出交换,他的值将被忽略
// 第一个参数为交换的名字,第二个为交换的类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 当声明队列,不加任何参数,产生的将是一个临时队列,getQueue返回的是队列名称
String queueA = channel.queueDeclare().getQueue();
//String queueB = channel.queueDeclare().getQueue();
System.out.println("临时队列:" + queueA);

// 下面绑定交换与队列
channel.queueBind(queueA, EXCHANGE_NAME, "");
//channel.queueBind(queueB, EXCHANGE_NAME, "");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String recv = new String(body, "UTF-8");
System.out.println("PS-Receive:" + recv);
}
};

channel.basicConsume(queueA, true, consumer);
//channel.basicConsume(queueB, true, consumer);

}

}


启动消费者和生产者,控制台打印



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: