您的位置:首页 > 其它

支持异步同步的分布式CommandBus MSMQ实现

2013-09-05 10:07 183 查看

支持异步同步的分布式CommandBus MSMQ实现

先上一张本文所描述的适用场景图



分布式场景,共3台server:

前端Server

Order App Server

Warehouse App Server

功能:

前端Server可以不停的发送Command到CommandBus,然后由CommandBus分配不同的Command到各自的app server去处理。

前端Server可以只发送Command而不必等待Response

前端Server可以同步等待Response返回

MSMQ消息超过3.5M会自动转为网络共享方式传输消息

对于同一Command的处理,可以通过增加App Server的方式来提高并发处理速度(比如:可以开2个app server instance来同时处理ACommand的处理)

本文目标是用msmq实现分布式CommandBus的应用(已经更新到A2D Framework中了)。

前端Server发送Command的方式(异步):

ACommand cmd = new ACommand() { Tag = "aaa" };
CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>();
cmdDistributer.ResultCatached += new CommandResultCatchedDelegate<ACommandResult>(cmdDistributer_ResultCatached);
cmdDistributer.SendRequest(cmd);


同步方式:

ACommand cmd = new ACommand() { Tag = "aaa" };
CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>();
cmdDistributer.SendRequest(cmd);
ACommandResult result=cmdDistributer.WaitResponse();


配置文件:

<?xml version="1.0" encoding="utf-8" ?>
<CommandBusSetting>
<AutoCreateIfNotExists>true</AutoCreateIfNotExists>
<CommandQueue>PC-20130606HCVP\private$\Commands_{0}</CommandQueue>
<ResponseQueue>PC-20130606HCVP\private$\CommandResponses</ResponseQueue>
<NetworkLocation>\\PC-20130606HCVP\network</NetworkLocation>
</CommandBusSetting>


Command的编写方式:

[QueueName("ACommand")]//这个可选,没有QueueName时,默认对应的msmq队列名为类名,此处为ACommand
public class ACommand : BaseCommand  //需要继承自BaseCommand
{
public string Tag { get; set; }//自定义的属性
}


后端App Server的编写



CommandHandlerHost1: Console程序,相当于App Server 1,会处理部分Command

static void Main(string[] args)
{
Thread.Sleep(2000);

CommandHandlerListener listener = new CommandHandlerListener();
listener.AddHandler(new TestCommandHandlers());
listener.AddHandler(new Test2CommandHandlers());
listener.Start();
Console.ReadKey();
}


CommandHandlerHost2: Console程序,相当于App Server 2,会处理部分Command

static void Main(string[] args)
{
Thread.Sleep(2000);

CommandHandlerListener listener = new CommandHandlerListener();
listener.AddHandler(new Test3CommandHandlers());
listener.Start();
Console.ReadKey();
}


CommandHandlers: 所有的Command处理函数都会在这个项目中实现

public class TestCommandHandlers : ICommandHandlers,
ICommandHandler<ACommand, ACommandResult>,
ICommandHandler<BCommand, BCommandResult>
{
public ACommandResult Handler(ACommand cmd)
{
Console.WriteLine("From [public ACommandResult Handler(ACommand cmd)]: " + cmd.Tag);

ACommandResult result = new ACommandResult();
result.Result = "result from ACommand";
return result;
}

public BCommandResult Handler(BCommand cmd)
{
Console.WriteLine("From [public BCommandResult Handler(BCommand cmd)]: " + cmd.Tag);

BCommandResult result = new BCommandResult();
result.Result = "result from BCommand";
return result;
}
}


下面是目前的性能测试:



下载代码
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: