您的位置:首页 > 编程语言 > Java开发

activeMQ使用总结 (spring 配置)

2016-12-15 00:00 513 查看

1 引用包

通过 Maven 引入 ActiveMQ 依赖包,在 pom.xml 中添加如下信息:

<!-- ActiveMQ dependency, version 5.9.0; the Spring configuration references the matching activemq-core-5.9.0.xsd schema. -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>


2 配置文件

配置server.properties文件如下,

# ActiveMQ broker connection settings.
# The Spring configuration reads this value through the ${activity.mq.brokerURL} placeholder.
# The URL wraps a single local TCP broker (127.0.0.1:61616) in ActiveMQ's failover transport.
activity.mq.brokerURL=failover:(tcp://127.0.0.1:61616)


3 bean配置

配置相关 bean,包括连接工厂、消息发送(JmsTemplate)、消息监听,以及 queue/topic 目的地,spring-activemq.xml 如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd"> 
<context:annotation-config/>

<!-- JMS connection factory; used by both providerJmsTemplate and consumerListenerClient -->
<bean id="providerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!-- Broker URL is resolved from the activity.mq.brokerURL property -->
<property name="brokerURL" value="${activity.mq.brokerURL}" />
<!-- NOTE(review): with async send the producer presumably does not wait for the broker to confirm each message; confirm this is acceptable for persistent messages -->
<property name="useAsyncSend" value="true" />
<!-- NOTE(review): a fixed client ID applies to every connection this factory creates; verify that two connections using it are never open at the same time -->
<property name="clientID" value="providerClientConnect" />
<!-- Redelivery behaviour is defined by the activeMQRedeliveryPolicy element -->
<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>
</bean>

<!-- Redelivery policy referenced by providerConnectionFactory: bound to the queueDestination bean, at most 10 redeliveries -->
<amq:redeliveryPolicy id="activeMQRedeliveryPolicy" destination="#queueDestination"  maximumRedeliveries="10"/>

<!-- Message destinations -->
<!-- Publish/subscribe destination: the topic named "SpringTopic" -->
<bean id="topicDestination"  class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="SpringTopic"/>
</bean>
<!-- Point-to-point destination: the queue named "signIncomeQueue"; it is the default destination of providerJmsTemplate -->
<bean id="queueDestination"  class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="signIncomeQueue"/>
</bean>
<!-- Producer-side client: the JmsTemplate used to send messages -->
<bean id="providerJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="providerConnectionFactory" />
<!--<property name="defaultDestination" ref="topicDestination" />-->
<!-- Messages sent without an explicit destination go to the queue -->
<property name="defaultDestination" ref="queueDestination" />
<!-- Point-to-point (queue) mode: pubSubDomain is false here; set it to true for publish/subscribe (topics) -->
<property name="pubSubDomain" value="false"/>
<!--<property name="receiveTimeout" value="10000" />-->
<!-- deliveryMode, priority and timeToLive only take effect when explicitQosEnabled is true (the default is false) -->
<property name="explicitQosEnabled" value="true"/>
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
<!-- Delivery mode:
DeliveryMode.NON_PERSISTENT=1: non-persistent;
DeliveryMode.PERSISTENT=2: persistent
-->
<property name="deliveryMode" value="2"/>
</bean>

<!-- Message listener bean; consumerListenerClient delivers received messages to it -->
<bean id="consumerMessageListener" class="com.company.project.service.mq.ConsumerMessageListener" />

<!-- Listener container that receives messages and hands them to consumerMessageListener.
It listens on queueDestination, the default destination of providerJmsTemplate, so that the
messages sent by the producer are actually consumed here. -->
<bean id="consumerListenerClient" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="providerConnectionFactory" />
<!-- Several competing consumers are fine on a queue: each message is delivered to only one of them -->
<property name="concurrentConsumers" value="10"/>
<!--<property name="concurrency" value="10-20"/>-->
<!-- Point-to-point (queue) mode -->
<property name="pubSubDomain" value="false"/>
<property name="destination" ref="queueDestination" />
<!-- Alternative: durable topic subscription. To use it, replace the three properties above
(concurrentConsumers, pubSubDomain, destination) with the four below. A durable subscription
allows only one active consumer at a time, so concurrentConsumers must be 1.
<property name="concurrentConsumers" value="1"/>
<property name="pubSubDomain" value="true"/>
<property name="destination" ref="topicDestination" />
<property name="subscriptionDurable" value="true"/>
-->
<!-- Client ID of the container's connection. The durable topic alternative needs it: the broker keeps
messages for this ID while the client is offline, until a client with the same ID consumes them. -->
<property name="clientId" value="consumerClient"/>
<property name="messageListener" ref="consumerMessageListener" />
<!-- Acknowledge mode:
Session.AUTO_ACKNOWLEDGE=1: messages are acknowledged automatically
Session.CLIENT_ACKNOWLEDGE=2: the client acknowledges by calling acknowledge()
Session.DUPS_OK_ACKNOWLEDGE=3: lazy acknowledgement; messages may be delivered more than once
-->
<property name="sessionAcknowledgeMode" value="2"/>
</bean>

</beans>


4 发送接口

发送时,Java 中最常用的消息格式有两种:TextMessage 和 MapMessage。下面的示例最终以 TextMessage(JSON 文本)的形式发送:

package com.company.project.service.impl;

import com.alibaba.fastjson.JSON;
import com.hisense.hitv.service.IQueueService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.util.HashMap;
import java.util.Map;

@Service
public class QueueServiceImpl implements IQueueService {

@Autowired
private JmsTemplate jmsTemplate;

public boolean pushMessage2QueueIncome(String uid, Integer incomeType, Integer incomeValue, Integer productCode) {
sendMqMessageIncome(null, uid, incomeType, incomeValue, productCode);
return false;
}

/**
* 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination
* @param destination
*/
private void sendMqMessageIncome(Destination destination, final String uid,final Integer incomeType,
final Integer incomeValue,final Integer productCode){
if(null == destination){
destination = jmsTemplate.getDefaultDestination();
}
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Map param = new HashMap();
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("customerId", uid);
param.put("customerId", uid);
mapMessage.setInt("incomeType", incomeType);
param.put("incomeType", incomeType);
mapMessage.setInt("incomeValue", incomeValue);
param.put("incomeValue", incomeValue);
mapMessage.setInt("productCode",productCode);
param.put("productCode", productCode);
TextMessage message= session.createTextMessage();
message.setText(JSON.toJSONString(param));
return message;
}
});
}
}


5 监听接口

监听通过onMessage接口实现,监听broker推送消息

package com.company.project.service.mq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SessionAwareMessageListener;

import javax.jms.*;

/**
 * Listener invoked for every message received from the broker; it logs the JMS message ID.
 */
public class ConsumerMessageListener implements SessionAwareMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerMessageListener.class);

    /**
     * Handles one received message.
     *
     * <p>Only the message header is read, so no cast to a concrete message type is needed and
     * any message type (text, map, ...) is accepted.
     *
     * @param message the received message
     * @param session the session the message was received on
     * @throws JMSException if recovering the session after a failure fails as well
     */
    public void onMessage(Message message, Session session) throws JMSException {
        try {
            logger.info("---------消息消费---------");
            logger.info("消息ID:\t{}", message.getJMSMessageID());
        } catch (JMSException e) {
            // Record the failure first so it is not lost if recover() itself throws.
            logger.error("Failed to process JMS message; recovering session to trigger redelivery", e);
            // Restart delivery of the session's unacknowledged messages.
            session.recover();
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spring+activemq