【J2EE】企业级项目开发总结--JMS发布-订阅模型篇
2015-09-27 15:57
525 查看
通过上篇博客的示例,我们可以总结出消息通信的基本过程。首先得到连接工厂,通过工厂生产连接,从连接中得到一个会话,根据会话创建消息生产者,目的地,消息和消息消费者。详细过程如下图。
和前一篇P2P中的代码类似,我们今天来看看发布订阅模型。只是在这里把Queue换成了Topic,消息生产者叫做消息发布者,发布一个主题,所有订阅主题的类叫做消息订阅者,每个订阅者都对应一个监听类,当消息发送到主题队列后,会调用监听类的onMessage方法,从而使得各订阅者收到消息,所有的订阅者都能收到同样的消息,消息被消费了不只一次。
代码如下:
TopicPublisher
TopicSubscriber
TopicListener
TopicSubscriber2和TopicListener2和上面的一样,改一下名称和打印信息即可.
运行代码,得到如下结果:
关于P2P模型和Pub/Sub模型的特点,我们好好体会体会吧.
和前一篇P2P中的代码类似,我们今天来看看发布订阅模型。只是在这里把Queue换成了Topic,消息生产者叫做消息发布者,发布一个主题,所有订阅主题的类叫做消息订阅者,每个订阅者都对应一个监听类,当消息发送到主题队列后,会调用监听类的onMessage方法,从而使得各订阅者收到消息,所有的订阅者都能收到同样的消息,消息被消费了不只一次。
代码如下:
TopicPublisher
package com.tgb.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息发布者 * @author ghy * */ public class TopicPublisher { private static final String USERNAME=ActiveMQConnectionFactory.DEFAULT_USER;//默认的连接用户名 private static final String PASSWORD=ActiveMQConnectionFactory.DEFAULT_PASSWORD;//默认的连接密码 private static final String BROKEURL=ActiveMQConnectionFactory.DEFAULT_BROKER_URL;//默认的连接地址 private static final int SENDNUM=10;//发送的消息数量 public static void main(String[] args){ ConnectionFactory connectionFactory;//连接工厂 Connection connection=null; //连接 Session session; //会话,接受或者发送消息的线程 MessageProducer messageProducer; //消息发送者 //实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory(TopicPublisher.USERNAME,TopicPublisher.PASSWORD,TopicPublisher.BROKEURL); try { connection=connectionFactory.createConnection();//创建连接 connection.start();//启动连接 session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建连接 Topic topic =session.createTopic("FirstTopic");//创建主题 messageProducer=session.createProducer(topic);//创建消息生产者 sendMessage(session,messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); }finally{ if(connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } //发送消息 public static void sendMessage(Session session,MessageProducer messageProducer){ for (int i=0;i<TopicPublisher.SENDNUM;i++){ TextMessage message; try { message = session.createTextMessage("ActiveMQ 发送的消息" + i); System.out.println("发送消息:" + "ActiveMQ 发送的消息" + i); messageProducer.send(message); } catch (JMSException e) { e.printStackTrace(); } } } }
TopicSubscriber
package com.tgb.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息订阅者1 * @author ghy * */ public class TopicSubscriber { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER;//默认连接的用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL;//默认链接地址 public static void main(String[] args){ ConnectionFactory connectionFactory;//连接工厂 Connection connection=null;//连接 Session session;//会话 MessageConsumer messageConsumer;//消息消费者 connectionFactory=new ActiveMQConnectionFactory(TopicSubscriber.USERNAME,TopicSubscriber.PASSWORD,TopicSubscriber.BROKEURL); try { connection=connectionFactory.createConnection();//创建连接 connection.start();//启动连接 session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建session Topic topic=session.createTopic("FirstTopic");//创建主题 messageConsumer=session.createConsumer(topic);//创建消息消费者 //注册消息监听 TopicListener listener=new TopicListener(); messageConsumer.setMessageListener(listener); } catch (JMSException e) { e.printStackTrace(); } } }
TopicListener
package com.tgb.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 监听器 * @author ghy * */ public class TopicListener implements MessageListener { public void onMessage(Message message) { try { System.out.println("订阅者1收到的消息:" + ((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
TopicSubscriber2和TopicListener2和上面的一样,改一下名称和打印信息即可.
运行代码,得到如下结果:
关于P2P模型和Pub/Sub模型的特点,我们好好体会体会吧.
相关文章推荐
- android四种更新UI的方法
- 搜狗面试题
- JAVA静态加载出错
- HTML链接
- 测试的基本知识点
- afn 上传两张图片
- CentOS 7 中 pptpd安装
- hadoop 第一个程序wordcount执行过程
- [LeetCode][JavaScript]Spiral Matrix
- Linux下locale: Cannot set LC_CTYPE to default locale: No such file or directory警告
- Scala深入浅出进阶经典第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析
- Java多线程基础知识(五)
- 多按键判断程序
- 中国距离危机还远
- Win10任务栏怎么添加图标? Win10任务栏添加显示桌面的图标的教程
- 【C语言】获取一个数二进制序列中所有的偶数位和奇数位,分别输出二进制序列
- 【物联网】WiFi基础知识
- js动态控制多选框的选中项
- linux下执行java为什么bash:javac:command not found
- Java中的substring真的会引起内存泄露么?