您的位置:首页 > 其它

RabbitMQ的订阅和发布步骤详解

2016-12-15 11:42 411 查看
一、关于RabbitMQ搭建和基本概念这里不做介绍,下面给出实用的参考博客

RabbitMQ基础概念及详细介绍参考文档:http://blog.csdn.net/whycold/article/details/41119807

RabbitMQ入门及环境的搭建:http://m.blog.csdn.net/article/details?id=50487028

RabbitMQ网页控制台开启方式:http://blog.csdn.net/spyiu/article/details/24697221

二、RabbitMQ的发布

(1)发布端的连接方法

发布端的连接只需要创建一个ConnectionFactory然后创建一个连接,然后创建一个频道,声明一个路由器,指定名称、模式、及其是否durable

public int BaseConnection(){
/**
* 创建连接连接到RabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost(IP);
factory.setPort(5672);
factory.setUsername(user);
factory.setPassword(password);

//创建一个连接
try {
connection = factory.newConnection();
} catch (IOException e) {
System.out.println("[x] 请确认输入的IP地址、用户名、密码是否准确!");
return -1;
} catch (Exception e) {
System.out.println("[x] 连接RabbitMQ超时,请重试!");
return -1;
}

//创建一个频道
try {
channel = connection.createChannel();
} catch (IOException e) {
System.out.println("[x] 创建频道出错,请重试!");
return -1;
}

//声明一个路由器,指定名称、模式、及其是否durable
try {
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE,EXCHANGE_DURABLE);
} catch (IOException e) {
System.out.println("[x] 路由器声明失败,请重试!");
return -1;
}

System.out.println("[x] 发布消息者成功连接至RabbitMQ !");
return 0;
}

(2)发布端发布消息的方法

发布端发布消息往指定的exchange(路由)发送的时候需要指定一个routingKey,topic类型时当接收端的bindingKey和routingKey相匹配的时候才能接收到订阅的消息

/**
* 发布消息
* @param routingKey routingKey
* @param msg 消息
* @return
*/
public boolean Publish(String routingKey,byte[] msg){
try {
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg);
System.out.println("[x] Sender Sent " + routingKey + " : " + msg);
return true;
} catch (IOException e) {
System.out.println("[y] Sender failed to basic publish.");
return false;
}
}

三、RabbitMQ的订阅
(1)订阅端的连接方法

订阅端的连接在发布端连接的基础上还需要给该exchange绑定

/**
* 创建连接连接到RabbitMQ
* @return 0 means success
* -1 means failure
*/
public int BaseConnection(){
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost(IP);
factory.setPort(5672);
factory.setUsername(user);
factory.setPassword(password);

//创建一个连接
try {
connection = factory.newConnection();
} catch (IOException e) {
System.out.println("[y] 请确认输入的IP地址、用户名、密码是否准确!");
return -1;
} catch (Exception e) {
System.out.println("[y] 连接RabbitMQ超时,请重试!");
return -1;
}

//创建一个频道
try {
channel = connection.createChannel();
} catch (IOException e) {
System.out.println("[y] 创建频道出错,请重试!");
return -1;
}

//声明一个路由器,指定名称、模式、及其是否durable
try {
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE,EXCHANGE_DURABLE);
} catch (IOException e) {
System.out.println("[y] 路由器声明失败,请重试!");
return -1;
}

//指定一个队列,随机队列名,non-durable,exclusive,not auto-delete
// QUEUE_NAME = MqConfig.QUEUE_NAME;

Map<String,Object> args = new HashMap();
args.put("x-message-ttl",message_ttl);
try {
channel.queueDeclare(QUEUE_NAME,QUEUE_DURABLE,QUEUE_EXCLUSIVE,QUEUE_AUTO_DELETE,args);
} catch (IOException e) {
System.out.println("[y] 队列声明失败,请重试!");
}

try {
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,bindingKey);
} catch (IOException e) {
System.out.println("[y] 队列绑定路由关键字 " + bindingKey + " 时出错!");
}

System.out.println("[y] 订阅消息者成功连接至RabbitMQ !");
return 0;
}

(2)订阅端的订阅方法

/**
* 订阅消息函数
*/
public boolean Consume(){
System.out.println("[y] Receiver " + QUEUE_NAME + " Waiting for messages.To exit press CTRL+C.");
//消息订阅
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
team.iOceanPlus.PB.Config.PBConfig rec=team.iOceanPlus.PB.Config.PBConfig.parseFrom(body);
//                rec.getConfigTargetDistribution();
//                rec.getConfigTargetDistribution().getConfigType();
System.out.println(rec.getConfigTargetDistribution(0));
}
};

//消息反馈
try {
channel.basicConsume(QUEUE_NAME,true,consumer);
return true;
} catch (IOException e) {
System.out.println("[y] Receiver "+ QUEUE_NAME + " failed to basic consume." );
return false;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: