JMS学习六(ActiveMQ Topic之持久订阅)
2017-03-09 13:54
281 查看
上篇文章中Topic(主题)传送模型可以有两种订阅模式即持久订阅和非持久订阅,废话少说看代码:
一、非持久订阅
1、消息生产者
这个非持久的订阅是不是和第二篇中的一样啊,是的都是一样的。
二、持久订阅
持久订阅的前提是要发送持久化消息,将消息持久化到磁盘,一边之后再读取。
1、消息生产者
2、消息消费者
/************************ 持久订阅****************************/
1、消息生产者
2、消息消费者
上面是发布订阅传送模型的非持久化订阅和持久化订阅的demo。
非持久化订阅没有什么硬性的要求,但持久化订阅是有硬性要求的即:
(1)、发送的消息是持久化的。
(2)、activemq区分消费者是通过clientID和订阅主题名称来区分的。
(3)、按照订阅主题名称和给连接设置的id(ClientID)来创建一个消息消费者对象:consumer = session.createDurableSubscriber(topic, clientid);
(4)、订阅主题名称和设置的ClientID组合起来必须是唯一的。
(5)、使用相同的“clientID”和主题名称,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。
参考文档:http://blog.csdn.net/zhu_tianwei/article/details/46303347
一、非持久订阅
1、消息生产者
package mqtest2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer3 { public static void main(String[] args) throws Exception { // 创建一个JMS connection factory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", "tcp://127.0.0.1:61616"); // 通过connection factory来创建JMS connection Connection connection = connectionFactory.createConnection(); // 启动JMS connection connection.start(); // 通过connection创建JMS session Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建JMS destination Destination destination = session.createTopic("topic-test3"); // 创建JMS producer MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 0; i < 5; i++) { TextMessage message = session.createTextMessage("message: " + i); // 发送message producer.send(message); } // 关闭所有的JMS资源 session.close(); connection.close(); } }2、消息消费者
package mqtest2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class Receive3 { public static void main(String[] args) throws Exception { // 创建一个JMS connection factory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", "tcp://127.0.0.1:61616"); // 通过connection factory来创建JMS connection Connection connection = connectionFactory.createConnection(); // 启动JMS connection connection.start(); // 通过connection创建JMS session Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建JMS destination Topic destination = session.createTopic("topic-test3"); // 创建JMS consumer MessageConsumer consumer = session.createConsumer(destination); // 设置监听 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { message = (TextMessage) message; try { String value = ((TextMessage) message).getText(); System.out.println("value2: " + value); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } }
这个非持久的订阅是不是和第二篇中的一样啊,是的都是一样的。
二、持久订阅
持久订阅的前提是要发送持久化消息,将消息持久化到磁盘,一边之后再读取。
1、消息生产者
package mqtest2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producter { public static void main(String[] args) throws Exception { //创建一个JMS connection factory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://127.0.0.1:61616"); //通过connection factory来创建JMS connection Connection connection = connectionFactory.createConnection(); //通过connection创建JMS session Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //创建JMS destination Destination destination = session.createTopic("PersistenceTopic"); //创建JMS producer MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); //启动JMS connection connection.start(); for(int i = 0;i < 5;i++){ TextMessage message = session.createTextMessage("message"+i); //发送message producer.send(message); } //关闭所有的JMS资源 session.close(); connection.close(); } }
2、消息消费者
package mqtest2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; public class Receive1 { public static void main(String[] args) throws Exception { // 创建一个JMS connection factory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", "tcp://127.0.0.1:61616"); // 通过connection factory来创建JMS connection Connection connection = connectionFactory.createConnection(); // 设置一个标记id connection.setClientID("TT"); // 启动JMS connection connection.start(); // 通过connection创建JMS session Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建JMS destination Topic destination = session.createTopic("PersistenceTopic"); // 创建JMS consumer 按照目的地和唯一clientId标示。 // TopicSubscriber是MessageConsumer的子接口 TopicSubscriber ts = session.createDurableSubscriber(destination, "TT"); // 设置监听 ts.setMessageListener(new MessageListener() { public void onMessage(Message message) { message = (TextMessage) message; try { String value = ((TextMessage) message).getText(); System.out.println("value2: " + value); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } }
/************************ 持久订阅****************************/
1、消息生产者
package mqtest2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class Producter2 { public static void main(String[] args) { String name = "topic_test2"; ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Topic topic = null; MessageProducer producter = null; TextMessage textMessage = null; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); topic = session.createTopic(name); producter = session.createProducer(topic); // 持久化消息 producter.setDeliveryMode(DeliveryMode.PERSISTENT); // 消息对象 textMessage = session.createTextMessage("one test!"); // 发送消息 producter.send(textMessage); System.out.println("发送结束!"); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
2、消息消费者
package mqtest2; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class Receive2 { public static void main(String[] args) { String clientid = "producter2"; String name = "topic_test2"; ActiveMQConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Topic topic = null; MessageConsumer consumer = null; try { connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); connection = connectionFactory.createConnection(); connection.setClientID(clientid); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); topic = session.createTopic(name); consumer = session.createDurableSubscriber(topic, clientid); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { message = (TextMessage) message; try { String value = ((TextMessage) message).getText(); System.out.println("value2: " + value); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
上面是发布订阅传送模型的非持久化订阅和持久化订阅的demo。
非持久化订阅没有什么硬性的要求,但持久化订阅是有硬性要求的即:
(1)、发送的消息是持久化的。
(2)、activemq区分消费者是通过clientID和订阅主题名称来区分的。
(3)、按照订阅主题名称和给连接设置的id(ClientID)来创建一个消息消费者对象:consumer = session.createDurableSubscriber(topic, clientid);
(4)、订阅主题名称和设置的ClientID组合起来必须是唯一的。
(5)、使用相同的“clientID”和主题名称,则认为是同一个消费者。两个程序使用相同的“clientID”,则同时只能有一个连接到activemq,第二个连接的会报错。
参考文档:http://blog.csdn.net/zhu_tianwei/article/details/46303347
相关文章推荐
- 深入浅出JMS(五)--ActiveMQ Topic发布订阅消息
- spring+activemq 发送10W消息报端口被占用的异常分析以及topic持久化订阅
- ActiveMq NON_PERSISTENT与PERSISTENT以及 durable subscription(持久订阅)的理解
- Spring+ActiveMQ消息持久化,Topic持久化订阅
- ActiveMQ Topic发布订阅消息
- ActiveMq NON_PERSISTENT与PERSISTENT以及 durable subscription(持久订阅)的理解
- ActiveMq NON_PERSISTENT与PERSISTENT以及 durable subscription(持久订阅)的理解
- JMS学习十一(Spring+ActiveMQ消息持久化,Topic持久化订阅)
- Spring+ActiveMQ实现消息收发和订阅
- 测试发送 ActiveMq topic消息
- springboot activemq 2 持久化消息 与 持久化订阅
- ActiveMQ VirtualTopic
- JMS 服务器ActiveMQ Queue和Topic区别
- ActiveMQ+MQTT协议 实现Android推送(根据订阅主题可实现点对点、集群推送)
- ActiveMQ Queue和Topic
- 关于 Jms Topic 持久订阅
- Camel ActiveMQ topic route with jms selector
- JMS 服务器ActiveMQ Queue和Topic区别
- spring jms同时使用queue和持久topic订阅
- ActiveMQ Spring 整合持久化到数据库的实现