基于request/reply模式的MQ例子
2007-01-21 17:01
274 查看
今天做了一个request/reply模式的MQ例子程序。
一个程序把消息放在Q上,请求的程序会在把消息放到Q上之前设置replyToQueue和replyToQueueManager消息标题属性。而后它打开回复队列并等待correlationId匹配已发出请求消息的MessageId值的消息。
另一个程序是在接受到消息后判断如果是请求消息的话,则生成回复消息并发到请求消息指定的消息队列上。它还将拷贝请求消息的MessageId到回复消息的correlationId消息标题字段上。
程序的代码如下:
import com.ibm.mq.*;
/** *//**
* @author ralph
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class Requester ...{
public static void main(String[] args) ...{
try ...{
String hostName = "neu";
String channel = "ch_server";
String qManager = "QM_guo";
String requestQueue = "Q_request";
String replyToQueue = "Q_reply";
String replyToQueueManager = "QM_guo";
// set up the MQEnvironment properties for the client
MQEnvironment.hostname = hostName;
MQEnvironment.channel = channel;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES);
MQEnvironment.CCSID = 1381;
// connetion to Q Manager
MQQueueManager qMgr = new MQQueueManager(qManager);
// set up the open options
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
// open the Q
MQQueue queue = qMgr.accessQueue(requestQueue, openOptions, null,
null, null);
// set the put message options,will use the default settings
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = pmo.options + MQC.MQPMO_NEW_MSG_ID;
pmo.options = pmo.options + MQC.MQPMO_SYNCPOINT;
// build a message and write data
MQMessage outMsg = new MQMessage();
outMsg.messageFlags = MQC.MQMT_REQUEST;
outMsg.replyToQueueManagerName = replyToQueueManager;
outMsg.replyToQueueName = replyToQueue;
// prepare message with the user data
String msgString = "The request message from requester program!";
outMsg.writeUTF(msgString);
// Now we put the message on the Q
queue.put(outMsg, pmo);
// commit the transaction
qMgr.commit();
System.out.println("The message has been sussesfully put #####");
// close the Q
queue.close();
// set up the open options
int openOptions2 = MQC.MQOO_INPUT_SHARED
| MQC.MQOO_FAIL_IF_QUIESCING;
// open the Q
MQQueue respQueue = qMgr.accessQueue(replyToQueue, openOptions2,
null, null, null);
MQMessage respMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;
gmo.options = gmo.options + MQC.MQGMO_WAIT;
gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
gmo.waitInterval = 10000;
respMessage.correlationId = outMsg.correlationId;
// get the response message
respQueue.get(respMessage, gmo);
String response = respMessage.readUTF();
System.out.println("The response message is:" + response);
qMgr.commit();
respQueue.close();
qMgr.disconnect();
} catch (MQException ex) ...{
System.out.println("Completion code is:" + ex.completionCode
+ " reason code is:" + ex.reasonCode);
ex.printStackTrace();
} catch (Exception e) ...{
e.printStackTrace();
}
}
}
/** *//**
* @author ralph
*
* TODO To change the template for this generated type comment go to
* Window - Preferences - Java - Code Style - Code Templates
*/
import com.ibm.mq.*;
public class Responder ...{
public static void main(String[] args) ...{
try ...{
String hostName = "neu";
String channel = "ch_server";
String qManager = "QM_guo";
String qName = "Q_request";
// set up the MQEnvironment properties for the client
MQEnvironment.hostname = hostName;
MQEnvironment.channel = channel;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES);
MQEnvironment.CCSID = 1381;
// connetion to Q Manager
MQQueueManager qMgr = new MQQueueManager(qManager);
// set up the open options
int openOptions = MQC.MQOO_INPUT_SHARED
| MQC.MQOO_FAIL_IF_QUIESCING;
// open the Q
MQQueue queue = qMgr.accessQueue(qName, openOptions, null, null,
null);
// set the put message options
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;
gmo.options = gmo.options + MQC.MQGMO_WAIT;
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.waitInterval = 3000;
// build mssage
MQMessage inMsg = new MQMessage();
// get the message from Q
queue.get(inMsg, gmo);
// read the data from the message
String msgString = inMsg.readUTF();
System.out.println("The Message from Q is :" + msgString);
// check if message is of type request message and reply to the
// request
if (inMsg.messageFlags == MQC.MQMT_REQUEST) ...{
System.out.println("Praparing to reply to the request");
String replyQueueName = inMsg.replyToQueueName;
int openOptions2 = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
MQQueue respQueue = qMgr
.accessQueue(replyQueueName, openOptions2,
inMsg.replyToQueueManagerName, null, null);
MQMessage respMessage = new MQMessage();
respMessage.correlationId = inMsg.messageId;
MQPutMessageOptions pmo = new MQPutMessageOptions();
respMessage.messageFlags = MQC.MQMT_REPLY;
String response = "reply from responder program";
respMessage.writeUTF(response);
respQueue.put(respMessage, pmo);
System.out.println("The response sucessfully send!");
qMgr.commit();
respQueue.close();
}
queue.close();
qMgr.disconnect();
} catch (MQException ex) ...{
System.out.println("Completion code is:" + ex.completionCode
+ " reason code is:" + ex.reasonCode);
ex.printStackTrace();
} catch (Exception e) ...{
e.printStackTrace();
}
}
}
一个程序把消息放在Q上,请求的程序会在把消息放到Q上之前设置replyToQueue和replyToQueueManager消息标题属性。而后它打开回复队列并等待correlationId匹配已发出请求消息的MessageId值的消息。
另一个程序是在接受到消息后判断如果是请求消息的话,则生成回复消息并发到请求消息指定的消息队列上。它还将拷贝请求消息的MessageId到回复消息的correlationId消息标题字段上。
程序的代码如下:
import com.ibm.mq.*;
/** *//**
* @author ralph
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class Requester ...{
public static void main(String[] args) ...{
try ...{
String hostName = "neu";
String channel = "ch_server";
String qManager = "QM_guo";
String requestQueue = "Q_request";
String replyToQueue = "Q_reply";
String replyToQueueManager = "QM_guo";
// set up the MQEnvironment properties for the client
MQEnvironment.hostname = hostName;
MQEnvironment.channel = channel;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES);
MQEnvironment.CCSID = 1381;
// connetion to Q Manager
MQQueueManager qMgr = new MQQueueManager(qManager);
// set up the open options
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
// open the Q
MQQueue queue = qMgr.accessQueue(requestQueue, openOptions, null,
null, null);
// set the put message options,will use the default settings
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = pmo.options + MQC.MQPMO_NEW_MSG_ID;
pmo.options = pmo.options + MQC.MQPMO_SYNCPOINT;
// build a message and write data
MQMessage outMsg = new MQMessage();
outMsg.messageFlags = MQC.MQMT_REQUEST;
outMsg.replyToQueueManagerName = replyToQueueManager;
outMsg.replyToQueueName = replyToQueue;
// prepare message with the user data
String msgString = "The request message from requester program!";
outMsg.writeUTF(msgString);
// Now we put the message on the Q
queue.put(outMsg, pmo);
// commit the transaction
qMgr.commit();
System.out.println("The message has been sussesfully put #####");
// close the Q
queue.close();
// set up the open options
int openOptions2 = MQC.MQOO_INPUT_SHARED
| MQC.MQOO_FAIL_IF_QUIESCING;
// open the Q
MQQueue respQueue = qMgr.accessQueue(replyToQueue, openOptions2,
null, null, null);
MQMessage respMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;
gmo.options = gmo.options + MQC.MQGMO_WAIT;
gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
gmo.waitInterval = 10000;
respMessage.correlationId = outMsg.correlationId;
// get the response message
respQueue.get(respMessage, gmo);
String response = respMessage.readUTF();
System.out.println("The response message is:" + response);
qMgr.commit();
respQueue.close();
qMgr.disconnect();
} catch (MQException ex) ...{
System.out.println("Completion code is:" + ex.completionCode
+ " reason code is:" + ex.reasonCode);
ex.printStackTrace();
} catch (Exception e) ...{
e.printStackTrace();
}
}
}
/** *//**
* @author ralph
*
* TODO To change the template for this generated type comment go to
* Window - Preferences - Java - Code Style - Code Templates
*/
import com.ibm.mq.*;
public class Responder ...{
public static void main(String[] args) ...{
try ...{
String hostName = "neu";
String channel = "ch_server";
String qManager = "QM_guo";
String qName = "Q_request";
// set up the MQEnvironment properties for the client
MQEnvironment.hostname = hostName;
MQEnvironment.channel = channel;
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES);
MQEnvironment.CCSID = 1381;
// connetion to Q Manager
MQQueueManager qMgr = new MQQueueManager(qManager);
// set up the open options
int openOptions = MQC.MQOO_INPUT_SHARED
| MQC.MQOO_FAIL_IF_QUIESCING;
// open the Q
MQQueue queue = qMgr.accessQueue(qName, openOptions, null, null,
null);
// set the put message options
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;
gmo.options = gmo.options + MQC.MQGMO_WAIT;
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;
gmo.waitInterval = 3000;
// build mssage
MQMessage inMsg = new MQMessage();
// get the message from Q
queue.get(inMsg, gmo);
// read the data from the message
String msgString = inMsg.readUTF();
System.out.println("The Message from Q is :" + msgString);
// check if message is of type request message and reply to the
// request
if (inMsg.messageFlags == MQC.MQMT_REQUEST) ...{
System.out.println("Praparing to reply to the request");
String replyQueueName = inMsg.replyToQueueName;
int openOptions2 = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
MQQueue respQueue = qMgr
.accessQueue(replyQueueName, openOptions2,
inMsg.replyToQueueManagerName, null, null);
MQMessage respMessage = new MQMessage();
respMessage.correlationId = inMsg.messageId;
MQPutMessageOptions pmo = new MQPutMessageOptions();
respMessage.messageFlags = MQC.MQMT_REPLY;
String response = "reply from responder program";
respMessage.writeUTF(response);
respQueue.put(respMessage, pmo);
System.out.println("The response sucessfully send!");
qMgr.commit();
respQueue.close();
}
queue.close();
qMgr.disconnect();
} catch (MQException ex) ...{
System.out.println("Completion code is:" + ex.completionCode
+ " reason code is:" + ex.reasonCode);
ex.printStackTrace();
} catch (Exception e) ...{
e.printStackTrace();
}
}
}
相关文章推荐
- ZeroMQ之模式 请求回应模型(Request-Reply)
- NetMQ(二): 请求响应模式 Request-Reply
- C#基于Socket的CS模式的完整例子
- C#基于Socket的CS模式的完整例子
- 基于人吃饭的例子设计一个装饰设计模式:
- C#基于Socket的CS模式的完整例子
- WCF系列教程之消息交换模式之请求与答复模式(Request/Reply)
- C#基于Socket的CS模式的完整例子
- 应用开发的结构浅析----一个基于Proxy-Stub模式的例子
- Comet4J(Comet for Java)是一个纯粹基于AJAX(XMLHTTPRequest)的服务器推送框架,消息以JSON方式传递,具备长轮询、长连接、自动选择三种工作模式。
- 基于Qt和GLSL的着色器例子
- 基于hibernate开发模式的设计改进
- 基于Windows Server 2008 R2的Failover Cluster(故障转移群集)部署Sql Server 2008 AA(主主) 模式群集(第二部分)
- 设计模式六大原则例子(四)-- 依赖倒置原则(DIP)例子
- 一个例子穿插三种不同的工厂模式形态
- RMI入门小例子--代理模式
- 【socket编程】一个简单的基于TCP的客户/服务端例子(vs2008)
- 基于事件的异步设计模式
- 最简单的基于FFmpeg的AVDevice例子(屏幕录制)
- Java程序员的日常—— 基于类的策略模式、List<?>与List、泛型编译警告、同比和环比