您的位置:首页 > 其它

JMS之ActiveMQ工具类分享(关于同步回执和异步回执)

2017-12-18 10:42 316 查看
时隔一年,未写博客。

如今沉浸在各种代码之中不可自拔。前段时间写了一个基于ActiveMQ的工具类,在生产环境下使用过,没有什么大问题,分享出来给大家了解,当做互相学习和参考。

具体ActiveMQ是做什么的,就不细说了,直接百度就好。

工具类并没用使用Spring去管理ActiveMQ,本身目的是方便初学者学习AMQ和理清每一个对象是如何执行的。

这个工具类主要作用是希望能帮助更多人去了解AcriveMQ对强消息回执的同步和异步处理,网络上并没有太多相关的内容,并且也没有相关的源码可以参考。

文章后会附上附件,欢迎下载,如有疑问,下方留言。

发送消息并等待回执

/**
* 发送点对点具有同步回执的消息
* @param sendTextMessage
* @param config ##如果设置JSMUtil.ACK_CLIENT_ACKNOWLEDGE,则messageListener使用message.acknowledge()进行消息确认##
* @param isPersistent
* @param messageTimeout 默认值建议设置45秒
* @return JMSSendResultModel
*/
public static JMSSendResultModel sendPointReplyTextMessageSync(
String sendTextMessage,
JMSConfigModel config,
int isPersistent,
Integer messageTimeout){

Connection connection=getConnection(config);
Session session=getSession(connection, config);
Destination destination=getDestination(session,config,false);
if(destination!=null){
String reciveTextMessageString=null;
MessageProducer messageProducer=null;
try {
messageProducer=session.createProducer(destination);
messageProducer.setDeliveryMode(isPersistent);
messageProducer.setTimeToLive(messageTimeout);
TextMessage textMsg = session.createTextMessage(sendTextMessage);
//构建回执(临时通道),这个很关键
TemporaryQueue destinationReply=session.createTemporaryQueue();
textMsg.setJMSReplyTo(destinationReply);
//
messageProducer.send(textMsg);
if(config.isTransacted())
session.commit();
if(JMSConfig.debug)logger.debug("===============发送成功:"+sendTextMessage+"    等待回执========================");
MessageConsumer messageConsumer = session.createConsumer(destinationReply);
Message replyMessage=messageConsumer.receive(messageTimeout);
if(replyMessage!=null&&(replyMessage instanceof TextMessage)){
TextMessage textMessage=(TextMessage)replyMessage;
reciveTextMessageString=textMessage.getText();

if(JMSConfig.debug)logger.debug("===============回执成功!:"+reciveTextMessageString+"  ========================");
if(config.getAcknowledge()==JMSConfig.ACK_CLIENT_ACKNOWLEDGE){
try {
replyMessage.acknowledge();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
String method = Thread.currentThread() .getStackTrace()[1].getMethodName();
logger.error("func:"+method+"  exception:"+e);
return null;
}
JMSSendResultModel jmsResultModel=new JMSSendResultModel( config, sendTextMessage,reciveTextMessageString, messageProducer);
return jmsResultModel;
}
return null;
}


接收消息并同步回执

/**
* 异步接收单点文本消息
* @param config ##如果设置JSMUtil.ACK_CLIENT_ACKNOWLEDGE,则messageListener使用message.acknowledge()进行消息确认##
* @param jmsExcute
* @return JMSReciveResultModel
*/
public static JMSReciveResultModel recivePointReplyTextMessageAsync(
JMSConfigModel config,
JMSExcuter jmsExcute){
Connection connection=getConnection(config);
Session session=getSession(connection, config);
Destination destination=getDestination(session,config,false);
if(destination!=null){
MessageConsumer  messageConsumer = null;
try {
messageConsumer = session.createConsumer(destination);
JMSMessageListener jmsMessageListener=new JMSMessageListener(session, config, jmsExcute){
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
Destination replyDestination ;
// 创建回执消息
TextMessage textMessage;
try {
String result= (String)this.getJmsExcute().excute( ((TextMessage)message).getText());
replyDestination = message.getJMSReplyTo();
textMessage = getSession().createTextMessage(result);
MessageProducer producer =  getSession().createProducer(replyDestination);
producer.send(textMessage);
producer.close();

if(JMSConfig.debug)logger.debug("===============发送回执成功!:"+result+"  ========================");
if(getConfig().getAcknowledge()==JMSConfig.ACK_CLIENT_ACKNOWLEDGE){
try {
message.acknowledge();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} catch (JMSException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
// 以上收到消息之后,从新创建生产者,然后在回执过去
}
};
messageConsumer.setMessageListener(jmsMessageListener);

} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
String method = Thread.currentThread() .getStackTrace()[1].getMethodName();
logger.error("func:"+method+"  exception:"+e);
return null;
}finally{}




激活调用方法:

发送消息

JMSSendResultModel jsModel= JMSUtil.sendPointReplyTextMessageSync(
("我是测试:"+System.currentTimeMillis()),
JMSConfig.config_NoTransactedAndACK_AUTO_ACKNOWLEDGE(JMSConfig.CONFIG_MODE_SEND,"test"),
JMSConfig.DELIVERY_PERSISTENT, 45000);[/code]接收消息

  JMSExcuter jsmExcute=new JMSExcuter() {
@Override
public Object excute( String textMessage) {
// TODO Auto-generated method stub
//     					System.out.println("接受成功啦!"+textMessage.getText());
return "我是回执,您发送的内容是:"+textMessage;
}
};
JMSReciveResultModel jmsreReciveResultModel=
JMSUtil.
recivePointReplyTextMessageAsync(
JMSConfig.config_NoTransactedAndACK_AUTO_ACKNOWLEDGE(JMSConfig.CONFIG_MODE_RECIVE,"test"),
jsmExcute);


具体还是看附件源码啦,内容有点儿多,等后面再逐步完善。有疑问请在留言中提出,万分感谢。

Active有个巨坑的地方,就是activemq-all.jar包里面包含了太多的构建包,很容易和你项目里引用的包产生冲突,就比如logback。所以请使用我附件里的核心的jar就可以了,分离掉不必要的包。

点击下载源码附件





                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: