您的位置:首页 > 其它

消息队列监听器无法注入bean的问题解决

2018-01-11 15:04 246 查看
在使用ActiveMq或者其他消息队列的过程中,第一步对ActiveMq进行初始化的过程是放在ContextListener的初始化的时候的;

然而当mq接收到消息的时候触发getMqMsg方法的时候报错null,调试发现MqManageService根本没有获取到,我的想法是获取的方式不对,@Autowired这种方式获取bean不成功,但是bean肯定是已经存在于spring容器中的,如果对思维理解熟悉的人已经知道我的问题出在哪里了,没错,此时,spring容器中根本没有完成对注解bean的扫描,因为dispatcher.xml中配置的注解bean的优先级肯定没有框架中的contextListener的优先级高,contextListener初始化的时候根据@Autowired进行扫描的话,肯定是null的呀!

当我使用new对象的时候确实可以成功获取到这个MqmanageService,但是其调用的service(也是注解方式的)又获取不到了,原因是一样的。那么我可不可以在这里也new一个对象出来呢?可以是可以,但是没有意义,因为通过注解方式注册到容器中的bean本身就是单例模式,再new一个出来,违背初衷。还有一种方式就是可以通过ApplicationContextAware来getBean,这里的Bean的name就是@Service注解的类的小写全称。

下面给出我的代码例子:

@WebListener
public class MessageApplicationListener implements ServletContextListener {
private final static Logger LOGGER = LoggerFactory.getLogger(MessageApplicationListener.class);
UserModifyMessageHandler userModifyMessageHandler;
ActivitySignMessageHandler activitySignMessageHandler;
CommonPayMessageHandler commonPayMessageHandler;
IConsumerProcessor consumerProcessor;

@Override
public void contextInitialized(ServletContextEvent sce) {
LOGGER.info("init MessageApplicationListener...");

try {
WebApplicationContext ctx = WebApplicationContextUtils.getRequiredWebApplicationContext(sce.getServletContext());
LOGGER.info("get ctx=" + ctx);
userModifyMessageHandler = (UserModifyMessageHandler) ctx.getBean("userModifyMessageHandler");
activitySignMessageHandler = (ActivitySignMessageHandler) ctx.getBean("activitySignMessageHandler");
commonPayMessageHandler = (CommonPayMessageHandler) ctx.getBean("commonPayMessageHandler");
LOGGER.info("get messageHandler:{},{},{}" , userModifyMessageHandler, activitySignMessageHandler,commonPayMessageHandler);

//开始处理消息队列
dealUserModifyQueue();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}

private void dealUserModifyQueue() throws Exception {
// 处理消息
consumerProcessor = MessageBusClient.buildConsumerFactory("user.info.modify");
consumerProcessor.receive(userModifyMessageHandler);

// 处理消息
consumerProcessor = MessageBusClient.buildConsumerFactory("event.sign.audit");
consumerProcessor.receive(activitySignMessageHandler);

consumerProcessor = MessageBusClient.buildConsumerFactory("event.pay.notify");
consumerProcessor.receive(activitySignMessageHandler);

IConsumerProcessor consumerProcessor2 = MessageBusClient.buildConsumerFactory(WxCommonPayController.notifyTopic);
consumerProcessor2.receive(commonPayMessageHandler);

LOGGER.info("activitySignMessageHandler:{}, {}", consumerProcessor, consumerProcessor == null ? "is null" : "not null");
LOGGER.info("consumer topics {}, {}", consumerProcessor.getTopics(), consumerProcessor2.getTopics());

}

@Override
public void contextDestroyed(ServletContextEvent sce) {
LOGGER.info("destroy MessageApplicationListener...");
}
}

@Component
public class UserModifyMessageHandler implements IMessageListener {

private Logger log = LoggerFactory.getLogger(UserModifyMessageHandler.class);

@Autowired
MessageListener messageDelegateListener;

@Override
public ConsumeStatus messageHandle(ConsumerMessage consumerMessage) {
log.info("get message from queue msgKey:{},body:{}",consumerMessage.getMsgKey(),consumerMessage.getBody());
String msgKey = (String) consumerMessage.getMsgKey();
Object body=consumerMessage.getBody();

//key类似user.info.modify-6201955046674923525格式
messageDelegateListener.handleMessage((String)body);
log.info("deal message finish,msgKey:{} ",consumerMessage.getMsgKey());
return ConsumeStatus.CONSUME_SUCCESS;
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: