您的位置:首页 > 编程语言 > C#

C# 简单使用 IBM MQ 编程

2017-08-07 16:10 204 查看
C# 简单使用 IBM MQ 编程

1:TOPIC使用

#region topic

#region 定义属性/字段
// Queue manager connection; created in InitMQEnvironment (or lazily in CreateTopic) and released in CloseMQEnvironment.
public MQQueueManager qMgr;
// Message wrapper; (re)created by InitClientTopic and InitClientGateTopic.
public MqMsg msg = null;
// NOTE(review): InitMq and InitMQEnvironment declare locals with this same name, so they never update this field.
public bool isflag = true;
// Helper handed to the MqMsg constructor; no assignment is visible in this section, so it may still be null there — TODO confirm.
MQueueHelper MQueueHelpr = null;
/// <summary>
/// Callback signature for handlers of <see cref="OnUserRequest"/>.
/// </summary>
/// <param name="msg">Message text delivered to the handler (presumably the received message body — confirm against the code that raises the event).</param>
/// <param name="topicTitle">Title of the topic the message belongs to (presumably values such as "flight", "stand" or "gate" — confirm).</param>
public delegate void UserRequest(string msg, string topicTitle);
/// <summary>
/// Event of type <see cref="UserRequest"/>; the code that raises it is not visible in this section.
/// </summary>
public event UserRequest OnUserRequest;
// Name of the queue manager, stored by InitMq after a successful connection.
string Qqueuemanger = "";
// Cleared by CloseMQEnvironment; no other use is visible in this section.
bool flag = false;
// Connection settings copied from the InitMQEnvironment arguments.
string Hostname;
string Channel;
int Port;
string Uername;
string Psd;
string Queuemanger = "";
#endregion

// NOTE(review): not referenced by any code visible in this section.
private List<string> namelist = new List<string>();

#region 初始化队列管理器

/// <summary>
/// Connects to the queue manager and, on success, remembers its name.
/// </summary>
/// <param name="hostname">Host name or IP address of the MQ server.</param>
/// <param name="channel">Server-connection channel name.</param>
/// <param name="port">Listener port.</param>
/// <param name="username">User id used for the connection.</param>
/// <param name="psd">Password used for the connection.</param>
/// <param name="queuemanger">Queue manager name.</param>
/// <returns><c>true</c> when InitMQEnvironment reports success; otherwise <c>false</c>.</returns>
public bool InitMq(string hostname, string channel, int port, string username, string psd, string queuemanger)
{
    //msg = new MqMsg(new MQueueHelper());

    // Guard clause: nothing is recorded when the connection attempt fails.
    if (!InitMQEnvironment(hostname, channel, port, username, psd, queuemanger))
    {
        return false;
    }

    Qqueuemanger = queuemanger;
    return true;
}

/// <summary>
/// Applies the connection settings to <c>MQEnvironment</c>, stores them on this instance and
/// connects to the queue manager if no connection exists yet.
/// </summary>
/// <param name="hostname">Host name or IP address of the MQ server.</param>
/// <param name="channel">Server-connection channel name.</param>
/// <param name="port">Listener port.</param>
/// <param name="username">User id used for the connection.</param>
/// <param name="psd">Password used for the connection.</param>
/// <param name="queuemanger">Queue manager name.</param>
/// <returns>
/// <c>true</c> when the settings were applied and a queue manager connection exists (newly created
/// or already present); <c>false</c> when an <c>MQException</c> was raised and logged.
/// </returns>
private bool InitMQEnvironment(string hostname, string channel, int port, string username, string psd, string queuemanger)
{
    try
    {
        // Static MQEnvironment settings; they are assigned before the MQQueueManager is constructed below.
        MQEnvironment.Hostname = hostname;
        MQEnvironment.Channel = channel;
        MQEnvironment.Port = port;
        MQEnvironment.UserId = username;
        MQEnvironment.Password = psd;

        // Keep a copy of the settings on the instance.
        Hostname = hostname;
        Channel = channel;
        Port = port;
        Uername = username;
        Psd = psd;
        Queuemanger = queuemanger;

        // Connect only once. The previous version set the result to false in an else branch and then
        // unconditionally overwrote it with true; an existing connection is therefore reported as success.
        if (qMgr == null)
        {
            qMgr = new MQQueueManager(queuemanger);
            MqHelper.saveMQQueueManager = qMgr;
        }

        return true;
    }
    catch (MQException exp)
    {
        Log.Logger.WriteErrorLog(exp.Message, "PushServiceManagement", "InitMQEnvironment", "", "推送消息", exp);
        return false;
    }
}
#endregion
#region topic

#region 定义属性/字段
// Queue manager connection; created in InitMQEnvironment (or lazily in CreateTopic) and released in CloseMQEnvironment.
public MQQueueManager qMgr;
// Message wrapper; (re)created by InitClientTopic and InitClientGateTopic.
public MqMsg msg = null;
// NOTE(review): InitMq and InitMQEnvironment declare locals with this same name, so they never update this field.
public bool isflag = true;
// Helper handed to the MqMsg constructor; no assignment is visible in this section, so it may still be null there — TODO confirm.
MQueueHelper MQueueHelpr = null;
/// <summary>
/// Callback signature for handlers of <see cref="OnUserRequest"/>.
/// </summary>
/// <param name="msg">Message text delivered to the handler (presumably the received message body — confirm against the code that raises the event).</param>
/// <param name="topicTitle">Title of the topic the message belongs to (presumably values such as "flight", "stand" or "gate" — confirm).</param>
public delegate void UserRequest(string msg, string topicTitle);
/// <summary>
/// Event of type <see cref="UserRequest"/>; the code that raises it is not visible in this section.
/// </summary>
public event UserRequest OnUserRequest;
// Name of the queue manager, stored by InitMq after a successful connection.
string Qqueuemanger = "";
// Cleared by CloseMQEnvironment; no other use is visible in this section.
bool flag = false;
// Connection settings copied from the InitMQEnvironment arguments.
string Hostname;
string Channel;
int Port;
string Uername;
string Psd;
string Queuemanger = "";
#endregion

// NOTE(review): not referenced by any code visible in this section.
private List<string> namelist = new List<string>();

#region 初始化队列管理器

/// <summary>
/// Connects to the queue manager and, on success, remembers its name.
/// </summary>
/// <param name="hostname">Host name or IP address of the MQ server.</param>
/// <param name="channel">Server-connection channel name.</param>
/// <param name="port">Listener port.</param>
/// <param name="username">User id used for the connection.</param>
/// <param name="psd">Password used for the connection.</param>
/// <param name="queuemanger">Queue manager name.</param>
/// <returns><c>true</c> when InitMQEnvironment reports success; otherwise <c>false</c>.</returns>
public bool InitMq(string hostname, string channel, int port, string username, string psd, string queuemanger)
{
    //msg = new MqMsg(new MQueueHelper());

    // Guard clause: nothing is recorded when the connection attempt fails.
    if (!InitMQEnvironment(hostname, channel, port, username, psd, queuemanger))
    {
        return false;
    }

    Qqueuemanger = queuemanger;
    return true;
}

/// <summary>
/// Applies the connection settings to <c>MQEnvironment</c>, stores them on this instance and
/// connects to the queue manager if no connection exists yet.
/// </summary>
/// <param name="hostname">Host name or IP address of the MQ server.</param>
/// <param name="channel">Server-connection channel name.</param>
/// <param name="port">Listener port.</param>
/// <param name="username">User id used for the connection.</param>
/// <param name="psd">Password used for the connection.</param>
/// <param name="queuemanger">Queue manager name.</param>
/// <returns>
/// <c>true</c> when the settings were applied and a queue manager connection exists (newly created
/// or already present); <c>false</c> when an <c>MQException</c> was raised and logged.
/// </returns>
private bool InitMQEnvironment(string hostname, string channel, int port, string username, string psd, string queuemanger)
{
    try
    {
        // Static MQEnvironment settings; they are assigned before the MQQueueManager is constructed below.
        MQEnvironment.Hostname = hostname;
        MQEnvironment.Channel = channel;
        MQEnvironment.Port = port;
        MQEnvironment.UserId = username;
        MQEnvironment.Password = psd;

        // Keep a copy of the settings on the instance.
        Hostname = hostname;
        Channel = channel;
        Port = port;
        Uername = username;
        Psd = psd;
        Queuemanger = queuemanger;

        // Connect only once. The previous version set the result to false in an else branch and then
        // unconditionally overwrote it with true; an existing connection is therefore reported as success.
        if (qMgr == null)
        {
            qMgr = new MQQueueManager(queuemanger);
            MqHelper.saveMQQueueManager = qMgr;
        }

        return true;
    }
    catch (MQException exp)
    {
        Log.Logger.WriteErrorLog(exp.Message, "PushServiceManagement", "InitMQEnvironment", "", "推送消息", exp);
        return false;
    }
}
#endregion


#region 初始化TOPIC

/// <summary>
/// Registers the client with <c>MqHelper.SetTopicDics</c>, creates a fresh <c>MqMsg</c> and calls
/// <c>GetQueueMsg</c> for "flight" and "stand".
/// </summary>
/// <param name="subname">Client name; every ':' is stripped before it is passed to <c>GetQueueMsg</c>.</param>
/// <param name="qmrsub">Not used by this method.</param>
public void InitClientTopic(string subname, string qmrsub)
{
    MqHelper.SetTopicDics(subname);
    msg = new MqMsg(MQueueHelpr);

    // Same colon-free name for both calls.
    string cleanName = subname.Replace(":", "");
    GetQueueMsg(cleanName, "flight");
    GetQueueMsg(cleanName, "stand");
}

/// <summary>
/// Registers the client with <c>MqHelper.SetGateTopicDics</c>, creates a fresh <c>MqMsg</c> and calls
/// <c>GetQueueMsg</c> for "gate".
/// </summary>
/// <param name="subname">Client name; every ':' is stripped before it is passed to <c>GetQueueMsg</c>.</param>
/// <param name="qmrsub">Not used by this method.</param>
public void InitClientGateTopic(string subname, string qmrsub)
{
    MqHelper.SetGateTopicDics(subname);
    msg = new MqMsg(MQueueHelpr);

    string cleanName = subname.Replace(":", "");
    GetQueueMsg(cleanName, "gate");
}

/// <summary>
/// Opens a topic subscription through the queue manager and records it in <c>MqHelper.topiclist</c>.
/// </summary>
/// <param name="topicname">Topic string to subscribe to.</param>
/// <param name="subname">Subscription name; with ':' removed and a "Q" prefix it also names the destination queue in the unmanaged mode.</param>
/// <returns>The opened topic, or <c>null</c> if no subscription mode applied.</returns>
public MQTopic CreateTopic(string topicname, string subname)
{
    // Connect with the parameterless constructor when no queue manager has been set up yet.
    if (qMgr == null)
    {
        qMgr = new MQQueueManager();
        //MqHelper.saveMQQueueManager = qMgr;
    }

    // The mode is fixed to "unmanaged + durable"; the managed variants are kept for reference.
    bool managed = false;
    bool durable = true;

    string topicObject = "";
    MQTopic topic = null;

    if (managed)
    {
        if (durable)
        {
            // Managed, durable subscription.
            int managedDurableOptions = MQC.MQSO_CREATE | MQC.MQSO_RESUME | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_MANAGED | MQC.MQSO_DURABLE;
            topic = qMgr.AccessTopic(topicname, topicObject, managedDurableOptions, null, subname);
        }
        else
        {
            // Managed, non-durable subscription.
            int managedNonDurableOptions = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_MANAGED | MQC.MQSO_NON_DURABLE;
            topic = qMgr.AccessTopic(topicname, topicObject, MQC.MQTOPIC_OPEN_AS_SUBSCRIPTION, managedNonDurableOptions);
        }
    }
    else if (durable)
    {
        // Unmanaged, durable subscription delivered to the queue "Q" + subname without colons.
        int unmanagedDurableOptions = MQC.MQSO_CREATE | MQC.MQSO_FAIL_IF_QUIESCING | MQC.MQSO_DURABLE | MQC.MQSO_RESUME;
        int queueOpenOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
        MQDestination destination = qMgr.AccessQueue("Q" + subname.Replace(":", ""), queueOpenOptions);
        topic = qMgr.AccessTopic(destination, topicname, topicObject, unmanagedDurableOptions, null, subname);
        topic.SubscriptionReference.CloseOptions = MQC.MQCO_REMOVE_SUB;
    }

    // Track the topic once so CloseMQEnvironment can close it later.
    bool alreadyTracked = MqHelper.topiclist.Contains(topic);
    if (!alreadyTracked)
    {
        MqHelper.topiclist.Add(topic);
    }

    return topic;
}
#endregion

#region 关闭队列管理器
/// <summary>
/// Closes every topic held in <c>MqHelper.topiclist</c> and then disconnects the queue manager.
/// </summary>
/// <remarks>
/// The queue manager is released in a <c>finally</c> block, so a failure while closing a topic
/// no longer leaves the connection open. Exceptions still propagate to the caller.
/// </remarks>
public void CloseMQEnvironment()
{
    flag = false;
    try
    {
        foreach (MQTopic item in MqHelper.topiclist)
        {
            if (item != null)
            {
                item.Close();
            }
        }
    }
    finally
    {
        if (qMgr != null)
        {
            qMgr.Disconnect();
            qMgr.Close();
            qMgr = null;
        }
    }
}
#endregion


2:队列使用

#region

// Queue manager connection; supplied through the constructor or created by initQManager.
MQQueueManager qMgr = null;

// Host name / IP passed to the constructor.
string IP = "";

// Listener port passed to the constructor.
int PORT = 0;

// CCSID text passed to the constructor (parsed to int for the connection properties).
string SSICD = "";

// Queue name passed to the constructor.
string QueueName = "";

// Queue manager name used by initQManager.
string QM = "";

// Connection properties handed to the MQQueueManager constructor.
Hashtable mqConfigs = new Hashtable();

// PCF agent created by initAgent.
PCFMessageAgent agent = null;

//public MqMsg msg = null;

#endregion

/// <summary>
/// Creates a helper with all settings left at their field defaults.
/// </summary>
public MQueueHelper() { }

/// <summary>
/// Creates a helper from explicit connection settings and fills the connection property table.
/// </summary>
/// <param name="ip">Host name or IP address; stored under <c>MQC.HOST_NAME_PROPERTY</c>.</param>
/// <param name="port">Listener port; stored under <c>MQC.PORT_PROPERTY</c>.</param>
/// <param name="channel">Channel name; stored under <c>MQC.CHANNEL_PROPERTY</c>.</param>
/// <param name="ssicd">Integer text stored under <c>MQC.CCSID_PROPERTY</c>; parsed with <c>int.Parse</c>.</param>
/// <param name="qm">Queue manager name.</param>
/// <param name="queuename">Queue name.</param>
/// <param name="qMggr">Existing queue manager connection to keep.</param>
public MQueueHelper(string ip, int port, string channel, string ssicd, string qm, string queuename, MQQueueManager qMggr)
{
    // Plain copies of the arguments.
    qMgr = qMggr;
    IP = ip;
    PORT = port;
    SSICD = ssicd;
    QM = qm;
    QueueName = queuename;

    // Connection properties later passed to the MQQueueManager constructor.
    mqConfigs.Add(MQC.HOST_NAME_PROPERTY, ip);
    mqConfigs.Add(MQC.CHANNEL_PROPERTY, channel);
    // This key is the CCSID, not the port.
    int ccsid = int.Parse(ssicd);
    mqConfigs.Add(MQC.CCSID_PROPERTY, ccsid);
    mqConfigs.Add(MQC.PORT_PROPERTY, port);
}

/// <summary>
/// Creates a queue manager connection from the stored name and connection properties,
/// stores it in <c>qMgr</c> and returns it.
/// </summary>
/// <returns>The newly created queue manager.</returns>
public MQQueueManager initQManager()
{
    MQQueueManager created = new MQQueueManager(QM, mqConfigs);
    qMgr = created;
    return created;
}

/// <summary>
/// Creates a new queue manager connection via <c>initQManager</c> and wraps it in a PCF agent,
/// which is stored in <c>agent</c> and returned.
/// </summary>
/// <returns>The newly created PCF agent.</returns>
public PCFMessageAgent initAgent()
{
    MQQueueManager manager = initQManager();
    qMgr = manager;

    PCFMessageAgent created = new PCFMessageAgent(manager);
    agent = created;
    return created;
}

/// <summary>
/// Creates a persistent local queue through a PCF <c>MQCMD_CREATE_Q</c> request.
/// If the queue already exists (reason 4001) its messages are cleared instead.
/// </summary>
/// <param name="queueName">Name of the local queue to create.</param>
/// <remarks>
/// Failures are logged, not rethrown. The agent opened here is kept in a local variable and
/// disconnected exactly once in <c>finally</c>, because <c>ClearQueue</c> replaces the shared
/// <c>agent</c> field.
/// </remarks>
public void CreateLocalQueue(string queueName)
{
    // PCF reason code for "object already exists" (MQRCCF_OBJECT_ALREADY_EXISTS).
    const int ObjectAlreadyExists = 4001;

    PCFMessageAgent createAgent = null;
    try
    {
        createAgent = initAgent();

        // Queue attributes.
        PCFMessage request = new PCFMessage(CMQCFC.MQCMD_CREATE_Q);
        request.AddParameter(MQC.MQCA_Q_NAME, queueName);
        request.AddParameter(MQC.MQIA_Q_TYPE, MQC.MQQT_LOCAL);
        request.AddParameter(MQC.MQIA_DEF_PERSISTENCE, MQC.MQPER_PERSISTENT);
        request.AddParameter(MQC.MQIA_INHIBIT_PUT, MQC.MQQA_PUT_ALLOWED);
        request.AddParameter(MQC.MQIA_INHIBIT_GET, MQC.MQQA_GET_ALLOWED);
        request.AddParameter(MQC.MQIA_MAX_Q_DEPTH, 9999999);
        request.AddParameter(MQC.MQIA_MAX_MSG_LENGTH, 9999999);
        //request.AddParameter(MQC.MQIA_USAGE, MQC.MQUS_TRANSMISSION);
        createAgent.Send(request);
        //msg = new MqMsg(this);
    }
    catch (MQException exp)
    {
        if (exp.Reason == ObjectAlreadyExists)
        {
            // Existing queue: empty it and report that it already existed.
            ClearQueue("local", queueName);
            Log.Logger.WriteErrorLog("本地队列:" + queueName + "已存在;\n:", "MQueueHelper", "createLocalQueue", "", "推送消息", exp);
        }
        else if (exp.Reason != 0)
        {
            Log.Logger.WriteErrorLog("创建本地队列:" + queueName + "失败;\n:", "MQueueHelper", "createLocalQueue", "", "推送消息", exp);
        }
    }
    catch (Exception exp)
    {
        Log.Logger.WriteErrorLog("本地队列:有错误信息;\n:" + exp.Message, "MQueueHelper", "createLocalQueue", "", "推送消息", exp);
    }
    finally
    {
        // createAgent is null when initAgent itself failed.
        if (createAgent != null)
        {
            createAgent.Disconnect();
        }
    }
}

/// <summary>
/// Deletes a queue through a PCF <c>MQCMD_DELETE_Q</c> request.
/// </summary>
/// <param name="type">"local", "alias" or "remote"; any other value (including null) is treated as local.</param>
/// <param name="qName">Name of the queue to delete.</param>
/// <remarks>
/// Failures of the request are logged, not rethrown. The agent is disconnected exactly once in
/// <c>finally</c>; the earlier version disconnected twice on an <c>MQException</c>.
/// </remarks>
public void DeleteQ(String type, String qName)
{
    PCFMessageAgent deleteAgent = initAgent();

    int qType;
    switch (type)
    {
        case "alias":
            qType = MQC.MQQT_ALIAS;
            break;
        case "remote":
            qType = MQC.MQQT_REMOTE;
            break;
        default:
            qType = MQC.MQQT_LOCAL;
            break;
    }

    try
    {
        PCFMessage request = new PCFMessage(CMQCFC.MQCMD_DELETE_Q);
        request.AddParameter(MQC.MQCA_Q_NAME, qName);
        request.AddParameter(MQC.MQIA_Q_TYPE, qType);
        deleteAgent.Send(request);
    }
    catch (MQException exp)
    {
        if (exp.Reason != 0)
        {
            Log.Logger.WriteErrorLog("删除队列失败" + qName + ",PCF error:\n", "MQueueHelper", "deleteQ", "", "推送消息", exp);
        }
    }
    catch (Exception exp)
    {
        Log.Logger.WriteErrorLog("本地队列:有错误信息;\n:" + exp.Message, "MQueueHelper", "deleteQ", "", "推送消息", exp);
    }
    finally
    {
        deleteAgent.Disconnect();
    }
}

/// <summary>
/// Removes all messages from a queue through a PCF <c>MQCMD_CLEAR_Q</c> request.
/// </summary>
/// <param name="type">Kept for interface compatibility; the request only carries the queue name, so this value is not used.</param>
/// <param name="qName">Name of the queue to clear.</param>
/// <remarks>
/// Failures of the request are logged, not rethrown. The agent is disconnected exactly once in
/// <c>finally</c>; the earlier version disconnected twice on an <c>MQException</c>.
/// </remarks>
public void ClearQueue(String type, String qName)
{
    PCFMessageAgent clearAgent = initAgent();
    try
    {
        PCFMessage request = new PCFMessage(CMQCFC.MQCMD_CLEAR_Q);
        request.AddParameter(MQC.MQCA_Q_NAME, qName);
        clearAgent.Send(request);
    }
    catch (MQException exp)
    {
        if (exp.Reason != 0)
        {
            Log.Logger.WriteErrorLog("清除队列消息失败" + qName + ",PCF error:\n", "MQueueHelper", "ClearQueue", "", "推送消息", exp);
        }
    }
    catch (Exception exp)
    {
        Log.Logger.WriteErrorLog("清除本地队列:有错误信息;\n:" + exp.Message, "MQueueHelper", "ClearQueue", "", "推送消息", exp);
    }
    finally
    {
        clearAgent.Disconnect();
    }
}

/// <summary>
/// Callback signature for handlers of <see cref="UserQueueRequest"/>.
/// </summary>
/// <param name="msg">Message content read from the queue, as a string.</param>
/// <param name="topicTitle">The topic title that was passed to receiveMsg.</param>
public delegate void UserQueueRequestHander(string msg, string topicTitle);
/// <summary>
/// Raised by receiveMsg once for every message it reads from the queue.
/// </summary>
public event UserQueueRequestHander UserQueueRequest;
/// <summary>
/// Reads the messages currently on the selected queue and raises <c>UserQueueRequest</c> for each one.
/// </summary>
/// <param name="queueName">Queue name used only when the queue has to be reopened after reason 2018.</param>
/// <param name="topicTitle">
/// Selects the cached queue ("flight" picks the first one when several are cached) and the cached
/// topic ("GATE" or "STAND", compared after trimming and upper-casing).
/// </param>
/// <remarks>
/// Nothing is read unless a shared queue manager exists and the matching topic and queue are open.
/// NOTE(review): no topic is mapped for "flight", so that title never reads messages — confirm intent.
/// </remarks>
public void receiveMsg(string queueName, string topicTitle)
{
    // With a single cached queue, or for "flight", use the first one; otherwise the second.
    MQQueue queue;
    if (MqHelper.saveMQQueue.Count > 1 && topicTitle != "flight")
    {
        queue = MqHelper.saveMQQueue[1];
    }
    else
    {
        queue = MqHelper.saveMQQueue[0];
    }

    if (MqHelper.saveMQQueueManager == null)
    {
        return;
    }

    string title = topicTitle.Trim().ToUpper();
    MQTopic topic = null;
    if (title == "GATE")
    {
        topic = MqHelper.topiclist[0];
    }
    else if (title == "STAND")
    {
        topic = MqHelper.topiclist[1];
    }

    if (topic == null || !topic.IsOpen || !queue.IsOpen)
    {
        return;
    }

    int depth = 0;
    try
    {
        depth = queue.CurrentDepth;
    }
    catch (MQException exp)
    {
        Log.Logger.WriteErrorLog(exp.Message, "", "", "", "", exp);
        // 2018 = MQRC_HCONN_ERROR: reopen the queue and retry the depth query once.
        if (exp.ReasonCode == 2018)
        {
            queue = qMgr.AccessQueue(queueName, MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING + MQC.MQOO_INQUIRE + MQC.MQOO_BROWSE);
            depth = queue.CurrentDepth;
        }
    }

    // Read as many messages as the queue held when its depth was sampled.
    while (depth-- > 0)
    {
        MQGetMessageOptions gmo = new MQGetMessageOptions();
        gmo.Options = MQC.MQGMO_WAIT;
        gmo.WaitInterval = 1;
        gmo.MatchOptions = MQC.MQMO_NONE;

        MQMessage message = new MQMessage();
        try
        {
            // The options were previously built but never passed, so the wait settings had no effect.
            queue.Get(message, gmo);
        }
        catch (MQException exp)
        {
            // The depth is only a snapshot; an empty queue ends the loop.
            if (exp.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE)
            {
                break;
            }
            throw;
        }

        string msg = message.ReadString(message.MessageLength);
        if (UserQueueRequest != null)
        {
            UserQueueRequest(msg, topicTitle);
        }
    }
}

/// <summary>
/// Lists the queue names of a queue manager through a PCF <c>MQCMD_INQUIRE_Q_NAMES</c> request.
/// </summary>
/// <param name="qmName">Queue manager name used to create the PCF agent.</param>
/// <param name="isFindSystemQueue">
/// When <c>false</c> (default), names containing "AMQ." or "SYSTEM." are left out; when <c>true</c>, all names are returned.
/// </param>
/// <returns>The queue names from the first response message.</returns>
/// <remarks>The agent created here is disconnected in <c>finally</c>; previously it was never released.</remarks>
public List<string> GetALLQueue(string qmName, bool isFindSystemQueue = false)
{
    PCFMessageAgent listAgent = new PCFMessageAgent(qmName);
    try
    {
        PCFMessage request = new PCFMessage(CMQCFC.MQCMD_INQUIRE_Q_NAMES);
        request.AddParameter(MQC.MQCA_Q_NAME, "*");
        PCFMessage[] response = listAgent.Send(request);
        string[] names = response[0].GetStringListParameterValue(CMQCFC.MQCACF_Q_NAMES);

        if (isFindSystemQueue)
        {
            return names.ToList();
        }

        return names.Where(s => !s.Contains("AMQ.") && !s.Contains("SYSTEM.")).ToList();
    }
    finally
    {
        listAgent.Disconnect();
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  c# IBM MQ