activeMq 消费者整合spring
2016-11-23 09:44
253 查看
package com.mq.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ConsumerMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("---------消息消费---------");
System.out.println("订阅者:\t consumerClient1");
System.out.println("消息内容:\t" + tm.getText());
System.out.println("消息ID:\t" + tm.getJMSMessageID());
System.out.println("消息Destination:\t" + tm.getJMSDestination());
/* System.out.println("---------更多信息---------");
System.out.println(ToStringBuilder.reflectionToString(tm));*/
System.out.println("-------------------------");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
对应的spring的配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 配置JMS连接工厂 -->
<bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://10.31.88.166:61616)" />
<property name="useAsyncSend" value="true" />
<property name="clientID" value="consumerClienctConnect" />
</bean>
<!-- 定义消息Destination -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="testSpringTopic"/>
</bean>
<!-- 配置消息消费监听者 -->
<bean id="consumerMessageListener" class="com.mq.consumer.ConsumerMessageListener" />
<bean id="consumerMessageListener2" class="com.mq.consumer.ConsumerMessageListener2" />
<!-- 消息订阅客户端1 -->
<bean id="consumerListenerClient1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="consumerConnectionFactory" />
<!-- 开启订阅模式 -->
<property name="pubSubDomain" value="true"/>
<property name="destination" ref="topicDestination" />
<property name="subscriptionDurable" value="true"/>
<!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
<property name="clientId" value="consumerClient1"/>
<property name="messageListener" ref="consumerMessageListener" />
<!-- 消息应答方式
Session.AUTO_ACKNOWLEDGE 消息自动签收
Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
-->
<property name="sessionAcknowledgeMode" value="1"/>
</bean>
</beans>
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ConsumerMessageListener implements MessageListener{
@Override
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("---------消息消费---------");
System.out.println("订阅者:\t consumerClient1");
System.out.println("消息内容:\t" + tm.getText());
System.out.println("消息ID:\t" + tm.getJMSMessageID());
System.out.println("消息Destination:\t" + tm.getJMSDestination());
/* System.out.println("---------更多信息---------");
System.out.println(ToStringBuilder.reflectionToString(tm));*/
System.out.println("-------------------------");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
对应的spring的配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- 配置JMS连接工厂 -->
<bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://10.31.88.166:61616)" />
<property name="useAsyncSend" value="true" />
<property name="clientID" value="consumerClienctConnect" />
</bean>
<!-- 定义消息Destination -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="testSpringTopic"/>
</bean>
<!-- 配置消息消费监听者 -->
<bean id="consumerMessageListener" class="com.mq.consumer.ConsumerMessageListener" />
<bean id="consumerMessageListener2" class="com.mq.consumer.ConsumerMessageListener2" />
<!-- 消息订阅客户端1 -->
<bean id="consumerListenerClient1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="consumerConnectionFactory" />
<!-- 开启订阅模式 -->
<property name="pubSubDomain" value="true"/>
<property name="destination" ref="topicDestination" />
<property name="subscriptionDurable" value="true"/>
<!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
<property name="clientId" value="consumerClient1"/>
<property name="messageListener" ref="consumerMessageListener" />
<!-- 消息应答方式
Session.AUTO_ACKNOWLEDGE 消息自动签收
Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
-->
<property name="sessionAcknowledgeMode" value="1"/>
</bean>
</beans>
相关文章推荐
- ActiveMQ整合Spring(多消费者)
- Spring整合activemq,实现单消费者消费
- junit测试spring整合ActiveMQ编写消费者代码报错:org.springframework.beans.factory.BeanCreationException: Error crea
- Spring整合JMS(一)——基于ActiveMQ实现
- Spring整合ActiveMQ
- Spring整合JMS——基于ActiveMQ实现
- linux下 消息中间件ActiveMQ整合spring笔记二 接收消息
- Spring ActiveMQ 整合(四): JMS 事务管理
- Spring和ActiveMQ整合的完整实例
- Spring整合JMS(一)——基于ActiveMQ实现
- springboot与ActiveMQ整合
- ActiveMQ整合Spring
- spring boot 整合activemq 进行服务端消息推送(web页面)
- activeMQ和spring的整合
- Spring Boot与ActiveMQ整合
- Spring整合ActiveMQ消息中间件
- Spring2.5整合ActiveMQ 5.2(P2P文本消息)
- ActiveMQ与Spring整合:(1)基本使用
- spring boot整合activeMQ,实现ptp和topic两者消息模式