您的位置:首页 > 编程语言 > Java开发

activeMq 消费者整合spring

2016-11-23 09:44 253 查看
package com.mq.consumer;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.TextMessage;

public class ConsumerMessageListener implements MessageListener{

@Override

public void onMessage(Message message) {

TextMessage tm = (TextMessage) message;

try {

System.out.println("---------消息消费---------");

System.out.println("订阅者:\t consumerClient1");

System.out.println("消息内容:\t" + tm.getText());

System.out.println("消息ID:\t" + tm.getJMSMessageID());

System.out.println("消息Destination:\t" + tm.getJMSDestination());

/* System.out.println("---------更多信息---------");

System.out.println(ToStringBuilder.reflectionToString(tm));*/

System.out.println("-------------------------");

} catch (JMSException e) {

e.printStackTrace();

}

}

}

 

对应的spring的配置文件

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 配置JMS连接工厂 -->

<bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">

<property name="brokerURL" value="failover:(tcp://10.31.88.166:61616)" />

<property name="useAsyncSend" value="true" />

<property name="clientID" value="consumerClienctConnect" />

</bean>

<!-- 定义消息Destination -->

<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">

<constructor-arg value="testSpringTopic"/>

</bean>

<!-- 配置消息消费监听者 -->

<bean id="consumerMessageListener" class="com.mq.consumer.ConsumerMessageListener" />

<bean id="consumerMessageListener2" class="com.mq.consumer.ConsumerMessageListener2" />

<!-- 消息订阅客户端1 -->

<bean id="consumerListenerClient1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

<property name="connectionFactory" ref="consumerConnectionFactory" />

<!-- 开启订阅模式 -->

<property name="pubSubDomain" value="true"/>

<property name="destination" ref="topicDestination" />

<property name="subscriptionDurable" value="true"/>

<!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->

<property name="clientId" value="consumerClient1"/>

<property name="messageListener" ref="consumerMessageListener" />

<!-- 消息应答方式

Session.AUTO_ACKNOWLEDGE 消息自动签收

Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收

Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送

-->

<property name="sessionAcknowledgeMode" value="1"/>

</bean>

</beans>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: