JMS-OpenJMS提供商
2016-03-18 23:07
239 查看
JMS是一个规范,实现 JSM 客户端发送和接收消息,必须有一个实现了 JMS 的消息系统(即 JMS 提供商),我们在编写 JMS客户端时,使用 **JNDI API** 查找已配置的 JMS 对象(ConnectionFactory、Destination被管理对象),JMS供应商提供专有的工具来创建和管理它们。目前用的最多的JMS主流开源项目是OpenJMS和ActiveMQ。
本文选用 OpenJMS 开源JMS服务器,OpenJMS支持SUN的JMS API 1.1规范,支持消息队列,支持消息传递的发布/订阅模式,主要功能如下:
1. 支持点对点、发布订阅消息模型
2. 可靠的消息传输
3. 支持同步、异步消息传输
4. 使用JDBC的持久化
5. 本地事物
6. 身份认证
7. 管理界面
8. 基于XML的配置文件
9. 内存和数据库垃圾回收
10. 支持RMI、TCP、HTTP和SSL协议栈
11. 集成Servlet服务器,例如Jakarta Tomcat
12. 支持大量终端和用户
官方地址是http://openjms.sourceforge.net/
环境搭建:
下载openjms-0.7.7-beta-1.zip和openjms-0.7.7-beta-1-src.zip源码两个包,直接解压到指定目录,目录说明
|—-openjms-0.7.7-beta-1
| |—-bin ——包含Linux、Windows脚本,启动、管理、停止OpenJMS服务,创建OpenJMS数据库
| |—-config ——相关配置文件,主要是openjms.xml 其中配置的有相关的ConnectionFactory、Queue、Topic等
| |—-db ——数据库表创建脚本
| |—-docs ——OpenJMS文档说明
| |—-examples ——OpenJMS文档说明
| |—-lib ——服务器和客户端所依赖的 jar 包
另注:OpenJMS默认使用的是derby数据库
Windows下直接启动bin目录中的startup.bat
OpenJMS服务就已经起来了,下面开始编写JMS客户端。
通过目录中的admin.bat可以查看JMS服务器的相关信息,比如有哪些Queue、Topic以及用户,默认的如下(使用Actions-Connections-online):
首先用 MyEclipse创建一个maven工程,然后添加依赖的jar包
说明:上面部分jar包是自己安装到了本地仓库中。。。
首先来一个点对点的基于Queue的例子:
往 queue1 队列发送5 条消息,然后消费掉
基于主题Topic的实例: 消息的发布和订阅,一对一或一对多,有两个概念:非持久性订阅和持久性订阅
非持久性订阅:只有当客户端处于激活状态时,才能接收到消息,处于离线状态,那么这个时间段的发送到Topic的消息就永远接收不到了
持久性订阅:客户端向JMS注册一个唯一标识,当客户端离线时,JMS是保存所有发送到这个唯一标识的消息,客户端再次激活时,就能接收到离线时发送的消息
两种传输方式的比较:
1. 有无状态:Topic 数据默认不落地,无状态;Queue 数据会在服务器保存
2. 完整性:Topic 不保证发布的每条数据都能接收到,可能会丢失(除持久订阅);Queue保证每条数据都能被接收到,不会丢失
3. 发送接收策略:Topic一对多,每个订阅者都能接收到消息;Queue一对一,只会有一个接收者成功接收
理解的不深入,持续优化中... ...
本文选用 OpenJMS 开源JMS服务器,OpenJMS支持SUN的JMS API 1.1规范,支持消息队列,支持消息传递的发布/订阅模式,主要功能如下:
1. 支持点对点、发布订阅消息模型
2. 可靠的消息传输
3. 支持同步、异步消息传输
4. 使用JDBC的持久化
5. 本地事物
6. 身份认证
7. 管理界面
8. 基于XML的配置文件
9. 内存和数据库垃圾回收
10. 支持RMI、TCP、HTTP和SSL协议栈
11. 集成Servlet服务器,例如Jakarta Tomcat
12. 支持大量终端和用户
官方地址是http://openjms.sourceforge.net/
环境搭建:
下载openjms-0.7.7-beta-1.zip和openjms-0.7.7-beta-1-src.zip源码两个包,直接解压到指定目录,目录说明
|—-openjms-0.7.7-beta-1
| |—-bin ——包含Linux、Windows脚本,启动、管理、停止OpenJMS服务,创建OpenJMS数据库
| |—-config ——相关配置文件,主要是openjms.xml 其中配置的有相关的ConnectionFactory、Queue、Topic等
| |—-db ——数据库表创建脚本
| |—-docs ——OpenJMS文档说明
| |—-examples ——OpenJMS文档说明
| |—-lib ——服务器和客户端所依赖的 jar 包
另注:OpenJMS默认使用的是derby数据库
Windows下直接启动bin目录中的startup.bat
OpenJMS服务就已经起来了,下面开始编写JMS客户端。
通过目录中的admin.bat可以查看JMS服务器的相关信息,比如有哪些Queue、Topic以及用户,默认的如下(使用Actions-Connections-online):
首先用 MyEclipse创建一个maven工程,然后添加依赖的jar包
<!-- JMS 所需Jar包开始 --> <dependency> <groupid>javax</groupid> <artifactid>jms</artifactid> <version>1.1</version> </dependency> <dependency> <groupid>openjms</groupid> <artifactid>openjms</artifactid> <version>0.7.7</version> </dependency> <dependency> <groupid>openjms</groupid> <artifactid>openjms-common</artifactid> <version>0.7.7</version> </dependency> <dependency> <groupid>openjms</groupid> <artifactid>openjms-net</artifactid> <version>0.7.7</version> </dependency> <dependency> <groupid>concurrent</groupid> <artifactid>concurrent</artifactid> <version>1.3.4</version> </dependency> <dependency> <groupid>spice</groupid> <artifactid>spice-jndikit</artifactid> <version>1.2</version> </dependency> <dependency> <groupid>exolabcore</groupid> <artifactid>exolabcore</artifactid> <version>0.3.7</version> </dependency> <dependency> <groupid>commons-logging</groupid> <artifactid>commons-logging</artifactid> <version>1.2</version> </dependency> <!-- JMS 所需Jar包结束 -->
说明:上面部分jar包是自己安装到了本地仓库中。。。
首先来一个点对点的基于Queue的例子:
/** * Jms工具类,用来获取Connection、Destination 和关闭连接、会话、上下文 * @author Administrator */ public class JmsUtil { private static Context context; private static final String TCP_QUEUE = "ConnectionFactory"; static{ /** * 要想使用 JNDI 首先得需要一个 Context 上下文 * 使用相应的参数来初始化一个Context */ Hashtable<String, String> properties = new Hashtable<String, String>(); // 与JDBC 驱动 很相似 properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.exolab.jms.jndi.InitialContextFactory"); // JMS 服务器地址 properties.put(Context.PROVIDER_URL, "tcp://localhost:3035/"); try { context = new InitialContext(properties); } catch (Exception e) { e.printStackTrace(); } } /** * 通过connectionFactoryName 获取一个连接到JMS服务器的Connection */ public static Connection getConnectionFactory() throws Exception{ ConnectionFactory cf = (ConnectionFactory) context.lookup(TCP_QUEUE); return cf.createConnection(); } /** * 返回一个消息发送的地方 * @param queueOrTopicName 队列或主题的通道名 */ public static Destination getDestination(String queueOrTopicName) throws Exception{ return (Destination) context.lookup(queueOrTopicName); } public static void close(Connection connection, Session session) throws Exception{ session.close(); connection.close(); context.close(); } }
往 queue1 队列发送5 条消息,然后消费掉
/** * JMS客户端既可以发送消息也可以接收消息 * @author Administrator */ public class QueueJmsClient { public static void main(String[] args){ try{ // 在openjms.xml 配置文件中有一个名为queue1队列 Connection connection = JmsUtil.getConnectionFactory(); Destination destination = JmsUtil.getDestination("queue1"); connection.start(); // 创建 Session会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 会话不使用事务,AUTO_ACKNOWLEDGE设定自动应答 // 创建一个消息消费者 MessageConsumer queueConsumer = session.createConsumer(destination); // 异步接收,每次数限制,只要有没消费的消息都会在回调函数监听器里执行 // 成功接收消息后,消息就直接被删除了,也就是说,只能有一个接收者接收 queueConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { System.out.println("queueConsumer异步消费消息>>"+((TextMessage)message).getText()+"<<"); } catch (JMSException e) { e.printStackTrace(); } } }); // 创建一个消息生产者,发送 5 条文本消息到queue1 队列中 MessageProducer queueSender = session.createProducer(destination); TextMessage message = session.createTextMessage(); for(int i=0; i<5; i++){ message.setText("Hello, this is Queue JMSmsg===" + i); queueSender.send(message); System.out.println("queueSender已发送消息>>"+message.getText()+"<<"); } // 同步接收,只能接收一次 // TextMessage consumeMsg = (TextMessage) queueConsumer.receive(); // System.out.println("queueConsumer同步消费消息>>"+consumeMsg.getText()+"<<"); Thread.sleep(4000);// 人为阻塞程序,防止没有接收完,程序就结束了 JmsUtil.close(connection, session); } catch(Exception e){ e.printStackTrace(); } } } 输出结果: queueSender已发送消息>>Hello, this is Queue JMSmsg===0<< queueConsumer异步消费消息>>Hello, this is Queue JMSmsg===0<< queueSender已发送消息>>Hello, this is Queue JMSmsg===1<< queueSender已发送消息>>Hello, this is Queue JMSmsg===2<< queueConsumer异步消费消息>>Hello, this is Queue JMSmsg===1<< queueSender已发送消息>>Hello, this is Queue JMSmsg===3<< queueSender已发送消息>>Hello, this is Queue JMSmsg===4<< queueConsumer异步消费消息>>Hello, this is Queue JMSmsg===2<< queueConsumer异步消费消息>>Hello, this is Queue JMSmsg===3<< queueConsumer异步消费消息>>Hello, this is Queue JMSmsg===4<<
基于主题Topic的实例: 消息的发布和订阅,一对一或一对多,有两个概念:非持久性订阅和持久性订阅
非持久性订阅:只有当客户端处于激活状态时,才能接收到消息,处于离线状态,那么这个时间段的发送到Topic的消息就永远接收不到了
持久性订阅:客户端向JMS注册一个唯一标识,当客户端离线时,JMS是保存所有发送到这个唯一标识的消息,客户端再次激活时,就能接收到离线时发送的消息
/** * 非持久化订阅,发布和订阅接收消息 * @author Administrator */ public class TopicJmsClient { public static void main(String[] args) { try { Connection connection = JmsUtil.getConnectionFactory(); Destination destination = JmsUtil.getDestination("topic1"); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 会话不使用事务,AUTO_ACKNOWLEDGE设定自动应答 // 非持久订阅,首先得保证 订阅者 处于激活状态,这里把订阅者 放在 发布者 之前,使用异步订阅接收消息 MessageConsumer topicSubscriber = session.createConsumer(destination); topicSubscriber.setMessageListener(new MessageListener() { public void onMessage(Message msg) { try { System.out.println("topicSubscriber已订阅接收消息>>"+((TextMessage) msg).getText()+"<<"); } catch (JMSException e) { e.printStackTrace(); } } }); // 发布者发布消息到 Topic1 主题,其中的订阅者 都能收到消息 // 首先订阅者客户端需处于激活状态,如果在发布的时候客户端离线,那么他就永远收不到这次发送的消息 // 订阅者接收订阅的消息后,消息在主题中依旧存在 MessageProducer topicPublisher = session.createProducer(destination); TextMessage message = session.createTextMessage(); for(int i=0; i<5; i++){ message.setText("Hello, this is Topic message ===" + i); topicPublisher.send(message); System.out.println("topicProducer已发布消息>>"+message.getText()+"<<"); } Thread.sleep(4000); JmsUtil.close(connection, session); } catch (Exception e) { e.printStackTrace(); } } } 输出结果: topicSubscriber已订阅接收消息>>Hello, this is Topic message ===0<< topicProducer已发布消息>>Hello, this is Topic message ===0<< topicSubscriber已订阅接收消息>>Hello, this is Topic message ===1<< topicProducer已发布消息>>Hello, this is Topic message ===1<< topicSubscriber已订阅接收消息>>Hello, this is Topic message ===2<< topicProducer已发布消息>>Hello, this is Topic message ===2<< topicSubscriber已订阅接收消息>>Hello, this is Topic message ===3<< topicProducer已发布消息>>Hello, this is Topic message ===3<< topicSubscriber已订阅接收消息>>Hello, this is Topic message ===4<< topicProducer已发布消息>>Hello, this is Topic message ===4<<
/** * 持久订阅者,在第一次运行时,JMS服务器会记录它的身份信息,关键身份信息就是 客户端唯一表示ID 和订阅者姓名 * 持久性订阅者会把存于与Topic 中的所有消息都接收,然后删除 * @author Administrator */ public class DurableSubscriber { public static void main(String[] args) { try { Connection connection = JmsUtil.getConnectionFactory(); Destination destination = JmsUtil.getDestination("topic1"); connection.setClientID("client1"); //客户端唯一标识 connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // AUTO_ACKNOWLEDGE设定自动应答 // 创建持久订阅者,第一个参数是Topic通道,第二个是订阅者 MessageConsumer topicDurableSubscriber = session.createDurableSubscriber((Topic) destination, "sub2"); topicDurableSubscriber.setMessageListener(new MessageListener() { public void onMessage(Message msg) { try { System.out.println("topicSubscriber已订阅接收消息>>"+((TextMessage) msg).getText()+"<<"); } catch (JMSException e) { e.printStackTrace(); } } }); Thread.sleep(5000); JmsUtil.close(connection, session); } catch (Exception e) { e.printStackTrace(); } } } 输出结果: topicSubscriber已订阅接收消息>>Hello, this is Topic message 离线时发送 ===0<< topicSubscriber已订阅接收消息>>Hello, this is Topic message 离线时发送 ===1<< topicSubscriber已订阅接收消息>>Hello, this is Topic message 离线时发送 ===2<< topicSubscriber已订阅接收消息>>Hello, this is Topic message 离线时发送 ===3<< topicSubscriber已订阅接收消息>>Hello, this is Topic message 离线时发送 ===4<<
两种传输方式的比较:
1. 有无状态:Topic 数据默认不落地,无状态;Queue 数据会在服务器保存
2. 完整性:Topic 不保证发布的每条数据都能接收到,可能会丢失(除持久订阅);Queue保证每条数据都能被接收到,不会丢失
3. 发送接收策略:Topic一对多,每个订阅者都能接收到消息;Queue一对一,只会有一个接收者成功接收
理解的不深入,持续优化中... ...
相关文章推荐
- 使用openpyxl修改Excel文件,日期累加写入
- 【小白学OpenCV】(1): 图像处理之低通滤波
- error: option --single-version-externally-managed not recognized
- 学习OpenCV2——Mat之通道的理解
- NanoPC-T2 uboot分析(1)
- 关于openStream方法和openConnection方法的区别
- Linux内核分析学习笔记(一)
- Linux内核设计第四周——扒开系统调用三层皮
- STM32canopen调试
- linux下归档
- Linux下管理用户的命令大全
- nginx
- linux系统下重启tomcat的shell脚本
- Linux编程-让进程或线程运行在指定的CPU上
- Openlayers3加载天地图
- Openlayers3加载天地图
- couldn't connect to server 127.0.0.1 shell/mongo.js:79
- openwrt 显示系统版本和GCC的版本
- Linux系统程序包管理工具 RPM
- Linux Shell 编程语法