ActiveMq持久订阅小例子
2017-04-16 11:45
218 查看
public class ProducerTopic { public static void main(String[] args) throws JMSException { String user = ActiveMQConnectionFactory.DEFAULT_USER; String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD; String brokerUrl = "tcp://hadoop2:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, brokerUrl); /** * 创建连接并开启 */ Connection connection = connectionFactory.createConnection(); /** * 创建session * 参数1:不启动事务 * 参数2:签收模式为自动签收 */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /** * 创建topic或者queue */ Topic topic_2 = session.createTopic("topic_2"); /** * 创建生产者 */ MessageProducer producer = session.createProducer(topic_2); //此生产者生产的数据持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); /** * 注意,一定要在设置了持久化之后再启动connection */ connection.start(); /** * 生产数据 */ for (int i = 0; i < 10; i++) { TextMessage textMessage = session.createTextMessage(); textMessage.setText("hello java " + i); producer.send(textMessage); } /** * 释放连接 */ connection.close(); } }
public class ConsumerTopic { public static void main(String[] args) { String user = ActiveMQConnectionFactory.DEFAULT_USER; String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD; String brokerUrl = "tcp://hadoop2:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, brokerUrl); Connection connection = null; try { /** * 创建连接并开启,注册一个客户号,让broker知道是谁订阅了。 * 需要在producer发消息之前,先运行一下此程序,目的是注册订阅信息,然后就可以关闭程序 * 这样就可以做到再次启动时,收到离线的消息。 */ connection = connectionFactory.createConnection(); connection.setClientID("c1"); /** * 创建session * 参数1:不启动事务 * 参数2:签收模式为自动签收 aa8b */ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /** * 创建topic或者queue */ Topic topic_2 = session.createTopic("topic_2"); //对于topic_2,建立持久订阅,名字随便起 TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic_2, "somebody"); connection.start(); /** * 消费数据 */ while (true) { TextMessage textMessage = (TextMessage) durableSubscriber.receive(); System.out.println("消费消息:" + textMessage.getText()); } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
相关文章推荐
- ActiveMQ 中的持久订阅配置
- ActiveMQ持久订阅设置
- activemq 发布者/订阅 springmvc maven 例子[代码参考]
- 如何实现ActiveMQ的Topic的持久订阅。
- 如何实现ActiveMq的Topic的持久订阅
- ActiveMQ之Topic的持久订阅
- ActiveMQ - 持久化消息与持久主题订阅
- 如何实现ActiveMQ的Topic的持久订阅。
- 如何实现ActiveMq的Topic的持久订阅
- ActiveMQ发布订阅模式
- JMS Activemq实战例子demo
- spring jms同时使用queue和持久topic订阅
- java JMS 之 ActiveMQ 简单例子
- 理解面向消息中间件及JMS 以及 ActiveMQ例子
- [置顶] 理解JMS规范中的持久订阅和非持久订阅
- ActiveMQ-1.收发消息之最简例子
- ActiveMQ使用spring JmsTemplate生成和订阅消息(二)
- Spring整合activeMq(二):发布订阅模式
- MQTT下ActiveMQ的消息持久化
- activeMQ消息详解(续) 订阅(主题)消息(消息持久化)