activemq 消息消费失败之后如何重新消费
2015-11-19 16:23
239 查看
在不开启事物的情况下 采用的是应答模式4(ActiveMQSession.AUTO_ACKNOWLEDGE)消费一次 应答一次
这时候消费失败了,由于没有配置死亡队列,消息就不会被消费堆积在队列中,那么怎么才可以让消息再被消费呢?
由于项目中的应用场景,有个方案启动和停止的功能,项目启动启动监听,项目停止,停止监听
具体实现代码如下
public class MqService {
private JmsTemplate jmsTemplate;
private CachingConnectionFactory cachingProductConnectionFactory;
private CachingConnectionFactory cachingConsumersConnectionFactory;
private static final String ACTIVEMQQUEUE_OPTIMIZATION = "";// 性能优化参数
// consumer.prefetchSize预加载消息
// 20条
// ?consumer.prefetchSize=20
/**
* 发送消息
*
* @param ryzhMessage
*/
public void send(final RyzhMessage ryzhMessage) {
// 得到MQ工具类
RyzhMqHolder hodler = mqHolders.get(ryzhMessage.getShcemeId());
// 发送信息
jmsTemplate.setConnectionFactory(cachingProductConnectionFactory);
jmsTemplate.send(hodler.getDestination(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(ryzhMessage);
return objectMessage;
}
});
}
/**
* 监听
*
* @param schemeId
* @param receiver
*/
public void startListen(RyzhScheme scheme) {
RyzhMqHolder holder = mqHolders.get(scheme.getSchemeId());
try {
if (holder == null) {
RyzhConsumer consumer = (RyzhConsumer) SpringBeanUtils.getBean("ryzhConsumer");
consumer.setScheme(scheme);
logger.info("RyzhMqService-启动消息队列监听器 schemeId=" + scheme.getSchemeId());
// 如果是为空
holder = new RyzhMqHolder();
// 得到整合方案的配置
TRyzhFa tRyzhFa = ((RyzhConsumer) consumer).getScheme().getZhfa();
// 得到消息队列配置的信息
String xxdlJson = tRyzhFa.getXxdlpz();
// json转map
RyzhMqConfig ryzhMqConfig = JSON.parseObject(xxdlJson, RyzhMqConfig.class);
// 设置方案ID
holder.setSchemeId(scheme.getSchemeId());
// 设置目的地
ActiveMQQueue destination = new ActiveMQQueue(RyzhMqConfig.QUEUE_NAME + scheme.getSchemeId() + ACTIVEMQQUEUE_OPTIMIZATION);
holder.setDestination(destination);
// 创建监听器
DefaultMessageListener listener = new DefaultMessageListener();
// 给监听器设置消费者
listener.setReceiver(consumer);
// 将监听器保存在holder中
holder.setListener(listener);
// 创建监听容器
DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
// 监听容器属性的配置
listenerContainer.setConnectionFactory(cachingConsumersConnectionFactory);
// 设置目的地
listenerContainer.setDestination(destination);
// 设置监听器
listenerContainer.setMessageListener(listener);
// 设置消费者集群数
listenerContainer.setConcurrentConsumers(ryzhMqConfig.getConcurrentConsumers());
// 设置监听队列还是主题 默认是队列
listenerContainer.setPubSubDomain(RyzhMqConfig.PUBSUB_DOMAIN);
listenerContainer.setPubSubNoLocal(false);
// listenerContainer.setAcceptMessagesWhileStopping(true);
// 设置应答模式 默认是4
listenerContainer.setSessionAcknowledgeMode(RyzhMqConfig.SESSION_ACKNOWLEDGEMODE);
// 设置是否启动事物 默认不开启
listenerContainer.setSessionTransacted(RyzhMqConfig.SESSION_TRANSACTED);
// 将监听容器保存在holder中
holder.setListenerContainer(listenerContainer);
// 将holder缓存在map中
mqHolders.put(scheme.getSchemeId(), holder);
// 初始化容器
holder.getListenerContainer().initialize();
// 启动监听
holder.getListenerContainer().start();
logger.info("RyzhMqService-消息队列监听器启动成功 schemeId=" + scheme.getSchemeId());
}
} catch (Exception e) {
logger.error("RyzhMqService-消息队列监听器启动失败 schemeId=" + scheme.getSchemeId(), e);
}
}
public void stopListen(RyzhScheme scheme) {
RyzhMqHolder ryzhMqHolder = mqHolders.get(scheme.getSchemeId());
// 停止监听
ryzhMqHolder.getListenerContainer().destroy();
// 移除缓存
mqHolders.remove(scheme.getSchemeId());
}
/**
* 取得MQHolder
*
* @param ryzhMessage
* @return
*/
public Map<String, RyzhMqHolder> getMqHolders() {
return mqHolders;
}
public void setMqHolders(Map<String, RyzhMqHolder> mqHolders) {
this.mqHolders = mqHolders;
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public CachingConnectionFactory getCachingProductConnectionFactory() {
return cachingProductConnectionFactory;
}
public void setCachingProductConnectionFactory(CachingConnectionFactory cachingProductConnectionFactory) {
this.cachingProductConnectionFactory = cachingProductConnectionFactory;
}
public CachingConnectionFactory getCachingConsumersConnectionFactory() {
return cachingConsumersConnectionFactory;
}
public void setCachingConsumersConnecti
4000
onFactory(CachingConnectionFactory cachingConsumersConnectionFactory) {
this.cachingConsumersConnectionFactory = cachingConsumersConnectionFactory;
}
}
具体的xml配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa"
xmlns:lang="http://www.springframework.org/schema/lang"
xsi:schemaLocation=" http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"
default-autowire="byName">
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingProductConnectionFactory"></property>
<!-- 持久化 -->
<property name="deliveryMode" value="2"></property>
<!-- 不开启事物 -->
<property name="sessionTransacted" value="false"></property>
<!-- 应答模式是 INDIVIDUAL_ACKNOWLEDGE -->
<property name="sessionAcknowledgeMode" value="4"></property>
</bean>
<bean id="ryzhMqService" class="com.dragonsoft.rygl.mq.RyzhMqService"
scope="singleton">
<property name="cachingConsumersConnectionFactory" ref="cachingConsumersConnectionFactory"></property>
<property name="cachingProductConnectionFactory" ref="cachingProductConnectionFactory"></property>
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
<bean id="defaultRyzhProduceProcessor"
class="com.dragonsoft.rygl.ryzh.service.produce.DefaultRyzhProduceProcessor"
scope="prototype">
<property name="mgService" ref="ryzhMqService"></property>
<property name="jdbcTemplate" ref="jdbcTemplate"></property>
<property name="ip" value="${jmx.ip}"></property>
<property name="port" value="${jmx.port}"></property>
<property name="dataSourceManager" ref="dataSourceManager"></property>
</bean>
<!-- 生产者专用缓存池 -->
<bean id="cachingProductConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="connectionFactory"></property>
<property name="reconnectOnException" value="true"></property>
<property name="sessionCacheSize" value="${jms.sessionCacheSize}"></property>
</bean>
<!-- 消费者专用缓存池 -->
<bean id="cachingConsumersConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="connectionFactory"></property>
<property name="reconnectOnException" value="true"></property>
<property name="sessionCacheSize" value="${jms.sessionCacheSize}"></property>
<span style="color:#ff0000;"><property name="cacheConsumers" value="false"></property>
<property name="cacheProducers" value="false"></property></span>
</bean>
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.brokerURL}" />
<property name="userName" value="${jms.userName}"></property>
<property name="password" value="${jms.password}"></property>
<property name="useAsyncSend" value="${jms.useAsyncSend}"></property>
注意红色部分,需要缓存,否则监听会被缓存无法stop。
以下就可以达到不用配置死信队列就可以重新消费信息。(其实原理就是重启了监听容器)
这时候消费失败了,由于没有配置死亡队列,消息就不会被消费堆积在队列中,那么怎么才可以让消息再被消费呢?
由于项目中的应用场景,有个方案启动和停止的功能,项目启动启动监听,项目停止,停止监听
具体实现代码如下
public class MqService {
private JmsTemplate jmsTemplate;
private CachingConnectionFactory cachingProductConnectionFactory;
private CachingConnectionFactory cachingConsumersConnectionFactory;
private static final String ACTIVEMQQUEUE_OPTIMIZATION = "";// 性能优化参数
// consumer.prefetchSize预加载消息
// 20条
// ?consumer.prefetchSize=20
/**
* 发送消息
*
* @param ryzhMessage
*/
public void send(final RyzhMessage ryzhMessage) {
// 得到MQ工具类
RyzhMqHolder hodler = mqHolders.get(ryzhMessage.getShcemeId());
// 发送信息
jmsTemplate.setConnectionFactory(cachingProductConnectionFactory);
jmsTemplate.send(hodler.getDestination(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(ryzhMessage);
return objectMessage;
}
});
}
/**
* 监听
*
* @param schemeId
* @param receiver
*/
public void startListen(RyzhScheme scheme) {
RyzhMqHolder holder = mqHolders.get(scheme.getSchemeId());
try {
if (holder == null) {
RyzhConsumer consumer = (RyzhConsumer) SpringBeanUtils.getBean("ryzhConsumer");
consumer.setScheme(scheme);
logger.info("RyzhMqService-启动消息队列监听器 schemeId=" + scheme.getSchemeId());
// 如果是为空
holder = new RyzhMqHolder();
// 得到整合方案的配置
TRyzhFa tRyzhFa = ((RyzhConsumer) consumer).getScheme().getZhfa();
// 得到消息队列配置的信息
String xxdlJson = tRyzhFa.getXxdlpz();
// json转map
RyzhMqConfig ryzhMqConfig = JSON.parseObject(xxdlJson, RyzhMqConfig.class);
// 设置方案ID
holder.setSchemeId(scheme.getSchemeId());
// 设置目的地
ActiveMQQueue destination = new ActiveMQQueue(RyzhMqConfig.QUEUE_NAME + scheme.getSchemeId() + ACTIVEMQQUEUE_OPTIMIZATION);
holder.setDestination(destination);
// 创建监听器
DefaultMessageListener listener = new DefaultMessageListener();
// 给监听器设置消费者
listener.setReceiver(consumer);
// 将监听器保存在holder中
holder.setListener(listener);
// 创建监听容器
DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
// 监听容器属性的配置
listenerContainer.setConnectionFactory(cachingConsumersConnectionFactory);
// 设置目的地
listenerContainer.setDestination(destination);
// 设置监听器
listenerContainer.setMessageListener(listener);
// 设置消费者集群数
listenerContainer.setConcurrentConsumers(ryzhMqConfig.getConcurrentConsumers());
// 设置监听队列还是主题 默认是队列
listenerContainer.setPubSubDomain(RyzhMqConfig.PUBSUB_DOMAIN);
listenerContainer.setPubSubNoLocal(false);
// listenerContainer.setAcceptMessagesWhileStopping(true);
// 设置应答模式 默认是4
listenerContainer.setSessionAcknowledgeMode(RyzhMqConfig.SESSION_ACKNOWLEDGEMODE);
// 设置是否启动事物 默认不开启
listenerContainer.setSessionTransacted(RyzhMqConfig.SESSION_TRANSACTED);
// 将监听容器保存在holder中
holder.setListenerContainer(listenerContainer);
// 将holder缓存在map中
mqHolders.put(scheme.getSchemeId(), holder);
// 初始化容器
holder.getListenerContainer().initialize();
// 启动监听
holder.getListenerContainer().start();
logger.info("RyzhMqService-消息队列监听器启动成功 schemeId=" + scheme.getSchemeId());
}
} catch (Exception e) {
logger.error("RyzhMqService-消息队列监听器启动失败 schemeId=" + scheme.getSchemeId(), e);
}
}
public void stopListen(RyzhScheme scheme) {
RyzhMqHolder ryzhMqHolder = mqHolders.get(scheme.getSchemeId());
// 停止监听
ryzhMqHolder.getListenerContainer().destroy();
// 移除缓存
mqHolders.remove(scheme.getSchemeId());
}
/**
* 取得MQHolder
*
* @param ryzhMessage
* @return
*/
public Map<String, RyzhMqHolder> getMqHolders() {
return mqHolders;
}
public void setMqHolders(Map<String, RyzhMqHolder> mqHolders) {
this.mqHolders = mqHolders;
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public CachingConnectionFactory getCachingProductConnectionFactory() {
return cachingProductConnectionFactory;
}
public void setCachingProductConnectionFactory(CachingConnectionFactory cachingProductConnectionFactory) {
this.cachingProductConnectionFactory = cachingProductConnectionFactory;
}
public CachingConnectionFactory getCachingConsumersConnectionFactory() {
return cachingConsumersConnectionFactory;
}
public void setCachingConsumersConnecti
4000
onFactory(CachingConnectionFactory cachingConsumersConnectionFactory) {
this.cachingConsumersConnectionFactory = cachingConsumersConnectionFactory;
}
}
具体的xml配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa"
xmlns:lang="http://www.springframework.org/schema/lang"
xsi:schemaLocation=" http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"
default-autowire="byName">
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingProductConnectionFactory"></property>
<!-- 持久化 -->
<property name="deliveryMode" value="2"></property>
<!-- 不开启事物 -->
<property name="sessionTransacted" value="false"></property>
<!-- 应答模式是 INDIVIDUAL_ACKNOWLEDGE -->
<property name="sessionAcknowledgeMode" value="4"></property>
</bean>
<bean id="ryzhMqService" class="com.dragonsoft.rygl.mq.RyzhMqService"
scope="singleton">
<property name="cachingConsumersConnectionFactory" ref="cachingConsumersConnectionFactory"></property>
<property name="cachingProductConnectionFactory" ref="cachingProductConnectionFactory"></property>
<property name="jmsTemplate" ref="jmsTemplate" />
</bean>
<bean id="defaultRyzhProduceProcessor"
class="com.dragonsoft.rygl.ryzh.service.produce.DefaultRyzhProduceProcessor"
scope="prototype">
<property name="mgService" ref="ryzhMqService"></property>
<property name="jdbcTemplate" ref="jdbcTemplate"></property>
<property name="ip" value="${jmx.ip}"></property>
<property name="port" value="${jmx.port}"></property>
<property name="dataSourceManager" ref="dataSourceManager"></property>
</bean>
<!-- 生产者专用缓存池 -->
<bean id="cachingProductConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="connectionFactory"></property>
<property name="reconnectOnException" value="true"></property>
<property name="sessionCacheSize" value="${jms.sessionCacheSize}"></property>
</bean>
<!-- 消费者专用缓存池 -->
<bean id="cachingConsumersConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="connectionFactory"></property>
<property name="reconnectOnException" value="true"></property>
<property name="sessionCacheSize" value="${jms.sessionCacheSize}"></property>
<span style="color:#ff0000;"><property name="cacheConsumers" value="false"></property>
<property name="cacheProducers" value="false"></property></span>
</bean>
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.brokerURL}" />
<property name="userName" value="${jms.userName}"></property>
<property name="password" value="${jms.password}"></property>
<property name="useAsyncSend" value="${jms.useAsyncSend}"></property>
<span style="white-space:pre"> </span><!-- <property name="redeliveryPolicy" > <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> <property name="useExponentialBackOff" value="true"/> <property name="maximumRedeliveries" value="5"/><!-- 重发次数 5次 --> <property name="initialRedeliveryDelay" value="100"/><!-- 重发间隔时间100毫秒 --> <property name="backOffMultiplier" value="2.0"/> </bean> </property><pre name="code" class="html"><span style="white-space:pre"> </span>--></bean></beans>
注意红色部分,需要缓存,否则监听会被缓存无法stop。
以下就可以达到不用配置死信队列就可以重新消费信息。(其实原理就是重启了监听容器)
相关文章推荐
- 三、IOS UITableView详解
- oracle upper
- NIO通道和缓冲区
- unity 程序切进切出停止其他音乐
- Oracle 11g新特性
- python 函数参数的传递(参数带星号的说明)
- 使用IntelliJ IDEA如何使用远程debug
- 是谁拖了网站访问速度的「后腿」 ?
- 【Android】多个EditText设置其中一个不可编辑并隐藏键盘
- 通过zabora监控oracle
- 关于HttpURLConnection.setFollowRedirects
- Ubuntu 命令
- linux:srot进行对文件中的字段排序
- C与Lua互相调用的时候,栈变化分析
- android中数据的存储方式(一)File 和 SharedPreferences
- A计划(bfs)
- 关于Jeecg互联网化dubbo改造方案(续)
- Android 内存管理-需要注意的事项
- ionic 上拉刷新 ion-infinite-scroll 自动调用多次问题解决
- 给爸妈设计的一款手机