您的位置:首页 > 编程语言 > Java开发

ActiveMQ 与spring整合

2017-03-17 15:54 253 查看
这两个包很重要

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.1.0.RELEASE</version>
</dependency>


配置文件整合

<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd "
>

<!-- JMS连接工厂 -->
<bean id="myConsumerFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://${mq.ip})"/>
<property name="useAsyncSend" value="true"/>
<!-- 客户端唯一标识 -->
<property name="clientID" value="${mq.env}.lyt.msg.center"/>
<!-- ObjectMessage信任所有对象 -->
<property name="trustAllPackages" value="true"/>
</bean>

<!-- 定义主题 -->
<bean id="sitLogTopic" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 主题名字 -->
<constructor-arg value="${mq.env}.lyt.sitlog"/>
</bean>

<!-- 消息消费监听者 -->
<bean id="logListener" class="com.lyt.msg.mq.LogListener"/>

<!-- 订阅客户端 -->
<bean id="logListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="myConsumerFactory"/>
<!-- 开启订阅模式 -->
<property name="pubSubDomain" value="true"/>
<property name="destination" ref="sitLogTopic"/>
<!-- 持久订阅 -->
<property name="subscriptionDurable" value="true"/>
<!---接收客户端ID,在持久化时,客户端不在线时消息就存在数据库里,直到被这个ID的客户端消费掉-->
<property name="clientId" value="${mq.env}.lyt.log.consumer"/>
<property name="messageListener" ref="logListener"/>
<!-- 消息应答方式
Session.AUTO_ACKNOWLEDGE=1  消息自动签收
Session.CLIENT_ACKNOWLEDGE=2  客户端调用acknowledge方法手动签收
Session.DUPS_OK_ACKNOWLEDGE=3  不必必须签收,消息可能会重复发送
-->
<property name="sessionAcknowledgeMod
4000
e" value="1"/>
</bean>

<bean id="smsTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="${mq.env}.lyt.sms"/>
</bean>
<bean id="smsListener" class="com.lyt.msg.mq.SmsListener"/>
<bean id="smsListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="myConsumerFactory"/>
<property name="pubSubDomain" value="true"/>
<property name="destination" ref="smsTopic"/>
<property name="subscriptionDurable" value="true"/>
<property name="clientId" value="${mq.env}.lyt.sms.consumer"/>
<property name="messageListener" ref="smsListener"/>
<property name="sessionAcknowledgeMode" value="1"/>
</bean>

<bean id="sitUserTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="${mq.env}.lyt.sit.user"/>
</bean>
<bean id="sitUserListener" class="com.lyt.msg.mq.SitUserListener"/>
<bean id="sitUserListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="myConsumerFactory"/>
<property name="pubSubDomain" value="true"/>
<property name="destination" ref="sitUserTopic"/>
<property name="subscriptionDurable" value="true"/>
<property name="clientId" value="${mq.env}.lyt.sit.user.consumer"/>
<property name="messageListener" ref="sitUserListener"/>
<property name="sessionAcknowledgeMode" value="1"/>
</bean>

</beans>


建立消息处理对象用来消费消息

package com.lyt.msg.mq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.lyt.base.vo.MQSitUser;
import com.lyt.msg.biz.BizSitUser;

@Component
public class SitUserListener implements MessageListener {
private Log log = LogFactory.getLog(SitUserListener.class);

@Autowired
private BizSitUser bizSitUser;

@Override
public void onMessage(Message message) {
MQSitUser mqSitUser = null;
ObjectMessage objectMessage = (ObjectMessage)message;
try {
mqSitUser = (MQSitUser) objectMessage.getObject();
} catch (JMSException e) {
log.error(e.getMessage());
return;
}
if(mqSitUser.getActionType() == MQSitUser.ACTION_THIRD_USER) {
bizSitUser.addUserByThird(mqSitUser);
} else if(mqSitUser.getActionType() == MQSitUser.ACTION_BIND_USER) {
bizSitUser.bindThird(mqSitUser);
} else if(mqSitUser.getActionType() == MQSitUser.ACTION_UPD_USER) {
bizSitUser.updUser(mqSitUser);
} else if(mqSitUser.getActionType() == MQSitUser.ACTION_TRESPASS_USER) {
bizSitUser.addUserByMobile(mqSitUser);
}
}
}


创建消息发送工具类

package com.lyt.base.util;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import com.lyt.base.vo.MQSitLog;
import com.lyt.base.vo.MQSitUser;
import com.lyt.base.vo.MQSmsInfo;
public class MQUtil {
public static void sendLogMsg(JmsTemplate myJmsTemplate, Destination sitLogTopic, MQSitLog mqSitLog) {
myJmsTemplate.send(sitLogTopic, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(mqSitLog);
return objectMessage;
}
});
}

public static void sendSms(JmsTemplate myJmsTemplate, Destination smsTopic, MQSmsInfo mqSmsInfo) {
myJmsTemplate.send(smsTopic, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(mqSmsInfo);
return objectMessage;
}
});
}

public static void sendSitUserMsg(JmsTemplate myJmsTemplate, Destination sitUserTopic, MQSitUser mqSitUser) {
myJmsTemplate.send(sitUserTopic, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(mqSitUser);
return objectMessage;
}
});
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  activemq spring