activeMq Topic使用案例
2017-04-11 10:58
176 查看
Topic实现publish和subscribe语义:
一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。
这个例子可以先开启生产者,然后再进行消费。
两个订阅的消费者
一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。但是在消息代理接收到消息时,只有激活订阅的subscriber能够获得消息的一个拷贝。
这个例子可以先开启生产者,然后再进行消费。
/** * */ package activeMQ.topicMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; /** * @author Test * @createDate 2014-6-22上午10:57:35 * @className TopicMsgProducer.java * @useFor */ public class TopicMsgProducer { public void send() { //创建连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = null; try { //创建连接 conn = factory.createConnection(); conn.start(); //创建会话 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建地点 Topic topic = session.createTopic("topic.textMsg"); //创建生产者 MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); for(int i=0 ; i<10 ;i++) { TextMessage tmsg = session.createTextMessage(); tmsg.setText("早上你好 "+i); producer.send(tmsg); System.out.println("发送的消息:"+tmsg.getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { } } @Test public void test() { new TopicMsgProducer().send(); } }
两个订阅的消费者
/** * */ package activeMQ.topicMessage; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; /** * @author Test * @createDate 2014-6-22上午10:58:42 * @className TopicMsgConsumer.java * @useFor */ public class TopicMsgConsumer { public void receive() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = null; try { conn = factory.createConnection(); conn.setClientID("T1"); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); //订阅发布模式的 Topic对象 不是Destination Topic topic = session.createTopic("topic.textMsg"); TopicSubscriber subsriber = session.createDurableSubscriber(topic , "T1" ); while (true) { TextMessage tm = (TextMessage)subsriber.receive() ; if (tm == null) { break; } System.out.println("Msg1 Received message: " + tm.getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { if (conn != null) { try { conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } } @Test public void test() { new TopicMsgConsumer().receive(); } }
/** * */ package activeMQ.topicMessage; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; /** * @author Test * @createDate 2014-6-22上午10:58:42 * @className TopicMsgConsumer.java * @useFor */ public class TopicMsg2Consumer { public void receive() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = null; try { conn = factory.createConnection(); conn.setClientID("T2"); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); // 订阅发布模式的 Topic对象 不是Destination Topic topic = session.createTopic("topic.textMsg"); TopicSubscriber subsriber = session.createDurableSubscriber(topic , "T2" ); while (true) { TextMessage tm = (TextMessage)subsriber.receive() ; if (tm == null) { break; } System.out.println("Msg2 Received message: " + tm.getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { if (conn != null) { try { conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } } @Test public void test() { new TopicMsg2Consumer().receive(); } }
相关文章推荐
- JMS Apache ActiveMQ使用(3)
- Camel ActiveMQ topic route with jms selector
- active-mq的使用
- spring整合activemq发送MQ消息[Topic模式]实例,activemqmq
- 转: 使用Jmeter创建ActiveMQ JMS POINT TO POINT请求,环境搭建、请求创建、插件安装、监听服务器资源等
- ActiveMQspring结合使用
- activemq ObjectMessage 不鼓励使用,有安全隐患
- spring+activemq 发送10W消息报端口被占用的异常分析以及topic持久化订阅
- Apache ActiveMQ消息中间件的基本使用
- ActiveMQ VirtualTopic
- c# activeMQ使用
- 使用spring + ActiveMQ 总结
- MQ系列3 使用Spring发送,消费topic和queue消息 activeMQ
- 使用spring + ActiveMQ 总结
- JMS 服务器ActiveMQ Queue和Topic区别
- Spring+JMS+使用JMS+ActiveMQ中间件+实现简单聊天功能的小例子
- 使用ActiveMQ+MQTT实现Android点对点消息通知
- 使用Jmeter创建ActiveMQ JMS POINT TO POINT请求,环境搭建、请求创建、插件安装、监听服务器资源等
- Java ActiveMQ 讲解(一)理解JMS 和 ActiveMQ基本使用(转)
- JMS学习(八)-ActiveMQ Consumer 使用 push 还是 pull 获取消息