您的位置:首页 > 其它

.NET分布式事务处理总结【下】 - 包含MSMQ的分布式事务处理

2011-03-15 16:04 225 查看
.NET直接提供对MSMQ的访问支持,只需要添加System.Messaging程序集引用即可方便地操作MSMQ。MSMQ支持两种事务处理模式:内部事务处理以及基于MS-DTC的分布式事务处理。

MSMQ的内部事务处理

MSMQ的内部事务处理是指,仅采用MSMQ本身提供的事务处理机制完成事务处理。比如,假设有一系列的消息需要发布到MSMQ,那么,就可以启动一个内部事务,确保这些消息的发布过程是一个原子操作。要使用MSMQ的内部事务处理机制,在创建消息队列的时候,就需要勾选“事务性”选项,如下图所示:





首先,需要创建一个MessageQueueTransaction的对象,并使用Begin调用以启动MSMQ的内部事务处理。然后,在MessageQueue的Send方法中,使用Send(object, MessageQueueTransaction)的重载函数发送消息,将创建好的MessageQueueTransaction对象作为第二个参数传递给Send方法;在完成所有消息的发送之后,使用MessageQueueTransaction对象的Commit方法提交事务。如果在发送消息的过程中遇到问题,则使用MessageQueueTransaction对象的Abort调用回滚事务。请参见下面的示例代码:

隐藏行号 复制代码 ? MSMQ的内部事务处理

using (MessageQueue messageQueue = new MessageQueue(@".\private$\TPCDemoQueue",


false, false, QueueAccessMode.SendAndReceive))


{


MessageQueueTransaction trans = new MessageQueueTransaction();


try


    {


trans.Begin();


for (int i = 0; i < 5; i++)


        {


messageQueue.Send(new Message(i), trans);


}


trans.Commit();


}


catch


    {


trans.Abort();


}


messageQueue.Close();


}




// function CopyCode(key){var codeElement=null;var trElements=document.all.tags("ol");var i;for(i=0;i注意:如果你的消息队列在创建的时候没有设置“事务性”选项,那么,在完成消息队列的创建以后,你将无法修改该选项。更糟糕的是,在非事务性队列上执行上面的代码,则无法将消息发布到消息队列上,框架本身也不会提示任何错误信息,指示消息并未发布成功。

在分布式事务处理中使用MSMQ

在分布式事务处理的上下文中(比如,.NET 2.0+的TransactionScope中),上面所提到的MessageQueueTransaction将毫无用处,也就是说,MessageQueueTransaction与分布式事务处理毫无关系。你所要做的是,用正常的方式初始化一个MessageQueue的实例,然后,调用Send方法发布消息,在发布消息的时候,通过设置MessageQueueTransactionType的值来告诉MessageQueue,目前正处于一个分布式事务的上下文中。于是,你需要使用Send/Receive的重载方法:Send(object, MessageQueueTransactionType)以及Receive(MessageQueueTransactionType)。如下:

隐藏行号 复制代码 ? 分布式事务中的MSMQ调用

using (TransactionScope transaction = new TransactionScope())


{


Message inputMsg = inputQueue.Receive(MessageQueueTransactionType.Automatic);


// do some work


transaction.Complete();


}




// function CopyCode(key){var codeElement=null;var trElements=document.all.tags("ol");var i;for(i=0;i注意:对于一些生命周期相对较长的事务处理,比如,假设你的用例是这样的:你首先需要从一个消息队列中获得消息,然后更新你的数据库记录,那么你的代码可能会是这样的:

隐藏行号 复制代码 ? 分布式事务中的MSMQ调用

using (TransactionScope transaction = new TransactionScope())


{


using (MessageQueue someQueue = new MessageQueue("<queue connection>"))


    {


Message msg = someQueue.Receive();


// do something else


}


transaction.Complete();


}




// function CopyCode(key){var codeElement=null;var trElements=document.all.tags("ol");var i;for(i=0;i这样做其实是不对的!因为Receive方法是一种同步调用,如果消息队列中根本没有任何内容,那么Receive调用就会被阻塞,直到消息队列中出现新的消息。这就意味着你的分布式事务一直都是处于开启的状态,而且可能由于等待时间过长而导致超时,最终导致一个MessageQueueException。

正确的做法是,在MessageQueue上使用BeginPeek调用(注意:不是BeginReceive方法,因为BeginReceive方法并不是用来处理事务性队列的),然后订阅PeekComplete事件,在事件处理过程中,再使用TransactionScope以及Receive等方法实现消息的获取。例如:

隐藏行号 复制代码 ? 分布式事务中的MSMQ调用

MessageQueue inputQueue = new MessageQueue("<queue connection>");


inputQueue.PeekCompleted += (s, e) =>


    {


        using (TransactionScope transaction = new TransactionScope())


        {


        Message inputMsg = inputQueue.Receive(MessageQueueTransactionType.Automatic);


        // do some work


        transaction.Complete();


}


inputQueue.BeginPeek();


};


inputQueue.BeginPeek();




// function CopyCode(key){var codeElement=null;var trElements=document.all.tags("ol");var i;for(i=0;i最后再提醒一下,就是如果你所要做的事情仅限于与MSMQ打交道,那么只要使用MSMQ的内部事务处理机制就可以了,毕竟使用分布式事务处理会涉及到MS-DTC,从而造成过大的系统开销,影响性能。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: