activeMQ使用总结 (spring 配置)
2016-12-15 00:00
513 查看
1 引用包
通过maven方式,应用activemq依赖包,pom.xml 添加如下信息,<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
2 配置文件
配置server.properties文件如下,#activity config activity.mq.brokerURL=failover:(tcp://127.0.0.1:61616)
3 bean配置
配置相关bean,包括监听,消息发送,以及broker,queue/topic ,如下spring-activemq.xml<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd"> <context:annotation-config/> <!-- 配置JMS连接工厂 --> <bean id="providerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activity.mq.brokerURL}" /> <property name="useAsyncSend" value="true" /> <property name="clientID" value="providerClientConnect" /> <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/> </bean> <amq:redeliveryPolicy id="activeMQRedeliveryPolicy" destination="#queueDestination" maximumRedeliveries="10"/> <!-- 定义消息Destination --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="SpringTopic"/> </bean> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="signIncomeQueue"/> </bean> <!-- 消息发送者客户端 --> <bean id="providerJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="providerConnectionFactory" /> <!--<property name="defaultDestination" ref="topicDestination" />--> <property name="defaultDestination" ref="queueDestination" /> <!-- 开启订阅模式 --> <property name="pubSubDomain" value="false"/> <!--<property name="receiveTimeout" value="10000" />--> <!-- deliveryMode, priority, timeToLive 的开关要生效,必须配置为true,默认false--> <property name="explicitQosEnabled" value="true"/> <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/> <!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 --> <property name="deliveryMode" value="2"/> </bean> <!-- 配置消息消费监听者 --> <bean id="consumerMessageListener" class="com.company.project.service.mq.ConsumerMessageListener" /> <bean id="consumerListenerClient" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="providerConnectionFactory" /> <property name="concurrentConsumers" value="10"/> <!--<property name="concurrency" value="10-20"/>--> <!-- 开启订阅模式 --> <property name="pubSubDomain" value="true"/> <!--<property name="destination" ref="topicDestination" />--> <property name="destination" ref="topicDestination" /> <property name="subscriptionDurable" value="true"/> <!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉--> <property name="clientId" value="consumerClient"/> <property name="messageListener" ref="consumerMessageListener" /> <!-- 消息应答方式 Session.AUTO_ACKNOWLEDGE 消息自动签收 Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收 Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送 --> <property name="sessionAcknowledgeMode" value="2"/> </bean> </beans>
4 发送接口
发送时,java最常用的有两种格式,textMessage和mapMessage,package com.company.project.service.impl; import com.alibaba.fastjson.JSON; import com.hisense.hitv.service.IQueueService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import javax.jms.*; import java.util.HashMap; import java.util.Map; @Service public class QueueServiceImpl implements IQueueService { @Autowired private JmsTemplate jmsTemplate; public boolean pushMessage2QueueIncome(String uid, Integer incomeType, Integer incomeValue, Integer productCode) { sendMqMessageIncome(null, uid, incomeType, incomeValue, productCode); return false; } /** * 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination * @param destination */ private void sendMqMessageIncome(Destination destination, final String uid,final Integer incomeType, final Integer incomeValue,final Integer productCode){ if(null == destination){ destination = jmsTemplate.getDefaultDestination(); } jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Map param = new HashMap(); MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("customerId", uid); param.put("customerId", uid); mapMessage.setInt("incomeType", incomeType); param.put("incomeType", incomeType); mapMessage.setInt("incomeValue", incomeValue); param.put("incomeValue", incomeValue); mapMessage.setInt("productCode",productCode); param.put("productCode", productCode); TextMessage message= session.createTextMessage(); message.setText(JSON.toJSONString(param)); return message; } }); } }
5 监听接口
监听通过onMessage接口实现,监听broker推送消息package com.company.project.service.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.listener.SessionAwareMessageListener; import javax.jms.*; public class ConsumerMessageListener implements SessionAwareMessageListener{ private static Logger logger = LoggerFactory.getLogger(ConsumerMessageListener.class); public void onMessage(Message message, Session session) throws JMSException { MapMessage tm = (MapMessage) message; try { logger.info("---------消息消费---------"); logger.info("消息ID:\t" + tm.getJMSMessageID()); } catch (JMSException e) { session.recover();//唤起重传 e.printStackTrace(); } } }
相关文章推荐
- Spring整合ActiveMQ
- Thinking in Java
- Java多线程实战
- java 文件压缩解压编码问题
- 深入理解Java虚拟机:JVM高级特性与最佳实践
- common-lang和beanutil
- 小博老师解析Java核心技术 ——JSwing窗体布局
- 博为峰Java技术题 ——JavaSE Java Swing顶层容器类和包含层次Ⅱ
- Spring MVC @CookieValue学习
- spring整合任务调度
- SpringMVC获取页面参数的三种方法
- Spring对加载的bean之间循环依赖的处理
- 【spring】1、三种装配bean的方式
- storm java编程
- java函数式编程之Predicate
- Java.lang.String API笔记
- Java字符串加号
- JAVA基础
- Springboot集成dubbo
- MyEclipse 2014破解失败完美解决方案,真实可行、!invalid activation code please reenter code