ActiveMQ与Spring整合:(3)消息监听器
2016-08-29 21:34
423 查看
Spring JMS提供三种消息监听器实现方式:MessageListener、SessionAwareMessageListener和MessageListenerAdapter。消息接收者只需实现相应的接口(或通过MessageListenerAdapter委托给普通Java对象)就可以异步接收消息。
1、实现MessageListener接口:实现该接口时必须重写onMessage方法。
package com.hua.spring.jms.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @description
* @date:(2016-8-25 下午10:01:07)
* @author Administrator
* @version v1.0
* @since v1.0
*
* Modified history
*
* Modified date:
* Modifier user:
* description:
*
* */
public class ResponseQueueMessageListener implements MessageListener{
private Logger logger=LoggerFactory.getLogger(ResponseQueueMessageListener.class);
/**
 * Logs the text body of an incoming message.
 *
 * <p>Anything that is not a {@link TextMessage} is ignored. A {@link JMSException}
 * raised while reading the body is logged and not propagated.
 *
 * @param message the message delivered to this listener
 */
@Override
public void onMessage(Message message) {
    // Guard clause: only text messages are of interest here.
    if (!(message instanceof TextMessage)) {
        return;
    }
    TextMessage reply = (TextMessage) message;
    try {
        logger.info("消息生产者接收到响应:" + reply.getText());
    } catch (JMSException ex) {
        logger.error("消息生产者接收消息时发生异常:", ex);
    }
}
}
消息监听器需要注册到消息监听容器中,示例配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd">
<!-- Connection pool wrapping the ActiveMQ connection factory -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
        <!-- Underlying connection factory pointing at the broker -->
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>
    </property>
    <property name="maxConnections" value="10"/>
</bean>

<!-- JMS template used for sending messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="defaultDestination" ref="queueDestination" />
    <property name="messageConverter">
        <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
    </property>
    <!-- Alternative: plug in a custom message converter instead -->
    <!-- <property name="messageConverter" ref="emailMessageConverter"/> -->
</bean>

<!-- Message destination -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Queue name; create it in the ActiveMQ admin console: http://localhost:8161/admin/queues.jsp -->
    <constructor-arg index="0" value="helloQueue" />
</bean>

<!-- Queue used to test message replies -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>responseQueue</value>
    </constructor-arg>
</bean>

<!-- Message listener -->
<bean id="responseMessageListener" class="com.hua.spring.jms.listener.ResponseQueueMessageListener"/>

<!-- Message listener container: delivers messages from responseQueue to responseMessageListener.
     The class attribute must be the exact fully-qualified class name (no surrounding whitespace). -->
<bean id="responseQueueListenerAdapter" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="destination" ref="responseQueue" />
    <property name="messageListener" ref="responseMessageListener" />
    <property name="sessionTransacted" value="false"/>
</bean>
</beans>
2、实现SessionAwareMessageListener接口:实现这个接口需要实现void onMessage(Message message, Session session)方法。这个接口与MessageListener接口的不同之处在于参数中多了一个Session,消息接收者收到消息之后可以通过该session向消息发送者回复消息。
package com.hua.spring.jms.listener;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SessionAwareMessageListener;
/**
 * Session-aware listener that logs each incoming text message and sends a fixed
 * acknowledgement text to the configured reply destination, using the session the
 * message was received on.
 *
 * <p>Messages that are not {@link TextMessage} instances are ignored. JMS failures while
 * reading the message or sending the reply are logged and not rethrown.
 *
 * <p>Created 2016-08-28 09:49:32.
 *
 * @author Administrator
 * @version v1.0
 * @since v1.0
 */
public class QueueDestinationMessageListener implements SessionAwareMessageListener<Message> {

    private static final Logger logger = LoggerFactory.getLogger(QueueDestinationMessageListener.class);

    /** Destination the acknowledgement is sent to; injected through {@link #setDestination}. */
    private Destination destination;

    /**
     * Logs the received text and replies on {@link #getDestination()}.
     *
     * @param message the delivered message; only {@link TextMessage} is handled
     * @param session the session the message was received on, used to create the reply
     * @throws JMSException declared by the interface; failures inside this method are
     *         caught and logged instead
     */
    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        if (!(message instanceof TextMessage)) {
            return;
        }
        try {
            String text = ((TextMessage) message).getText();
            logger.info("接受者:我收到消息-->" + text);
            MessageProducer producer = session.createProducer(destination);
            try {
                Message textMessage = session.createTextMessage("Hello sender! I have received your news.");
                producer.send(textMessage);
            } finally {
                // A producer is created for every message, so it must be released here;
                // otherwise producers accumulate on the session.
                producer.close();
            }
        } catch (JMSException e) {
            logger.error("监听消息发生异常", e);
        }
    }

    /**
     * @return the destination replies are sent to
     */
    public Destination getDestination() {
        return destination;
    }

    /**
     * @param destination the destination replies are sent to
     */
    public void setDestination(Destination destination) {
        this.destination = destination;
    }
}
同时把消息监听器注册到监听器容器中:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd">
<!-- Connection pool wrapping the ActiveMQ connection factory -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
        <!-- Underlying connection factory pointing at the broker -->
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>
    </property>
    <property name="maxConnections" value="10"/>
</bean>

<!-- JMS template used for sending messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="defaultDestination" ref="queueDestination" />
    <property name="messageConverter">
        <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
    </property>
    <!-- Alternative: plug in a custom message converter instead -->
    <!-- <property name="messageConverter" ref="emailMessageConverter"/> -->
</bean>

<!-- Message destination -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Queue name; create it in the ActiveMQ admin console: http://localhost:8161/admin/queues.jsp -->
    <constructor-arg index="0" value="helloQueue" />
</bean>

<!-- Queue used to test message replies -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>responseQueue</value>
    </constructor-arg>
</bean>

<!-- Session-aware message listener; replies are sent to responseQueue -->
<bean id="queueDestinationMessageListener" class="com.hua.spring.jms.listener.QueueDestinationMessageListener">
    <property name="destination" ref="responseQueue"/>
</bean>

<!-- Message listener container: delivers messages from queueDestination to queueDestinationMessageListener.
     The class attribute must be the exact fully-qualified class name (no surrounding whitespace). -->
<bean id="responseQueueListenerAdapter" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="destination" ref="queueDestination" />
    <property name="messageListener" ref="queueDestinationMessageListener" />
    <property name="sessionTransacted" value="false"/>
</bean>
</beans>
3、使用MessageListenerAdapter:MessageListenerAdapter是一个同时实现了MessageListener和SessionAwareMessageListener接口的适配器类。消息监听类只需要提供重载的handleMessage方法,适配器会根据转换后的消息内容类型动态匹配对应参数的方法。
package com.hua.spring.jms.listener;
import java.util.Map;
/**
 * Plain receiver object offering one {@code handleMessage} overload per converted
 * payload type. It implements no JMS interface; it is intended to be used as the
 * delegate of a message listener adapter that picks the overload matching the payload.
 *
 * <p>Created 2016-08-29 21:19:25.
 *
 * @author Administrator
 * @version v1.0
 * @since v1.0
 */
public class JMSReceiver {

    /**
     * Receives the payload of a converted TextMessage and prints it to standard output.
     *
     * @param message the text payload
     */
    public void handleMessage(String message) {
        System.out.println(message);
    }

    /**
     * Receives the payload of a converted BytesMessage. Currently a no-op.
     *
     * @param message the raw bytes
     */
    public void handleMessage(byte[] message) {
        // Intentionally empty: byte payloads are accepted and discarded.
    }

    /**
     * Receives the payload of a converted MapMessage. Currently a no-op.
     *
     * @param message the map payload; declared with wildcards instead of a raw type,
     *        which keeps the erased signature identical for reflective dispatch
     */
    public void handleMessage(Map<?, ?> message) {
        // Intentionally empty: map payloads are accepted and discarded.
    }

    /**
     * Receives the payload of a converted ObjectMessage. Currently a no-op.
     *
     * @param message the object payload
     */
    public void handleMessage(Object message) {
        // Intentionally empty: object payloads are accepted and discarded.
    }
}
applicationContext.xml配置
1、实现MessageListener接口:实现该接口时必须重写onMessage方法。
package com.hua.spring.jms.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @description
* @date:(2016-8-25 下午10:01:07)
* @author Administrator
* @version v1.0
* @since v1.0
*
* Modified history
*
* Modified date:
* Modifier user:
* description:
*
* */
public class ResponseQueueMessageListener implements MessageListener{
private Logger logger=LoggerFactory.getLogger(ResponseQueueMessageListener.class);
/**
 * Logs the text body of an incoming message.
 *
 * <p>Anything that is not a {@link TextMessage} is ignored. A {@link JMSException}
 * raised while reading the body is logged and not propagated.
 *
 * @param message the message delivered to this listener
 */
@Override
public void onMessage(Message message) {
    // Guard clause: only text messages are of interest here.
    if (!(message instanceof TextMessage)) {
        return;
    }
    TextMessage reply = (TextMessage) message;
    try {
        logger.info("消息生产者接收到响应:" + reply.getText());
    } catch (JMSException ex) {
        logger.error("消息生产者接收消息时发生异常:", ex);
    }
}
}
消息监听器需要注册到消息监听容器中,示例配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd">
<!-- Connection pool wrapping the ActiveMQ connection factory -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
        <!-- Underlying connection factory pointing at the broker -->
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>
    </property>
    <property name="maxConnections" value="10"/>
</bean>

<!-- JMS template used for sending messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="defaultDestination" ref="queueDestination" />
    <property name="messageConverter">
        <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
    </property>
    <!-- Alternative: plug in a custom message converter instead -->
    <!-- <property name="messageConverter" ref="emailMessageConverter"/> -->
</bean>

<!-- Message destination -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Queue name; create it in the ActiveMQ admin console: http://localhost:8161/admin/queues.jsp -->
    <constructor-arg index="0" value="helloQueue" />
</bean>

<!-- Queue used to test message replies -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>responseQueue</value>
    </constructor-arg>
</bean>

<!-- Message listener -->
<bean id="responseMessageListener" class="com.hua.spring.jms.listener.ResponseQueueMessageListener"/>

<!-- Message listener container: delivers messages from responseQueue to responseMessageListener.
     The class attribute must be the exact fully-qualified class name (no surrounding whitespace). -->
<bean id="responseQueueListenerAdapter" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="destination" ref="responseQueue" />
    <property name="messageListener" ref="responseMessageListener" />
    <property name="sessionTransacted" value="false"/>
</bean>
</beans>
2、实现SessionAwareMessageListener接口:实现这个接口需要实现void onMessage(Message message, Session session)方法。这个接口与MessageListener接口的不同之处在于参数中多了一个Session,消息接收者收到消息之后可以通过该session向消息发送者回复消息。
package com.hua.spring.jms.listener;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SessionAwareMessageListener;
/**
 * Session-aware listener that logs each incoming text message and sends a fixed
 * acknowledgement text to the configured reply destination, using the session the
 * message was received on.
 *
 * <p>Messages that are not {@link TextMessage} instances are ignored. JMS failures while
 * reading the message or sending the reply are logged and not rethrown.
 *
 * <p>Created 2016-08-28 09:49:32.
 *
 * @author Administrator
 * @version v1.0
 * @since v1.0
 */
public class QueueDestinationMessageListener implements SessionAwareMessageListener<Message> {

    private static final Logger logger = LoggerFactory.getLogger(QueueDestinationMessageListener.class);

    /** Destination the acknowledgement is sent to; injected through {@link #setDestination}. */
    private Destination destination;

    /**
     * Logs the received text and replies on {@link #getDestination()}.
     *
     * @param message the delivered message; only {@link TextMessage} is handled
     * @param session the session the message was received on, used to create the reply
     * @throws JMSException declared by the interface; failures inside this method are
     *         caught and logged instead
     */
    @Override
    public void onMessage(Message message, Session session) throws JMSException {
        if (!(message instanceof TextMessage)) {
            return;
        }
        try {
            String text = ((TextMessage) message).getText();
            logger.info("接受者:我收到消息-->" + text);
            MessageProducer producer = session.createProducer(destination);
            try {
                Message textMessage = session.createTextMessage("Hello sender! I have received your news.");
                producer.send(textMessage);
            } finally {
                // A producer is created for every message, so it must be released here;
                // otherwise producers accumulate on the session.
                producer.close();
            }
        } catch (JMSException e) {
            logger.error("监听消息发生异常", e);
        }
    }

    /**
     * @return the destination replies are sent to
     */
    public Destination getDestination() {
        return destination;
    }

    /**
     * @param destination the destination replies are sent to
     */
    public void setDestination(Destination destination) {
        this.destination = destination;
    }
}
同时把消息监听器注册到监听器容器中:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd">
<!-- Connection pool wrapping the ActiveMQ connection factory -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
        <!-- Underlying connection factory pointing at the broker -->
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>
    </property>
    <property name="maxConnections" value="10"/>
</bean>

<!-- JMS template used for sending messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="defaultDestination" ref="queueDestination" />
    <property name="messageConverter">
        <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
    </property>
    <!-- Alternative: plug in a custom message converter instead -->
    <!-- <property name="messageConverter" ref="emailMessageConverter"/> -->
</bean>

<!-- Message destination -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Queue name; create it in the ActiveMQ admin console: http://localhost:8161/admin/queues.jsp -->
    <constructor-arg index="0" value="helloQueue" />
</bean>

<!-- Queue used to test message replies -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>responseQueue</value>
    </constructor-arg>
</bean>

<!-- Session-aware message listener; replies are sent to responseQueue -->
<bean id="queueDestinationMessageListener" class="com.hua.spring.jms.listener.QueueDestinationMessageListener">
    <property name="destination" ref="responseQueue"/>
</bean>

<!-- Message listener container: delivers messages from queueDestination to queueDestinationMessageListener.
     The class attribute must be the exact fully-qualified class name (no surrounding whitespace). -->
<bean id="responseQueueListenerAdapter" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="destination" ref="queueDestination" />
    <property name="messageListener" ref="queueDestinationMessageListener" />
    <property name="sessionTransacted" value="false"/>
</bean>
</beans>
3、使用MessageListenerAdapter:MessageListenerAdapter是一个同时实现了MessageListener和SessionAwareMessageListener接口的适配器类。消息监听类只需要提供重载的handleMessage方法,适配器会根据转换后的消息内容类型动态匹配对应参数的方法。
package com.hua.spring.jms.listener;
import java.util.Map;
/**
 * Plain receiver object offering one {@code handleMessage} overload per converted
 * payload type. It implements no JMS interface; it is intended to be used as the
 * delegate of a message listener adapter that picks the overload matching the payload.
 *
 * <p>Created 2016-08-29 21:19:25.
 *
 * @author Administrator
 * @version v1.0
 * @since v1.0
 */
public class JMSReceiver {

    /**
     * Receives the payload of a converted TextMessage and prints it to standard output.
     *
     * @param message the text payload
     */
    public void handleMessage(String message) {
        System.out.println(message);
    }

    /**
     * Receives the payload of a converted BytesMessage. Currently a no-op.
     *
     * @param message the raw bytes
     */
    public void handleMessage(byte[] message) {
        // Intentionally empty: byte payloads are accepted and discarded.
    }

    /**
     * Receives the payload of a converted MapMessage. Currently a no-op.
     *
     * @param message the map payload; declared with wildcards instead of a raw type,
     *        which keeps the erased signature identical for reflective dispatch
     */
    public void handleMessage(Map<?, ?> message) {
        // Intentionally empty: map payloads are accepted and discarded.
    }

    /**
     * Receives the payload of a converted ObjectMessage. Currently a no-op.
     *
     * @param message the object payload
     */
    public void handleMessage(Object message) {
        // Intentionally empty: object payloads are accepted and discarded.
    }
}
applicationContext.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd">

    <!-- Connection pool wrapping the ActiveMQ connection factory -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <!-- Underlying connection factory pointing at the broker -->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:61616" />
            </bean>
        </property>
        <property name="maxConnections" value="10"/>
    </bean>

    <!-- JMS template used for sending messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="defaultDestination" ref="queueDestination" />
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
        </property>
        <!-- Alternative: plug in a custom message converter instead -->
        <!-- <property name="messageConverter" ref="emailMessageConverter"/> -->
    </bean>

    <!-- Message destination -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- Queue name; create it in the ActiveMQ admin console: http://localhost:8161/admin/queues.jsp -->
        <constructor-arg index="0" value="helloQueue" />
    </bean>

    <!-- Queue used to test message replies -->
    <bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>responseQueue</value>
        </constructor-arg>
    </bean>

    <!-- Message listening through MessageListenerAdapter, which delegates to the jMSReceiver bean -->
    <bean id="jMSReceiver" class="com.hua.spring.jms.listener.JMSReceiver"></bean>
    <bean id="queueDestinationMessageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="jMSReceiver"></constructor-arg>
    </bean>

    <!-- Message listener container: delivers messages from queueDestination to the adapter.
         The class attribute must be the exact fully-qualified class name (no surrounding whitespace). -->
    <bean id="jMSReceiverQueueListenerAdapter" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="queueDestinationMessageListenerAdapter" />
        <property name="concurrentConsumers" value="100"/>
        <property name="sessionTransacted" value="false"/>
    </bean>
</beans>
相关文章推荐
- ActiveMQ整合Spring实现消息机制二(消息接收端配置)
- Spring整合JMS(二)——三种消息监听器
- Java ActiveMQ 讲解(二)Spring ActiveMQ整合+注解消息监听
- JMS--Spring整合JMS(二)——三种消息监听器
- Spring整合ActiveMQ 实现消息服务
- Spring整合ActiveMQ一(消息发送端配置)
- Spring整合JMS(二)——三种消息监听器
- Spring整合JMS(二)——三种消息监听器
- Spring2.5整合ActiveMQ 5.2(P2P文本消息)
- Spring整合JMS(二)——三种消息监听器
- Spring整合Jms学习(二)_三种消息监听器
- Spring整合JMS(二)——三种消息监听器
- Spring整合JMS(二)——三种消息监听器
- Spring整合JMS(二)——三种消息监听器
- Spring2.5整合ActiveMQ 5.2(P2P文本消息)
- Spring整合JMS(二)——三种消息监听器
- spring整合activemq发送MQ消息[queue模式]实例
- spring整合activemq发送MQ消息[Topic模式]实例
- Spring整合JMS(二)——三种消息监听器
- Spring整合JMS(二)——三种消息监听器