您的位置:首页 > 其它

RabbitMQ消息的持久化

2016-02-19 22:51 459 查看
RabbitMQ消息持久化需要将消息和队列都持久化

队列持久化

//为Channel定义queue的属性,queueName为queue名称  第二个参数持久化标志,为true表示持久化
channel.queueDeclare(queueName, true, false,false,null);
消息持久化

/**
* 测试条件:1、在消息队列持久化的前提下2、接收消息方设置接收方式为手动接收,并不对接收消息进行确认
* 不采用消息持久化,重启RabbitMQ服务后,消息队列存在,消息接收不到
* 采用消息持久化,重启RabbitMQ服务后,消息队列存在,消息依然可以接收到消息,说明消息被持久化
*/
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
发送端

package cn.rabbitmq.disk;

import java.io.IOException;

import cn.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;

public class SendTest {

private final static String EXCHANGE_NAME = "logs1";

public static void main(String[] args) throws IOException {
/**
* 创建连接连接到MabbitMQ
*/
Connection connection = ConnectionUtil.getConnection();
// 创建一个频道
Channel channel = connection.createChannel();

String queueName = "queue01";
//为Channel定义queue的属性,queueName为queue名称 第二个参数持久化标志,为true表示持久化
channel.queueDeclare(queueName, true, false,false,null);
String msg = "Hello World!";
//发送消息
/** * 测试条件:1、在消息队列持久化的前提下2、接收消息方设置接收方式为手动接收,并不对接收消息进行确认 * 不采用消息持久化,重启RabbitMQ服务后,消息队列存在,消息接收不到 * 采用消息持久化,重启RabbitMQ服务后,消息队列存在,消息依然可以接收到消息,说明消息被持久化 */ channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
// channel.basicPublish("", queueName, null, msg.getBytes());
System.out.println("send message["+msg+"] to "+queueName+"success!");
//关闭通道
channel.close();
//关闭连接
connection.close();
}
}
接收端

package cn.rabbitmq.disk;

import cn.rabbitmq.util.ConnectionUtil;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class RecTest {
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
// 创建一个频道
Channel channel = connection.createChannel();

String queueName = "queue01";
channel.queueDeclare(queueName, true, false, false, null);
//以上部分和sender一样
//配置好获取消息得方式
QueueingConsumer consumer =  new QueueingConsumer(channel);
channel.basicConsume(queueName, false,consumer);
//循环获取消息
while(true){
//获取消息,如果没有消息,这一步将会一直阻塞
Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
//确认消息,已经收到
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("received message["+msg+"] from "+queueName);
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: