Spring整合activeMQ
2017-09-18 19:36
337 查看
最近项目上一个应用就是处理完业务逻辑后要发短信通知客户是否处理成功,如果把这个发短信的业务也放到一起处理,可能会导致延迟等问题,所以采用异步处理的方式,把发短信的业务逻辑扔到activeMQ消息中间件中处理。
消息生产者Service:
package com.booth.common.service; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ScheduledMessage; import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Service public class MsgQueueSenderService { private final static Logger logger = LoggerFactory.getLogger(MsgQueueSenderService.class); private JmsTemplate jmsTemplate; private Destination appAuthDestination; @Value("${appAuthQueue.retry}") private boolean IS_RETRY; @Value("${appAuthQueue.timeout.max}") private Long TIMEOUT_MAX; @Value("${appAuthQueue.timeout.step}") private Long TIMEOUT_STEP; public void sendMessage(final Object message) { final JSONObject json = JSONObject.parseObject(JSON.toJSONString(message)); Long timeout = json.get("timeout") == null? 0L:json.getLong("timeout"); final Long delay = timeout; if (IS_RETRY){ if (timeout < TIMEOUT_MAX){ timeout += TIMEOUT_STEP; }else{ timeout = TIMEOUT_MAX; } json.put("timeout", timeout); }else if (!IS_RETRY && timeout > 0){ return; } jmsTemplate.send(appAuthDestination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage tm = session.createTextMessage(); tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); logger.debug("mq send message:"+json.toJSONString()); tm.setText(json.toJSONString()); return tm; } }); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Destination getAppAuthDestination() { return appAuthDestination; } public void setAppAuthDestination(Destination appAuthDestination) { this.appAuthDestination = appAuthDestination; } }
消息监听:
package com.booth.common.push; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.util.StringUtils; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.booth.common.service.HandleApiService; import com.booth.common.service.MsgQueueSenderService; public class PushToServerByMsgQueueListener implements MessageListener{ @Autowired HandleApiService apiService; @Autowired MsgQueueSenderService msgQueueSenderService; @Override public void onMessage(Message message) { try{ if (message != null) { System.out.println("接收到Task:" + ((TextMessage) message).getText()); } JSONObject json = (JSONObject) JSON.parse(((TextMessage) message).getText()); boolean rlt = apiService.send(json); if (!rlt){ msgQueueSenderService.sendMessage(json); } }catch (Exception e) { e.printStackTrace(); } } }
其中HandleApiService 用于处理发短信的业务逻辑,其处理完返回true或者false,如果没有处理成功则重新创建任务。
消息消费者:
生产者往指定目的地Destination发送消息后,接下来就是消费者对指定目的地的消息进行消费了。那么消费者是如何知道有生产者发送消息到指定目的地Destination了呢?每个消费者对应每个目的地都需要有对应的MessageListenerContainer。对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听哪个JMS服务器,通过配置MessageListenerContainer的时候往里面注入一个ConnectionFactory来实现的。所以我们在配置一个MessageListenerContainer的时候有三个属性必须指定:一个是表示从哪里监听的ConnectionFactory;一个是表示监听什么的Destination;一个是接收到消息以后进行消息处理的MessageListener
相关配置文件:
applicationContext-mq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.7.0.xsd"> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://url地址:61616" userName="admin" password="admin" /> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="amqConnectionFactory" /> <property name="sessionCacheSize" value="100" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- Spring JmsTemplate 的消息生产者 start--> <bean id="msgQueueSenderService" class="com.booth.common.service.MsgQueueSenderService"> <property name="jmsTemplate" ref="jmsTemplate" /> <property name="appAuthDestination" ref="appAuthDestination" /> </bean> <!--Spring JmsTemplate 的消息生产者 end--> <!--这个是队列目的地,点对点的--> <bean id="appAuthDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="${appAuthQueue}"> </constructor-arg> </bean> <!-- 消息消费者 start--> <!-- 消息监听器 --> <bean id="appAuthListener" class="com.booth.common.push.PushToServerByMsgQueueListener"/> <bean id="msgQueueSenderService" class="com.booth.common.service.MsgQueueSenderService"> <property name="jmsTemplate" ref="jmsTemplate" /> <property name="appAuthDestination" ref="appAuthDestination" /> </bean> <!-- 消息监听容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="appAuthDestination" /> <property name="messageListener" ref="appAuthListener" /> </bean> <!-- 消息消费者 end --> </beans>
activemq.properties本案例中列举的涉及到的部分配置
appAuthQueue = appAuthTestQueue appAuthQueue.retry = true appAuthQueue.timeout.max = 30000 appAuthQueue.timeout.step = 5000
相关文章推荐
- Spring ActiveMQ 整合(三): 确认机制ACK(收到消息后,应该有一个回应也就是确认答复)
- ActiveMQ入门教程(五) - ActiveMQ与Spring整合
- activeMq 消费者整合spring
- Spring整合ActiveMQ
- ActiveMQ入门教程(五) - ActiveMQ与Spring整合
- 浅谈Spring Boot 整合ActiveMQ的过程
- SpringBoot 整合 Apache ActiveMQ
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring整合JMS(一)——基于ActiveMQ实现
- Spring与ActiveMQ整合
- spring整合activemq步骤
- linux下 消息中间件ActiveMQ整合spring笔记二 接收消息
- Spring整合JMS(一)——基于ActiveMQ实现
- SpringBoot整合ActiveMQ
- 淘淘商城系列——Spring与ActiveMQ的整合及用JmsTemplate发送消息
- Spring整合JMS基于ActiveMQ实现
- 深入浅出JMS(四)--Spring和ActiveMQ整合的完整实例
- Spring和ActiveMQ整合的完整实例
- Spring整合JMS(一)——基于ActiveMQ实现
- 深入浅出JMS(四)----Spring和ActiveMQ整合的完整实例