RabbitMQ消息的持久化
2016-02-19 22:51
459 查看
RabbitMQ消息持久化需要将消息和队列都持久化
队列持久化
队列持久化
//为Channel定义queue的属性,queueName为queue名称 第二个参数持久化标志,为true表示持久化 channel.queueDeclare(queueName, true, false,false,null);消息持久化
/** * 测试条件:1、在消息队列持久化的前提下2、接收消息方设置接收方式为手动接收,并不对接收消息进行确认 * 不采用消息持久化,重启RabbitMQ服务后,消息队列存在,消息接收不到 * 采用消息持久化,重启RabbitMQ服务后,消息队列存在,消息依然可以接收到消息,说明消息被持久化 */ channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());发送端
package cn.rabbitmq.disk;接收端
import java.io.IOException;
import cn.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
public class SendTest {
private final static String EXCHANGE_NAME = "logs1";
public static void main(String[] args) throws IOException {
/**
* 创建连接连接到MabbitMQ
*/
Connection connection = ConnectionUtil.getConnection();
// 创建一个频道
Channel channel = connection.createChannel();
String queueName = "queue01";
//为Channel定义queue的属性,queueName为queue名称 第二个参数持久化标志,为true表示持久化
channel.queueDeclare(queueName, true, false,false,null);
String msg = "Hello World!";
//发送消息
/** * 测试条件:1、在消息队列持久化的前提下2、接收消息方设置接收方式为手动接收,并不对接收消息进行确认 * 不采用消息持久化,重启RabbitMQ服务后,消息队列存在,消息接收不到 * 采用消息持久化,重启RabbitMQ服务后,消息队列存在,消息依然可以接收到消息,说明消息被持久化 */ channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
// channel.basicPublish("", queueName, null, msg.getBytes());
System.out.println("send message["+msg+"] to "+queueName+"success!");
//关闭通道
channel.close();
//关闭连接
connection.close();
}
}
package cn.rabbitmq.disk; import cn.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class RecTest { public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); // 创建一个频道 Channel channel = connection.createChannel(); String queueName = "queue01"; channel.queueDeclare(queueName, true, false, false, null); //以上部分和sender一样 //配置好获取消息得方式 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, false,consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); //确认消息,已经收到 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); System.out.println("received message["+msg+"] from "+queueName); } } }
相关文章推荐
- CentOS安装R中看yum、rpm、repo到底有什么关系
- 字典树(hdu1075)
- html5那些事儿
- SDAutoLayout:比masonry更简单易用的自动布局库
- CentOS安装R中看yum、rpm、repo到底有什么关系
- 夺命雷公狗---微信开发23----客服消息接口基础和推送文本
- 最优二叉树之哈夫曼编码
- golang解析json格式
- DDL语句操作表
- VMware Workstation虚机自动开关机
- 黑马程序员——Java设计模式之单例模式
- neuq oj 1043: 谭浩强C语言(第三版)习题5.7 C语言
- 构建ASP.NET MVC5+EF6+EasyUI 1.4.3+Unity4.x注入的后台管理系统(56)-插件---单文件上传与easyui使用fancybox
- 最大流问题的Ford-Fulkerson模板
- 二叉搜索树基本操作
- 关于研究Java中GlassPane的心得
- Android Studio中的EditText控件使用详解
- 查看google chrome和firefox上的cookie
- 定制化Azure站点Java运行环境(4)
- Hotel(线段树合并)