您的位置:首页 > 产品设计 > UI/UE

六.消息的交换:direct类型的Exchange(通过消息的routing key比较queue的key)

2017-07-10 23:09 295 查看
根据routingKey匹配消息到符合的消费者消费消息

例:消费者1只消费info级别的日志,消费者2即消费info级别也消费error级别

生产者和消费者的pom.xml和上一章一样

一.生产者Producer

1.发送Exchange类型为direct的消息的类:LogReceiveDirect.java

package com.rabbit.exchange;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class LogSenderDirect {

private Logger logger = LoggerFactory.getLogger(LogSenderFanout.class);

//ConnectionFactory和Connection在正式开发时需要设置成单例
private ConnectionFactory connectionFactory;
private Connection connection;
private Channel channel;

/**
* 在构造函数中获取连接
*/
public LogSenderDirect(){
super();
try {
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
} catch (Exception e) {
logger.error("获取连接时出错...");
}
}

/**
* 关闭连接的方法
*/
public boolean closeAll(){
try {
this.channel.close();
this.connection.close();
} catch (Exception e) {
logger.error("关闭连接时异常...");
return false;
}
return true;
}

/**
* 发送消息到交换中心
*/
public void sendMessage(String message,String routingKey){
try {
//声明一个exchange,名字为logs,类型为direct
channel.exchangeDeclare("logs", "direct");
//发布消息到exchange上
/**
* 1.指定exchange的名字
* 2.direct类型
* 3.null...
* 3.发送的消息
*/
channel.basicPublish("logs", routingKey, null, message.getBytes());
logger.debug("发送direct类型的消息"+message+"到exchange交换中心.");
} catch (Exception e) {
logger.error("消息发送失败:"+e);
}
}

}


2.启动测试的main方法:ExchangeDirectMain.java

package com.rabbit.main;

import com.rabbit.exchange.LogSenderDirect;

public class ExchangeDirectMain {

public static void main(String[] args) throws InterruptedException {
LogSenderDirect logSender = new LogSenderDirect();
//轮流每一秒发送info和error的消息(让消费者1接受info和error级别消息,消费者2只接受info级别消息)
int i = 0;
while (true) {
if(i%2 == 0){
logSender.sendMessage("hello tiglle"+i+":info","info");
}else{
logSender.sendMessage("hello tiglle"+i+":error","error");
}
Thread.sleep(1000);
i++;
}
}

}


二.只消费info级别的Consumer

1.消费消息的类:LogReceiveDirect.java

package com.rabbit.exchange;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class LogReceiveDirect {

private Logger logger = LoggerFactory.getLogger(LogReceiveDirect.class);

//正式开发ConnectionFactory和Connection应该设置为单例
private ConnectionFactory connectionFactory;
private Connection connection;
private Channel channel;

/**
* 在构造函数中获取连接
*/
public LogReceiveDirect(){
super();//Objece为其父类...
try {
connectionFactory = new ConnectionFactory();
connection  = connectionFactory.newConnection();
channel = connection.createChannel();
//声明exchange,防止生成者没启动的时候,exchange不存在(生成者和消费者总有一个要先声明)
channel.exchangeDeclare("logs", "direct");
} catch (Exception e) {
// TODO: handle exception
}
}

/**
* 关闭连接的方法
*/
public boolean closeAll(){
try {
this.channel.close();
this.connection.close();
} catch (Exception e) {
logger.error("关闭连接异常:"+e);
return false;
}
return true;
}

/**
* 消费消息
*/
public void messageReceive(){
try {
//获取临时列队:自己声明队列是比较麻烦的,
//因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁
String queueName = channel.queueDeclare().getQueue();
//把获取的临时列队绑定到logs这个exchange交换中心,只接受info级别日志
/**
* 1.列队名称
* 2.交换中心的名称
* 3.routingKey和生产者发布消息的时候指定的一样
*/
channel.queueBind(queueName, "logs", "info");
//定义一个Consumer消费logs的消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
// TODO Auto-generated method stub
String message = new String(body,"UTF-8");
logger.debug("我是打印日志的消费者:"+message);
}
};
//自动确认为true,接收到消息后该消息就销毁了
channel.basicConsume(queueName, true, consumer);
} catch (Exception e) {
logger.error("消费消息时异常:"+e);
}
}
}


2.启动监听的main方法:ExchangeDirectMain.java

package com.rabbit.main;

import com.rabbit.exchange.LogReceiveDirect;

public class ExchangeDirectMain {

public static void main(String[] args) {
LogReceiveDirect logReceive = new LogReceiveDirect();
logReceive.messageReceive();
}

}


三.同时消费info和error级别消息的消费者

1.消费消息的类:LogReceiveDirect.java

package com.rabbit.exchange;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class LogReceiveDirect {

private Logger logger = LoggerFactory.getLogger(LogReceiveDirect.class);

//正式开发ConnectionFactory和Connection应该设置为单例
private ConnectionFactory connectionFactory;
private Connection connection;
private Channel channel;

/**
* 在构造函数中获取连接
*/
public LogReceiveDirect(){
super();//Objece为其父类...
try {
connectionFactory = new ConnectionFactory();
connection  = connectionFactory.newConnection();
channel = connection.createChannel();
//声明exchange,防止生成者没启动的时候,exchange不存在(生成者和消费者总有一个要先声明)
channel.exchangeDeclare("logs", "direct");
} catch (Exception e) {
// TODO: handle exception
}
}

/**
* 关闭连接的方法
*/
public boolean closeAll(){
try {
this.channel.close();
this.connection.close();
} catch (Exception e) {
logger.error("关闭连接异常:"+e);
return false;
}
return true;
}

/**
* 消费消息
*/
public void messageReceive(){
try {
//获取临时列队:自己声明队列是比较麻烦的,
//因此,RabbitMQ提供了简便的获取临时队列的方法,该队列会在连接断开后销毁
String queueName = channel.queueDeclare().getQueue();
//把获取的临时列队绑定到logs这个exchange交换中心,绑定两个routingKey(同时接受info和error级别日志),不会覆盖
/**
* 1.列队名称
* 2.交换中心的名称
* 3.routingKey和生产者发布消息的时候指定的一样
*/
channel.queueBind(queueName, "logs", "info");
channel.queueBind(queueName, "logs", "error");
//定义一个Consumer消费logs的消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
// TODO Auto-generated method stub
String message = new String(body,"UTF-8");
logger.debug("我是写硬盘的消费者:"+message);
}
};
//自动确认为true,接收到消息后该消息就销毁了
channel.basicConsume(queueName, true, consumer);
} catch (Exception e) {
logger.error("消费消息时异常:"+e);
}
}
}


2.启动监听的main方法:ExchangeDirectMain.java

package com.rabbit.main;

import com.rabbit.exchange.LogReceiveDirect;

public class ExchangeDirectMain {

public static void main(String[] args) {
LogReceiveDirect logReceive = new LogReceiveDirect();
logReceive.messageReceive();
}

}


这时,提供者的info级别日志会被消费者1和消费者2同时消费,error级别的日志只会被消费者2消费
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: