JMS-ActiveMQ学习笔记
2017-01-04 14:50
218 查看
前言
最近由于公司项目需求,需要实现异步消息以及后台推送功能,选择了几个方案 决定用activemq来实现该功能(ps:这里提一下,其实可以使用阿里云提供的消息服务,可以很方便的就在自己的项目中整合消息功能)。
这次项目也是我第一接触activemq,作为apahe公司提供的消息中间件,网上的资料还是很多,期间也遇到了许多问题,通过goole也一步步都解决了,在这里就mark下。
使用activemq需要在服务器上安装activemq服务 .下面先说一下,配置过程吧:
安装activemq服务
添加moven依赖
配置编写生产者和消费者
安装ActiveMQ服务
这一步很简单,直接下载安装包,根据操作系统位数,选择win64或win32包下的activemq.bat点击启动就可以了,也可以将activemq安装为系统服务,设置成开机启动,activemq的安装包里面也提供了安装服务的脚本,注意要使用管理员权限运行,不然会提示wrapper | OpenSCManager failed - 拒绝访问。 (0x5)。
添加moven依赖
activemq的安装包已经提供了所需的所有jar包,在根目录下存在一个activemq-all.jar包,只需将该jar包添加到pom.xml文件中即可,这里有一点需要注意,如果选择的jar是5.12.0还是5.9.0之后的版本之后的(具体我也忘了,导入的时候注意下就好了) 需要将包里面spring开头的文件删除再添加,不然会与项目中的spring版本冲突。
jetty-continuation这个jar是jety提供的异步连接依赖,这里也需要导入
activemq-web 这个是开启ajaxServler支持需要导入的jar包
配置生产者和消费者
SingleConnectionFactory它将在所有的createConnection()调用中返回同一个相同的共享连接对象, 并且忽略Connection.close()和stop()的调用。根据JMS连接模型,这是完全线程安全的(相反,如JDBC)。这个共享连接能够在出现异常时自动恢复创建一个新的共享连接。可以通过SingleConnectionFactory的构造函数中传入Connection对象或者 ConnectionFactory对象,用来创建被代理的连接对象。
下面是生产者对象,用于发送消息
下面是消费者对象,用于监听activemq消息,重写onMessage方法,接受指定队列的消息
通过以上代码即可简单实现消息发送和异步接收功能。activemq也提供了消息持久化功能,queue消息默认是开启持久化,并以文件形式存储,如果需要更好的管理消息,可以将消息持久化到数据库,activemq也提供了activemq-db.xml示例文件。只需做简单的更改即可实现
最近由于公司项目需求,需要实现异步消息以及后台推送功能,选择了几个方案 决定用activemq来实现该功能(ps:这里提一下,其实可以使用阿里云提供的消息服务,可以很方便的就在自己的项目中整合消息功能)。
这次项目也是我第一接触activemq,作为apahe公司提供的消息中间件,网上的资料还是很多,期间也遇到了许多问题,通过goole也一步步都解决了,在这里就mark下。
使用activemq需要在服务器上安装activemq服务 .下面先说一下,配置过程吧:
安装activemq服务
添加moven依赖
配置编写生产者和消费者
安装ActiveMQ服务
这一步很简单,直接下载安装包,根据操作系统位数,选择win64或win32包下的activemq.bat点击启动就可以了,也可以将activemq安装为系统服务,设置成开机启动,activemq的安装包里面也提供了安装服务的脚本,注意要使用管理员权限运行,不然会提示wrapper | OpenSCManager failed - 拒绝访问。 (0x5)。
添加moven依赖
activemq的安装包已经提供了所需的所有jar包,在根目录下存在一个activemq-all.jar包,只需将该jar包添加到pom.xml文件中即可,这里有一点需要注意,如果选择的jar是5.12.0还是5.9.0之后的版本之后的(具体我也忘了,导入的时候注意下就好了) 需要将包里面spring开头的文件删除再添加,不然会与项目中的spring版本冲突。
<dependency> <groupId>com.apache.activemq</groupId> <artifactId>activemq-all-nospring</artifactId> <version>5.13.3</version> </dependency> <dependency> <groupId>com.apache.activemq</groupId> <artifactId>activemq-web</artifactId> <version>5.13.3</version> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-continuation</artifactId> <version>9.3.8.v20160314</version> </dependency>
jetty-continuation这个jar是jety提供的异步连接依赖,这里也需要导入
activemq-web 这个是开启ajaxServler支持需要导入的jar包
配置生产者和消费者
<!-- 创建一个真正的基于 jsm提供者的联接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> <property name="userName" value="admin"/> <property name="password" value="admin"/> <property name="trustAllPackages" value="true"/> <property name="useAsyncSend" value="true"/> </bean> <!-- 连接工厂 --> <bean id="cacheConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="connectionFactory"/> <!-- Session缓存数量 --> <property name="sessionCacheSize" value="100"/> </bean> <!-- 创建spring连接工厂 --> <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="cacheConnectionFactory"/> </bean>
SingleConnectionFactory它将在所有的createConnection()调用中返回同一个相同的共享连接对象, 并且忽略Connection.close()和stop()的调用。根据JMS连接模型,这是完全线程安全的(相反,如JDBC)。这个共享连接能够在出现异常时自动恢复创建一个新的共享连接。可以通过SingleConnectionFactory的构造函数中传入Connection对象或者 ConnectionFactory对象,用来创建被代理的连接对象。
<!-- Spring JmsTemplate 的消息生产者 start--> <!-- 定义JmsTemplate的Queue类型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="singleConnectionFactory"/> <!-- 非pub/sub模型(发布/订阅),即队列模式 --> <property name="pubSubDomain" value="false"/> <property name="receiveTimeout" value="10000"/> </bean> <!-- 定义JmsTemplate的Topic类型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <constructor-arg ref="topicsingleConnectionFactory"/> <!-- 进行持久化 --> <property name="deliveryMode" value="2"/> <!-- pub/sub模型(发布/订阅) --> <property name="pubSubDomain" value="true"/> <property name="receiveTimeout" value="10000"/> </bean> <!--Spring JmsTemplate 的消息生产者 end--> <!-- 消息消费者 start--> <!-- 定义Queue监听器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="singleConnectionFactory" acknowledge="auto"> <jms:listener destination="myqueue" ref="queueReceiver1"/> <!--adminrecord--> <jms:listener destination="myqueue2" ref="queueReceiverRecord1"/> <jms:listener destination="myqueue2" ref="queueReceiverRecord2"/> <!--dope--> <jms:listener destination="myqueue3" ref="queueReceiverDope1"/> <jms:listener destination="myqueue3" ref="queueReceiverDope2"/> </jms:listener-container> <!--定义topic监听器 --> <jms:listener-container destination-type="topic" container-type="default" connection-factory="topicsingleConnectionFactory" acknowledge="auto"> <jms:listener destination="mytopic2" ref="topicReceiver1"/> </jms:listener-container> <!-- 消息消费者 end-->
下面是生产者对象,用于发送消息
package com.leovito.oplus.dilectric.amq.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.jms.*; import java.util.HashMap; /** * Created by oplsu on 2017/1/3. */ public class QueueSender { @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate;//通过@Qualifier修饰符来注入对应的bean /** * 发送一条text消息到指定的队列(目标) * * @param queueName 队列名称 * @param message text消息内容 */ public void send(String queueName, final String message) { jmsTemplate.send(queueName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { Message msg = session.createTextMessage(message); msg.setJMSDeliveryMode(DeliveryMode.PERSISTENT); return msg; } }); } /** * 发送一条map消息到指定的队列(目标) * * @param queueName 队列名称 * @param hm map消息内容 */ public void sendMap(String queueName, HashMap hm) { jmsTemplate.send(queueName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { MapMessage mapMsg = session.createMapMessage(); mapMsg.setString("name", String.valueOf(hm.get("name"))); return mapMsg; } }); } }
下面是消费者对象,用于监听activemq消息,重写onMessage方法,接受指定队列的消息
package com.leovito.oplus.dilectric.amq.consumer.queue; import com.leovito.oplus.dilectric.Domain.Adminrecord; import com.leovito.oplus.dilectric.Domain.Dope; import com.leovito.oplus.dilectric.Service.AdminService; import com.leovito.oplus.dilectric.Service.DopeService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.jms.*; /** * @author liang * @description 队列消息监听器 */ @Component public class QueueReceiverDope1 implements MessageListener { @Autowired private DopeService dopeService; @Override public void onMessage(Message message) { try { if (message instanceof ObjectMessage) { ObjectMessage objMsg = (ObjectMessage) message; try { Dope dope = (Dope) objMsg.getObject(); dopeService.add(dope); } catch (JMSException e) { e.printStackTrace(); } } if (message instanceof TextMessage) { System.out.println("dopereceiver1接收到消息:" + ((TextMessage) message).getText()); } if (message instanceof MapMessage) { MapMessage map = (MapMessage) message; System.out.println("信息:" + map.getString("msg")); } } catch (JMSException e) { e.printStackTrace(); } } }
通过以上代码即可简单实现消息发送和异步接收功能。activemq也提供了消息持久化功能,queue消息默认是开启持久化,并以文件形式存储,如果需要更好的管理消息,可以将消息持久化到数据库,activemq也提供了activemq-db.xml示例文件。只需做简单的更改即可实现
相关文章推荐
- 了解下activeMQ,消息队列,只是简单的了解总结一下...(第一季)
- 解析ActiveMQ的使用说明总结
- ActiveMQ在C#中的应用示例分析
- ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现
- activemq报EOFExceptionjvm错误
- ActiveMQ 消息服务(一)
- ActiveMQ 消息服务(二)
- ActiveMQ 消息服务(三)
- Spring+Log4j+ActiveMQ实现远程记录日志——实战+分析
- 基于zookeeper+leveldb搭建activemq集群
- rocketmq命令行自动补全工具
- 电商平台常用缓存策略总结
- 消息队列的使用场景和使用技巧
- rabbitmq学习
- cenos下ActiveMQ关闭时出现异常
- activemq安全设置 设置admin的用户名和密码
- ActiveMQ 实例
- 一台机器上运行多个ActiveMq
- activemq安全设置 设置admin的用户名和密码
- ActiveMQ