您的位置:首页 > 其它

消息队列(MSMQ)实现多服务器应用程序之间消息实时交互

2009-05-25 09:53 691 查看
我所介绍的例子是利用微软的消息队列(msmq)实现多个服务器之间消息实时传递。

应用程序:基于dotnet平台采用WinForm+Webservice开发的应用程序。

每个地区都有自己的数据库和Webservice服务器。

Webservice服务器有很多台,这样多个服务器上用户互相交流就成了问题。思前想后采用了msmq,设计思想如下:

例子:A服务器用户user1发送消息给B服务器上的user2

1、首先是数据库结构是一样的使用sql2005同步用户信息表。

2、在每台Webservice服务器上安装msmq。

3、当user1发送消息给user2时,判断user2所在服务器,如果user2和发送者不在同一个服务器就使用msmq传递消息给B服务器。

4、每个Webservice服务器都有一个消息接收服务程序,用来侦测本服务器消息队列里的消息,反序列化消息内容,写入数据库。

5、user2直接读取数据库就可以了。

6、实际上就是通过msmq在多个数据库服务器之间消息传递。

下面是消息接收服务程序的源代码:

Code

using System;

using System.Collections;

using System.ComponentModel;

using System.Data;

using System.Diagnostics;

using System.ServiceProcess;

using System.Threading;

using System.Messaging;

using wzh.Db;//数据库操作类

///<summary>

///名称:消息服务程序

///用途:实时侦测远程机器发送到本机的消息队列的消息,写入本机数据库。

///作者:wzh

///编写日期:2008-9-28

///修改日期:wzh于2008-10-10修改了部分BUG

///</summary>

namespace wzh_MsgService

{

public class MsgService : System.ServiceProcess.ServiceBase

{

/// <summary>

/// 必需的设计器变量。

/// </summary>

private System.ComponentModel.Container components = null;

private bool servicePaused;

private string mPath = null;// = @".\Private$\MsgQueue

public MsgService()

{

// 该调用是 Windows.Forms 组件设计器所必需的。

InitializeComponent();

// TODO: 在 InitComponent 调用后添加任何初始化

}

// 进程的主入口点

static void Main()

{

System.ServiceProcess.ServiceBase[] ServicesToRun;

// 同一进程中可以运行多个用户服务。若要将

//另一个服务添加到此进程,请更改下行

// 以创建另一个服务对象。例如,

//

// ServicesToRun = New System.ServiceProcess.ServiceBase[] {new Service1(), new MySecondUserService()};

//

ServicesToRun = new System.ServiceProcess.ServiceBase[] { new MsgService() };

System.ServiceProcess.ServiceBase.Run(ServicesToRun);

}

/// <summary>

/// 调试用在项目属性里,将输出类型(Output Type)改成Console Application,将Startup Object改成Namespace.ServiceName

/// </summary>

/// <param name="args"></param>

// public static void Main(string[] args)

// {

// MsgService s = new MsgService();

// s.OnStart(args);

// Console.ReadLine();

// s.OnStop();

// }

/// <summary>

/// 设计器支持所需的方法 - 不要使用代码编辑器

/// 修改此方法的内容。

/// </summary>

private void InitializeComponent()

{

components = new System.ComponentModel.Container();

this.ServiceName = "QZCPNET_MsgService";

this.CanPauseAndContinue = true;

this.CanStop = true;

}

/// <summary>

/// 清理所有正在使用的资源。

/// </summary>

protected override void Dispose( bool disposing )

{

if( disposing )

{

if (components != null)

{

components.Dispose();

}

}

base.Dispose( disposing );

}

/// <summary>

/// 设置具体的操作,以便服务可以执行它的工作。

/// </summary>

protected override void OnStart(string[] args)

{

// TODO: 在此处添加代码以启动服务。

servicePaused=false;

mPath = GetXmlData("//msgmsmqpath");

Thread newThread=new Thread(new ThreadStart(RecevieMessage));

newThread.IsBackground=true;

newThread.Start();

}

/// <summary>

/// 接收本地服务器上的MSMQ里的消息

/// </summary>

private void RecevieMessage()

{

if(!servicePaused)

{

if(! MessageQueue.Exists(mPath))

{

MessageQueue.Create(mPath);

MessageQueue mqTemp = new MessageQueue(mPath);

mqTemp.SetPermissions("Everyone", MessageQueueAccessRights.FullControl);

}

MessageQueue MyQueue = new MessageQueue(mPath);

MyQueue .Formatter =new XmlMessageFormatter (new Type []{typeof (DataSet)});

MyQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(MyReceiveCompleted);

MyQueue.BeginReceive();

}

}

private void MyReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)

{

MessageQueue mq = (MessageQueue)source;

try

{

Message ms = mq.EndReceive(asyncResult.AsyncResult);

DataSet ds = (DataSet)ms.Body;

ProcessMsg(ds);

}

catch(Exception ex)

{

if(ex.Source != "System.Messaging")

WriteSysLog(ex.ToString());

}

finally

{

GC.Collect();

mq.BeginReceive();

}

return;

}

/// <summary>

/// 消息处理

/// </summary>

/// <param name="ds"></param>

private void ProcessMsg(DataSet ds)

{

//插入新消息

try

{

clsDBExec clsDb=new clsDBExec(GetXmlData("//conn") ,"消息管理-SaveMsg");

string strSql="";

DataRow dr=ds.Tables[0].Rows[0];

strSql="INSERT INTO SM_MsgessSend ( MsgSId, MsgTitle, MsgSender,MsgSenderJgCode, MsgSenderJg, MsgAderrs,MsgSAderrs ,MsgContent, "+

" MsgAccessName, MsgAccessDes, MsgHz)"+

" VALUES ("+dr["MsgSId"].ToString()+",'"+ dr["MsgTitle"].ToString()+"','"+ dr["MsgSender"].ToString() +"','"+ dr["MsgSenderJgCode"].ToString() + "','" + dr["MsgSenderJg"].ToString() +"','"+ dr["MsgAderrs"].ToString() +"','"+

dr["MsgSAderrs"].ToString() +"','"+dr["MsgContent"].ToString().Replace("'","''") +"','"+ dr["MsgAccessName"].ToString().Replace("'","''") +"','"+ dr["MsgAccessDes"].ToString() +"','"+dr["MsgHz"].ToString() + "');";

strSql+="SELECT @@IDENTITY ";

string strV=clsDb.getOneVal(strSql);

//发入发件箱

strSql="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder,MsgIsRead ) values('"+ dr["MsgSender"].ToString() +"',"+ strV +",'发件箱',2)";

clsDb.execNonQuery(strSql);

//插入到用户

string strAderrs=dr["MsgSAderrs"].ToString();

string [] arrStr=strAderrs.Split(',');

strSql="";

foreach(string strUser in arrStr)

{

if (strUser.ToUpper()=="ALL" )

{

strSql="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder ) SELECT UserName,"+ strV +",'收件箱' FROM SM_UserId";

break;

}

if (strUser.ToUpper()=="ALLONLINE" )

{

strSql="INSERT INTO SM_MsgessUser(UserId, MsgId,MsgFolder ) SELECT DISTINCT UserId,"+ strV +",'收件箱' FROM SM_OnlineUser";

break;

}

if ((strUser.ToUpper()!="ALL" ) && strUser.ToUpper()!="ALLONLINE")

{

strSql+="INSERT INTO SM_MsgessUser(UserId, MsgId ,MsgFolder) SELECT UserName,"+ strV +",'收件箱' FROM SM_UserId where UserName='"+ strUser +"';";

}

}

clsDb.execNonQuery(strSql);

}

catch(Exception ex)

{

WriteSysLog(ex.Message);

}

}

/// <summary>

/// 停止此服务。

/// </summary>

protected override void OnStop()

{

// TODO: 在此处添加代码以执行停止服务所需的关闭操作。

servicePaused=true;

}

/// <summary>

/// 获取XML文件内容

/// </summary>

/// <param name="nodepath"></param>

/// <returns></returns>

private string GetXmlData(string nodepath)

{

System.Xml.XmlDocument dom=new System.Xml.XmlDocument();

dom.Load(@"e:\net\service\Serverconfig.xml");

return dom.SelectSingleNode(nodepath).InnerText;

}

/// <summary>

/// 写日志

/// </summary>

/// <param name="strMsg"></param>

private void WriteSysLog(string strMsg)

{

EventLog objLog=new EventLog("Application") ;

objLog.Source ="信息网络管理系统--消息服务";

objLog.WriteEntry(strMsg,EventLogEntryType.Error );

}

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: