JMS之ActiveMQ工具类分享(关于同步回执和异步回执)
2017-12-18 10:42
316 查看
时隔一年,未写博客。
如今沉浸在各种代码之中不可自拔。前段时间写了一个基于ActiveMQ的工具类,在生产环境下使用过,没有什么大问题,分享出来给大家了解,当做互相学习和参考。
具体ActiveMQ是做什么的,就不细说了,直接百度就好。
工具类并没用使用Spring去管理ActiveMQ,本身目的是方便初学者学习AMQ和理清每一个对象是如何执行的。
这个工具类主要作用是希望能帮助更多人去了解AcriveMQ对强消息回执的同步和异步处理,网络上并没有太多相关的内容,并且也没有相关的源码可以参考。
文章后会附上附件,欢迎下载,如有疑问,下方留言。
发送消息并等待回执
接收消息并同步回执
激活调用方法:
发送消息
如今沉浸在各种代码之中不可自拔。前段时间写了一个基于ActiveMQ的工具类,在生产环境下使用过,没有什么大问题,分享出来给大家了解,当做互相学习和参考。
具体ActiveMQ是做什么的,就不细说了,直接百度就好。
工具类并没用使用Spring去管理ActiveMQ,本身目的是方便初学者学习AMQ和理清每一个对象是如何执行的。
这个工具类主要作用是希望能帮助更多人去了解AcriveMQ对强消息回执的同步和异步处理,网络上并没有太多相关的内容,并且也没有相关的源码可以参考。
文章后会附上附件,欢迎下载,如有疑问,下方留言。
发送消息并等待回执
/** * 发送点对点具有同步回执的消息 * @param sendTextMessage * @param config ##如果设置JSMUtil.ACK_CLIENT_ACKNOWLEDGE,则messageListener使用message.acknowledge()进行消息确认## * @param isPersistent * @param messageTimeout 默认值建议设置45秒 * @return JMSSendResultModel */ public static JMSSendResultModel sendPointReplyTextMessageSync( String sendTextMessage, JMSConfigModel config, int isPersistent, Integer messageTimeout){ Connection connection=getConnection(config); Session session=getSession(connection, config); Destination destination=getDestination(session,config,false); if(destination!=null){ String reciveTextMessageString=null; MessageProducer messageProducer=null; try { messageProducer=session.createProducer(destination); messageProducer.setDeliveryMode(isPersistent); messageProducer.setTimeToLive(messageTimeout); TextMessage textMsg = session.createTextMessage(sendTextMessage); //构建回执(临时通道),这个很关键 TemporaryQueue destinationReply=session.createTemporaryQueue(); textMsg.setJMSReplyTo(destinationReply); // messageProducer.send(textMsg); if(config.isTransacted()) session.commit(); if(JMSConfig.debug)logger.debug("===============发送成功:"+sendTextMessage+" 等待回执========================"); MessageConsumer messageConsumer = session.createConsumer(destinationReply); Message replyMessage=messageConsumer.receive(messageTimeout); if(replyMessage!=null&&(replyMessage instanceof TextMessage)){ TextMessage textMessage=(TextMessage)replyMessage; reciveTextMessageString=textMessage.getText(); if(JMSConfig.debug)logger.debug("===============回执成功!:"+reciveTextMessageString+" ========================"); if(config.getAcknowledge()==JMSConfig.ACK_CLIENT_ACKNOWLEDGE){ try { replyMessage.acknowledge(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); String method = Thread.currentThread() .getStackTrace()[1].getMethodName(); logger.error("func:"+method+" exception:"+e); return null; } JMSSendResultModel jmsResultModel=new JMSSendResultModel( config, sendTextMessage,reciveTextMessageString, messageProducer); return jmsResultModel; } return null; }
接收消息并同步回执
/** * 异步接收单点文本消息 * @param config ##如果设置JSMUtil.ACK_CLIENT_ACKNOWLEDGE,则messageListener使用message.acknowledge()进行消息确认## * @param jmsExcute * @return JMSReciveResultModel */ public static JMSReciveResultModel recivePointReplyTextMessageAsync( JMSConfigModel config, JMSExcuter jmsExcute){ Connection connection=getConnection(config); Session session=getSession(connection, config); Destination destination=getDestination(session,config,false); if(destination!=null){ MessageConsumer messageConsumer = null; try { messageConsumer = session.createConsumer(destination); JMSMessageListener jmsMessageListener=new JMSMessageListener(session, config, jmsExcute){ @Override public void onMessage(Message message) { // TODO Auto-generated method stub Destination replyDestination ; // 创建回执消息 TextMessage textMessage; try { String result= (String)this.getJmsExcute().excute( ((TextMessage)message).getText()); replyDestination = message.getJMSReplyTo(); textMessage = getSession().createTextMessage(result); MessageProducer producer = getSession().createProducer(replyDestination); producer.send(textMessage); producer.close(); if(JMSConfig.debug)logger.debug("===============发送回执成功!:"+result+" ========================"); if(getConfig().getAcknowledge()==JMSConfig.ACK_CLIENT_ACKNOWLEDGE){ try { message.acknowledge(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } catch (JMSException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } // 以上收到消息之后,从新创建生产者,然后在回执过去 } }; messageConsumer.setMessageListener(jmsMessageListener); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); String method = Thread.currentThread() .getStackTrace()[1].getMethodName(); logger.error("func:"+method+" exception:"+e); return null; }finally{}
激活调用方法:
发送消息
JMSSendResultModel jsModel= JMSUtil.sendPointReplyTextMessageSync( ("我是测试:"+System.currentTimeMillis()), JMSConfig.config_NoTransactedAndACK_AUTO_ACKNOWLEDGE(JMSConfig.CONFIG_MODE_SEND,"test"), JMSConfig.DELIVERY_PERSISTENT, 45000);[/code]接收消息JMSExcuter jsmExcute=new JMSExcuter() { @Override public Object excute( String textMessage) { // TODO Auto-generated method stub // System.out.println("接受成功啦!"+textMessage.getText()); return "我是回执,您发送的内容是:"+textMessage; } }; JMSReciveResultModel jmsreReciveResultModel= JMSUtil. recivePointReplyTextMessageAsync( JMSConfig.config_NoTransactedAndACK_AUTO_ACKNOWLEDGE(JMSConfig.CONFIG_MODE_RECIVE,"test"), jsmExcute);
具体还是看附件源码啦,内容有点儿多,等后面再逐步完善。有疑问请在留言中提出,万分感谢。
Active有个巨坑的地方,就是activemq-all.jar包里面包含了太多的构建包,很容易和你项目里引用的包产生冲突,就比如logback。所以请使用我附件里的核心的jar就可以了,分离掉不必要的包。
点击下载源码附件
相关文章推荐
- mysql关于“异步复制”“同步复制”“半同步复制”“无损复制”的概念与区别
- 关于异步,同步,阻塞与非阻塞
- 关于IO的同步,异步,阻塞,非阻塞
- 关于同步,异步,阻塞,非阻塞,IOCP/epoll,select/poll,AIO ,NIO ,BIO的总结
- 关于同步、异步与阻塞、非阻塞的理解
- 项目优化经验分享(三)数据调用同步与异步
- [异步][jms][activeMq]如何做到重试机制不会导致一条消息被多次执行.
- 关于阻塞/非阻塞/同步/异步问题
- 关于异步,同步,阻塞与非阻塞
- 初学者线程练习关于异步和同步
- 关于IO的同步,异步,阻塞,非阻塞
- 关于同步和异步调用显示,增加用户体现总结
- .NET关于同步、异步及Socket
- 关于IO的同步,异步,阻塞,非阻塞
- 1.异步消息Jms及其JmsTemplate的源代码分析,消息代理ActiveMQ
- Spring framework(10):集成 JMS 异步消息队列(ActiveMQ)
- 关于ajax同步与异步的实现问题
- 关于同步,异步,阻塞,非阻塞,IOCP/epoll,select/poll,AIO ,NIO ,BIO的总结
- 分享一个关于Java日期时间的工具类
- 关于长链接,短链接,异步,同步,单工,双工的定义(转)