/*
 * 您的位置:首页 > 其它
 *
 * IBM Websphere MQ
 *
 * 2010-03-24 16:58 429 查看
 */
import java.io.IOException;
import java.util.Hashtable;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.MQConstants;

/**
 * Small manual test client for IBM WebSphere MQ.
 *
 * <p>Per the original author's notes: test messages can be put on, and browsed from, a queue in
 * the MQ Explorer; several messages may be put and they are retrieved first-in, first-out.
 *
 * <p>Typical use is {@link #init()} followed by {@link #SendMsg(byte[])} to put a message, or
 * {@link #GetMsg()} to browse the first message, and finally {@link #finalizer()} to release the
 * MQ handles. Instances are not designed for concurrent use.
 */
public class MQTest {

    /** Name of the queue manager that {@link #init()} and {@link #GetMsg()} connect to. */
    private static final String QUEUE_MANAGER_NAME = "HUANG";

    /** Name of the queue that messages are put on and browsed from. */
    private static final String QUEUE_NAME = "HUANGQ1";

    /** Charset used to render the bytes handed to {@link #SendMsg(byte[])} on the console. */
    private static final String MESSAGE_CHARSET = "GBK";

    // QueueManager name. Not read by the current code; kept from the earlier client-mode setup.
    private String qManager;

    // Connection to the queue manager; null until init() or GetMsg() has connected.
    private MQQueueManager qMgr;

    // Handle of the currently opened queue; null until init() or GetMsg() has opened it.
    private MQQueue qQueue;

    // The following settings belong to an earlier client-mode (TCP/IP) connection setup that set
    // MQEnvironment.hostname / port / channel / CCSID before connecting. The current code does
    // not read them; they are kept because they are package-visible.
    // Example values from the original notes: host "localhost", port 1414, queue "CBMQ",
    // channel "CBRS001".
    String HOST_NAME;

    int PORT = 0;

    String Q_NAME;

    String CHANNEL;

    // Original author's note: 1381 means Simplified Chinese. The CCSID is generally 1383 on AIX,
    // 1386 if GBK must be supported, and 1381 on Windows.
    int CCSID;

    // Sample message text; assigned by init().
    String Msg;

    /**
     * Connects to the queue manager, opens the queue for output and prepares the sample message.
     *
     * <p>MQ failures are reported on the console and not rethrown; in that case the queue handle
     * may still be {@code null}.
     *
     * <p>Original author's note on open options for the earlier setup:
     * {@code MQOO_INQUIRE | MQOO_OUTPUT} for sending and
     * {@code MQOO_INPUT_AS_Q_DEF | MQOO_OUTPUT} for receiving.
     */
    public void init() {
        // Assigned first so the sample text is available even if connecting fails.
        Msg = "MQ测试发送12321321321";
        try {
            qMgr = new MQQueueManager(QUEUE_MANAGER_NAME, new Hashtable<String, Object>());
            qQueue = qMgr.accessQueue(QUEUE_NAME, MQConstants.MQOO_OUTPUT);
        } catch (MQException e) {
            e.printStackTrace();
            reportMqError(e);
        }
    }

    /**
     * Closes the queue and disconnects from the queue manager.
     *
     * <p>Safe to call when nothing was opened. The disconnect is attempted even if closing the
     * queue fails. Both handles are reset to {@code null} afterwards so a later
     * {@link #GetMsg()} reconnects.
     */
    void finalizer() {
        try {
            if (qQueue != null) {
                qQueue.close();
            }
        } catch (MQException e) {
            reportMqError(e);
        } finally {
            qQueue = null;
        }

        try {
            if (qMgr != null) {
                qMgr.disconnect();
            }
        } catch (MQException e) {
            reportMqError(e);
        } finally {
            qMgr = null;
        }
    }

    /**
     * Browses the first message on the queue and prints its text to standard output.
     *
     * <p>The queue is opened with {@code MQOO_BROWSE} and the message is fetched with
     * {@code MQGMO_BROWSE_FIRST}, so the message is not removed from the queue. The earlier
     * code passed {@code MQOO_OUTPUT} as the get option, which has the same numeric value;
     * the proper constant is used here to make the intent explicit.
     *
     * <p>An existing queue manager connection is reused. A queue handle that is already open is
     * closed before the queue is reopened with browse access.
     *
     * @throws IOException if the message body cannot be read
     * @throws MQException if connecting, opening or getting fails. An earlier version of this
     *     method treated reason code 2033 as "no message available".
     */
    public void GetMsg() throws IOException, MQException {
        if (qMgr == null) {
            qMgr = new MQQueueManager(QUEUE_MANAGER_NAME);
        }

        // Release a handle opened earlier (e.g. by init()) instead of overwriting it.
        if (qQueue != null) {
            MQQueue previous = qQueue;
            qQueue = null;
            previous.close();
        }

        // Open for browsing; output access is kept so SendMsg() can use the same handle.
        int openOptions = MQConstants.MQOO_BROWSE
                | MQConstants.MQOO_OUTPUT
                | MQConstants.MQOO_FAIL_IF_QUIESCING;
        qQueue = qMgr.accessQueue(QUEUE_NAME, openOptions);

        MQMessage message = new MQMessage();
        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.options = MQConstants.MQGMO_BROWSE_FIRST;

        qQueue.get(message, gmo);

        // The length reported by the message is in bytes, so the text must be read by byte
        // length. Reading that many *characters* overruns the buffer for multi-byte text.
        int remainingBytes = message.getDataLength();
        if (remainingBytes > 0) {
            String messageString = message.readStringOfByteLength(remainingBytes);
            System.out.println(messageString);
        }
    }

    /**
     * Puts the given bytes on the open queue as one message and echoes them to the console.
     *
     * <p>MQ and I/O failures are reported on the console and not rethrown.
     *
     * @param qByte message payload; printed to the console decoded as GBK
     * @throws IllegalStateException if no queue has been opened yet
     */
    public void SendMsg(byte[] qByte) {
        if (qQueue == null) {
            throw new IllegalStateException(
                    "Queue is not open; call init() before SendMsg()");
        }
        try {
            MQMessage qMsg = new MQMessage();
            qMsg.write(qByte);
            MQPutMessageOptions pmo = new MQPutMessageOptions();

            qQueue.put(qMsg, pmo);

            System.out.println("The message is sent!");
            System.out.println("\tThe message is " + new String(qByte, MESSAGE_CHARSET));
        } catch (MQException e) {
            reportMqError(e);
        } catch (IOException e) {
            System.out
                    .println("An error occurred whilst writing to the message buffer "
                            + e);
        }
    }

    /** Prints the completion and reason code of an MQ failure to standard output. */
    private static void reportMqError(MQException e) {
        System.out
                .println("A WebSphere MQ error occurred : Completion code "
                        + e.completionCode + " Reason Code is "
                        + e.reasonCode);
    }

    /**
     * Browses and prints the first message on the queue, then releases the MQ handles.
     *
     * <p>To send the sample message instead, call {@code init()} and then
     * {@code SendMsg(Msg.getBytes("GBK"))}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MQTest mqst = new MQTest();
        try {
            mqst.GetMsg();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the handles, whatever happened above.
            mqst.finalizer();
        }
    }

}
/*
 * 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
 * 标签:
 */