ActiveMq NON_PERSISTENT与PERSISTENT以及 durable subscription(持久订阅)的理解
2017-02-03 17:00
309 查看
转载至:http://quentinxxz.iteye.com/blog/2113458
实验一:
Java代码
public class Producer {
public static void main(String[] args) {
String user = ActiveMQConnection.DEFAULT_USER;
String password = ActiveMQConnection.DEFAULT_PASSWORD;
String url = ActiveMQConnection.DEFAULT_BROKER_URL;
String subject = "TOOL.DEFAULT";
System.out.println(user +" "+ password+" "+url+" "+subject+" ");
ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user,password,url);
try {
Connection connection = contectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(subject);
MessageProducer producer = session.createProducer(destination);
for(int i = 0;i<=20;i++){
MapMessage message = session.createMapMessage();
message.setLong("date", new Date().getTime());
Thread.sleep(5000);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(message);
System.out.println("—SendMessge:"+new Date());
}
session.commit();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
注意:producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
此处显式指明DeliveryMode为NON_PERSISENT
Java代码
public class Consumer {
public static void main(String[] args) {
String user = ActiveMQConnection.DEFAULT_USER;
String password = ActiveMQConnection.DEFAULT_PASSWORD;
String url = ActiveMQConnection.DEFAULT_BROKER_URL;
String subject = "TOOL.DEFAULT";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(subject);
MessageConsumer message = session.createConsumer(destination);
message.setMessageListener(new MessageListener(){
public void onMessage(Message msg) {
MapMessage message = (MapMessage)msg;
try {
System.out.println("--Receive:"+new Date(message.getLong("date")));
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Thread.sleep(30000);
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ActiveMq Broker的配置都为默认。
先启动Broker,再启动Producer,等待Producer的20条Message发送完毕,Prducer程序运行结束,之后再启动Consumer,20条Message正常接收。
可见PERSISTENT与NON_PERSISTENT,并不是指Broker会不会在Consumer未连接的情况为其存储Message
实验二:
先启动Broker,再启动Producer,等待Producer的20条Message发送完毕,Prducer程序运行结束。关闭ActiveMq Broker服务,然后重启。之后再启动Consumer。
Consumer没有接收到任何消息。
实验三:
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
在Producer中显式指明DeliveryMode为PERSISENT
实验步骤与实验二相同,但是此次Consumer收到了来自Producer的消息。
总结:
Persistent 用来指定JMS Provider对消息进行持久化操作,以免Provider fail的时候,丢失Message.
NON_Persistent 方式下的JMS Provider不会对消进宪持久化,但上述实验一可知,Consumer还是会收到Message,可见JMS Provider会将相应的消息存在内存中,当Consumer连接上时,再发送过去,但在Provider fail的时候,Message会丢失。
事实上ActiveMq提供了多种消息持久化方式,包括AMQ、KahaDB、JDBC、LevelDB。从5.4版本之后KahaDB做为默认的持久化方式。
以下网上摘的:
消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durablesubscription),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。
Topic 主题由JMS Provider 管理,主题由主题名识别,客户端可以通过JNDI接口用主题名得到一个主题对象。
实验一:
Java代码
public class Producer {
public static void main(String[] args) {
String user = ActiveMQConnection.DEFAULT_USER;
String password = ActiveMQConnection.DEFAULT_PASSWORD;
String url = ActiveMQConnection.DEFAULT_BROKER_URL;
String subject = "TOOL.DEFAULT";
System.out.println(user +" "+ password+" "+url+" "+subject+" ");
ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user,password,url);
try {
Connection connection = contectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(subject);
MessageProducer producer = session.createProducer(destination);
for(int i = 0;i<=20;i++){
MapMessage message = session.createMapMessage();
message.setLong("date", new Date().getTime());
Thread.sleep(5000);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(message);
System.out.println("—SendMessge:"+new Date());
}
session.commit();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
注意:producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
此处显式指明DeliveryMode为NON_PERSISENT
Java代码
public class Consumer {
public static void main(String[] args) {
String user = ActiveMQConnection.DEFAULT_USER;
String password = ActiveMQConnection.DEFAULT_PASSWORD;
String url = ActiveMQConnection.DEFAULT_BROKER_URL;
String subject = "TOOL.DEFAULT";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(subject);
MessageConsumer message = session.createConsumer(destination);
message.setMessageListener(new MessageListener(){
public void onMessage(Message msg) {
MapMessage message = (MapMessage)msg;
try {
System.out.println("--Receive:"+new Date(message.getLong("date")));
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Thread.sleep(30000);
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ActiveMq Broker的配置都为默认。
先启动Broker,再启动Producer,等待Producer的20条Message发送完毕,Prducer程序运行结束,之后再启动Consumer,20条Message正常接收。
可见PERSISTENT与NON_PERSISTENT,并不是指Broker会不会在Consumer未连接的情况为其存储Message
实验二:
先启动Broker,再启动Producer,等待Producer的20条Message发送完毕,Prducer程序运行结束。关闭ActiveMq Broker服务,然后重启。之后再启动Consumer。
Consumer没有接收到任何消息。
实验三:
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
在Producer中显式指明DeliveryMode为PERSISENT
实验步骤与实验二相同,但是此次Consumer收到了来自Producer的消息。
总结:
Persistent 用来指定JMS Provider对消息进行持久化操作,以免Provider fail的时候,丢失Message.
NON_Persistent 方式下的JMS Provider不会对消进宪持久化,但上述实验一可知,Consumer还是会收到Message,可见JMS Provider会将相应的消息存在内存中,当Consumer连接上时,再发送过去,但在Provider fail的时候,Message会丢失。
事实上ActiveMq提供了多种消息持久化方式,包括AMQ、KahaDB、JDBC、LevelDB。从5.4版本之后KahaDB做为默认的持久化方式。
以下网上摘的:
消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durablesubscription),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID得到所有当自己处于离线时发送到主题的消息。
Topic 主题由JMS Provider 管理,主题由主题名识别,客户端可以通过JNDI接口用主题名得到一个主题对象。
消息发送端 | 消息接收端 | 可靠性及因素 |
PERSISTENT | queue receiver/durable subscriber | 消费一次且仅消费一次。可靠性最好,但是占用服务器资源比较多。 |
PERSISTENT | non-durable subscriber | 最多消费一次。这是由于non-durable subscriber决定的,如果消费端宕机或其他问题导致与JMS服务器断开连接,等下次再联上JMS服务器时的一系列消息,不为之保留。 |
NON_PERSISTENT | queue receiver/durable subscriber | 最多消费一次。这是由于服务器的宕机会造成消息丢失 |
NON_PERSISTENT | non-durable subscriber | 最多消费一次。这是由于服务器的宕机造成消息丢失,也可能是由于non-durable subscriber的性质所决定 |
相关文章推荐
- ActiveMq NON_PERSISTENT与PERSISTENT以及 durable subscription(持久订阅)的理解
- ActiveMq NON_PERSISTENT与PERSISTENT以及 durable subscription(持久订阅)的理解
- spring+activemq 发送10W消息报端口被占用的异常分析以及topic持久化订阅
- Active MQ non-persistent Message Performance and Reliability Test
- JMS学习六(ActiveMQ Topic之持久订阅)
- ActiveMQ JMS non-Spring Snippet
- 比较C#的委托与C语言的函数指针,以及用流程图来理解C#中事件,发布与订阅的逻辑
- MQ、JMS以及ActiveMQ 关系的理解
- ActiveMQ Master Slave配置以及示例
- MQ、JMS以及ActiveMQ 关系的理解
- Java ActiveMQ 讲解(一)理解JMS 和 ActiveMQ基本使用(转)
- Java ActiveMQ 讲解(一)理解JMS 和 ActiveMQ基本使用
- 理解JMS规范中的持久订阅和非持久订阅
- 深入浅出JMS ActiveMQ简单介绍以及安装
- 深入浅出JMS ActiveMQ简单介绍以及安装
- MQ、JMS以及ActiveMQ 关系的理解
- 理解JMS规范中的持久订阅和非持久订阅
- MQ、JMS以及ActiveMQ 关系的理解
- 理解JMS规范中的持久订阅和非持久订阅
- MQ、JMS以及ActiveMQ 关系的理解