您的位置:首页 > 其它

JMS-ActiveMQ学习笔记

2017-01-04 14:50 218 查看
前言

最近由于公司项目需求,需要实现异步消息以及后台推送功能,选择了几个方案 决定用activemq来实现该功能(ps:这里提一下,其实可以使用阿里云提供的消息服务,可以很方便的就在自己的项目中整合消息功能)。

这次项目也是我第一接触activemq,作为apahe公司提供的消息中间件,网上的资料还是很多,期间也遇到了许多问题,通过goole也一步步都解决了,在这里就mark下。

使用activemq需要在服务器上安装activemq服务 .下面先说一下,配置过程吧:

安装activemq服务

添加moven依赖

配置编写生产者和消费者

安装ActiveMQ服务

这一步很简单,直接下载安装包,根据操作系统位数,选择win64或win32包下的activemq.bat点击启动就可以了,也可以将activemq安装为系统服务,设置成开机启动,activemq的安装包里面也提供了安装服务的脚本,注意要使用管理员权限运行,不然会提示wrapper | OpenSCManager failed - 拒绝访问。 (0x5)。

添加moven依赖

activemq的安装包已经提供了所需的所有jar包,在根目录下存在一个activemq-all.jar包,只需将该jar包添加到pom.xml文件中即可,这里有一点需要注意,如果选择的jar是5.12.0还是5.9.0之后的版本之后的(具体我也忘了,导入的时候注意下就好了) 需要将包里面spring开头的文件删除再添加,不然会与项目中的spring版本冲突。

<dependency>
<groupId>com.apache.activemq</groupId>
<artifactId>activemq-all-nospring</artifactId>
<version>5.13.3</version>
</dependency>

<dependency>
<groupId>com.apache.activemq</groupId>
<artifactId>activemq-web</artifactId>
<version>5.13.3</version>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-continuation</artifactId>
<version>9.3.8.v20160314</version>
</dependency>


jetty-continuation这个jar是jety提供的异步连接依赖,这里也需要导入

activemq-web 这个是开启ajaxServler支持需要导入的jar包

配置生产者和消费者

<!-- 创建一个真正的基于 jsm提供者的联接工厂 -->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
<property name="userName" value="admin"/>
<property name="password" value="admin"/>
<property name="trustAllPackages" value="true"/>
<property name="useAsyncSend" value="true"/>
</bean>

<!-- 连接工厂 -->
<bean id="cacheConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="connectionFactory"/>
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100"/>
</bean>

<!-- 创建spring连接工厂 -->
<bean id="singleConnectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="cacheConnectionFactory"/>
</bean>


SingleConnectionFactory它将在所有的createConnection()调用中返回同一个相同的共享连接对象, 并且忽略Connection.close()和stop()的调用。根据JMS连接模型,这是完全线程安全的(相反,如JDBC)。这个共享连接能够在出现异常时自动恢复创建一个新的共享连接。可以通过SingleConnectionFactory的构造函数中传入Connection对象或者 ConnectionFactory对象,用来创建被代理的连接对象。

<!-- Spring JmsTemplate 的消息生产者 start-->

<!-- 定义JmsTemplate的Queue类型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="singleConnectionFactory"/>
<!-- 非pub/sub模型(发布/订阅),即队列模式 -->
<property name="pubSubDomain" value="false"/>
<property name="receiveTimeout" value="10000"/>
</bean>

<!-- 定义JmsTemplate的Topic类型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<constructor-arg ref="topicsingleConnectionFactory"/>
<!-- 进行持久化 -->
<property name="deliveryMode" value="2"/>
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true"/>
<property name="receiveTimeout" value="10000"/>
</bean>

<!--Spring JmsTemplate 的消息生产者 end-->

<!-- 消息消费者 start-->

<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default"
connection-factory="singleConnectionFactory"
acknowledge="auto">
<jms:listener destination="myqueue" ref="queueReceiver1"/>
<!--adminrecord-->
<jms:listener destination="myqueue2" ref="queueReceiverRecord1"/>
<jms:listener destination="myqueue2" ref="queueReceiverRecord2"/>
<!--dope-->
<jms:listener destination="myqueue3" ref="queueReceiverDope1"/>
<jms:listener destination="myqueue3" ref="queueReceiverDope2"/>

</jms:listener-container>

<!--定义topic监听器 -->
<jms:listener-container destination-type="topic" container-type="default"
connection-factory="topicsingleConnectionFactory"
acknowledge="auto">
<jms:listener destination="mytopic2" ref="topicReceiver1"/>
</jms:listener-container>
<!-- 消息消费者 end-->


下面是生产者对象,用于发送消息

package com.leovito.oplus.dilectric.amq.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.jms.*;
import java.util.HashMap;

/**
* Created by oplsu on 2017/1/3.
*/
public class QueueSender {

@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsTemplate;//通过@Qualifier修饰符来注入对应的bean

/**
* 发送一条text消息到指定的队列(目标)
*
* @param queueName 队列名称
* @param message   text消息内容
*/
public void send(String queueName, final String message) {
jmsTemplate.send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
Message msg = session.createTextMessage(message);
msg.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
return msg;
}
});
}

/**
* 发送一条map消息到指定的队列(目标)
*
* @param queueName 队列名称
* @param hm        map消息内容
*/
public void sendMap(String queueName, HashMap hm) {
jmsTemplate.send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MapMessage mapMsg = session.createMapMessage();
mapMsg.setString("name", String.valueOf(hm.get("name")));
return mapMsg;
}
});
}
}


下面是消费者对象,用于监听activemq消息,重写onMessage方法,接受指定队列的消息

package com.leovito.oplus.dilectric.amq.consumer.queue;

import com.leovito.oplus.dilectric.Domain.Adminrecord;
import com.leovito.oplus.dilectric.Domain.Dope;
import com.leovito.oplus.dilectric.Service.AdminService;
import com.leovito.oplus.dilectric.Service.DopeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.jms.*;

/**
* @author liang
* @description 队列消息监听器
*/
@Component
public class QueueReceiverDope1 implements MessageListener {
@Autowired
private DopeService dopeService;

@Override
public void onMessage(Message message) {
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage) message;
try {
Dope dope = (Dope) objMsg.getObject();
dopeService.add(dope);
} catch (JMSException e) {
e.printStackTrace();
}
}
if (message instanceof TextMessage) {
System.out.println("dopereceiver1接收到消息:" + ((TextMessage) message).getText());
}
if (message instanceof MapMessage) {
MapMessage map = (MapMessage) message;
System.out.println("信息:" + map.getString("msg"));
}
} catch (JMSException e) {
e.printStackTrace();
}
}

}


通过以上代码即可简单实现消息发送和异步接收功能。activemq也提供了消息持久化功能,queue消息默认是开启持久化,并以文件形式存储,如果需要更好的管理消息,可以将消息持久化到数据库,activemq也提供了activemq-db.xml示例文件。只需做简单的更改即可实现
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息