ActiveMQ 与 Spring 整合
2017-03-17 15:54
253 查看
引入依赖:activemq-all 与 spring-jms 这两个包很重要
配置文件整合
建立消息处理对象用来消费消息
创建消息发送工具类
<!-- ActiveMQ all-in-one artifact; supplies ActiveMQConnectionFactory and ActiveMQTopic used in the Spring configuration -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.1</version>
</dependency>
<!-- Spring JMS support; supplies JmsTemplate and DefaultMessageListenerContainer -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>3.1.0.RELEASE</version>
</dependency>
配置文件整合
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <!-- JMS connection factory shared by every listener container below -->
    <bean id="myConsumerFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover:(tcp://${mq.ip})"/>
        <property name="useAsyncSend" value="true"/>
        <!-- Unique client identifier -->
        <property name="clientID" value="${mq.env}.lyt.msg.center"/>
        <!-- Trust every class when deserializing ObjectMessage payloads.
             NOTE(review): this disables the ActiveMQ deserialization whitelist; consider
             restricting it with trustedPackages if producers are not fully trusted. -->
        <property name="trustAllPackages" value="true"/>
    </bean>

    <!-- ===================== Site log topic ===================== -->

    <!-- Topic definition -->
    <bean id="sitLogTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- Topic name -->
        <constructor-arg value="${mq.env}.lyt.sitlog"/>
    </bean>

    <!-- Listener that consumes the messages -->
    <bean id="logListener" class="com.lyt.msg.mq.LogListener"/>

    <!-- Subscribing client -->
    <bean id="logListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="myConsumerFactory"/>
        <!-- Enable publish/subscribe (topic) mode -->
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="sitLogTopic"/>
        <!-- Durable subscription -->
        <property name="subscriptionDurable" value="true"/>
        <!-- ID of the receiving client. With a durable subscription, messages published while
             this client is offline are kept by the broker until the client with this ID
             consumes them. -->
        <property name="clientId" value="${mq.env}.lyt.log.consumer"/>
        <property name="messageListener" ref="logListener"/>
        <!-- Acknowledge mode:
             Session.AUTO_ACKNOWLEDGE=1     messages are acknowledged automatically
             Session.CLIENT_ACKNOWLEDGE=2   the client acknowledges manually by calling acknowledge()
             Session.DUPS_OK_ACKNOWLEDGE=3  acknowledgement is lazy, messages may be delivered more than once -->
        <property name="sessionAcknowledgeMode" value="1"/>
    </bean>

    <!-- ===================== SMS topic ===================== -->

    <bean id="smsTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="${mq.env}.lyt.sms"/>
    </bean>

    <bean id="smsListener" class="com.lyt.msg.mq.SmsListener"/>

    <bean id="smsListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="myConsumerFactory"/>
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="smsTopic"/>
        <property name="subscriptionDurable" value="true"/>
        <property name="clientId" value="${mq.env}.lyt.sms.consumer"/>
        <property name="messageListener" ref="smsListener"/>
        <property name="sessionAcknowledgeMode" value="1"/>
    </bean>

    <!-- ===================== Site user topic ===================== -->

    <bean id="sitUserTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="${mq.env}.lyt.sit.user"/>
    </bean>

    <bean id="sitUserListener" class="com.lyt.msg.mq.SitUserListener"/>

    <bean id="sitUserListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="myConsumerFactory"/>
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="sitUserTopic"/>
        <property name="subscriptionDurable" value="true"/>
        <property name="clientId" value="${mq.env}.lyt.sit.user.consumer"/>
        <property name="messageListener" ref="sitUserListener"/>
        <property name="sessionAcknowledgeMode" value="1"/>
    </bean>

</beans>
建立消息处理对象用来消费消息
package com.lyt.msg.mq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.lyt.base.vo.MQSitUser; import com.lyt.msg.biz.BizSitUser; @Component public class SitUserListener implements MessageListener { private Log log = LogFactory.getLog(SitUserListener.class); @Autowired private BizSitUser bizSitUser; @Override public void onMessage(Message message) { MQSitUser mqSitUser = null; ObjectMessage objectMessage = (ObjectMessage)message; try { mqSitUser = (MQSitUser) objectMessage.getObject(); } catch (JMSException e) { log.error(e.getMessage()); return; } if(mqSitUser.getActionType() == MQSitUser.ACTION_THIRD_USER) { bizSitUser.addUserByThird(mqSitUser); } else if(mqSitUser.getActionType() == MQSitUser.ACTION_BIND_USER) { bizSitUser.bindThird(mqSitUser); } else if(mqSitUser.getActionType() == MQSitUser.ACTION_UPD_USER) { bizSitUser.updUser(mqSitUser); } else if(mqSitUser.getActionType() == MQSitUser.ACTION_TRESPASS_USER) { bizSitUser.addUserByMobile(mqSitUser); } } }
创建消息发送工具类
package com.lyt.base.util; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import com.lyt.base.vo.MQSitLog; import com.lyt.base.vo.MQSitUser; import com.lyt.base.vo.MQSmsInfo; public class MQUtil { public static void sendLogMsg(JmsTemplate myJmsTemplate, Destination sitLogTopic, MQSitLog mqSitLog) { myJmsTemplate.send(sitLogTopic, new MessageCreator() { public Message createMessage(Session session) throws JMSException { ObjectMessage objectMessage = session.createObjectMessage(); objectMessage.setObject(mqSitLog); return objectMessage; } }); } public static void sendSms(JmsTemplate myJmsTemplate, Destination smsTopic, MQSmsInfo mqSmsInfo) { myJmsTemplate.send(smsTopic, new MessageCreator() { public Message createMessage(Session session) throws JMSException { ObjectMessage objectMessage = session.createObjectMessage(); objectMessage.setObject(mqSmsInfo); return objectMessage; } }); } public static void sendSitUserMsg(JmsTemplate myJmsTemplate, Destination sitUserTopic, MQSitUser mqSitUser) { myJmsTemplate.send(sitUserTopic, new MessageCreator() { public Message createMessage(Session session) throws JMSException { ObjectMessage objectMessage = session.createObjectMessage(); objectMessage.setObject(mqSitUser); return objectMessage; } }); } }
相关文章推荐
- 4、深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring整合ActiveMQ
- ActiveMQ入门教程(六) - ActiveMQ与Spring整合-MessageListener
- spring整合activemq发送消息[queue类型]实例
- springboot整合activeMQ
- Spring和ActiveMQ整合
- Spring整合JMS(一)——基于ActiveMQ实现
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring整合JMS——基于ActiveMQ实现
- Spring整合JMS(一)——基于ActiveMQ实现 (转)
- spring整合activemq步骤
- JMS【四】--Spring和ActiveMQ整合的完整实例
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring和ActiveMQ整合的完整实例
- 深入浅出JMS(4)--Spring和ActiveMQ整合的完整实例
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring整合ActiveMQ
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- spring整合JMS - 基于ActiveMQ实现