您的位置:首页 > 其它

NET下RabbitMQ实践[WCF发布篇]

2010-10-22 14:46 351 查看
在之前的两篇文章中,主要介绍了RabbitMQ环境配置,简单示例的编写。今天将会介绍如何使用WCF将RabbitMQ列队以服务的方式进行发布。



注:因为RabbitMQ的官方.net客户端中包括了WCF的SAMPLE代码演示,很适合初学者,所以我就偷了个懒,直接对照它的SAMPLE来说明了,算是借花献佛吧,呵呵。

首先我们下载相应源码(基于.NET 3.0),本文主要对该源码包中的代码进行讲解,链接如下:



Binary, compiled for .NET 3.0 and newer (zip) - includes example code, the WCF binding and WCF examples




当然官方还提供了基本.NET 2.0 版本的示例版本,但其中只是一些简单的示例,并不包括WCF部分,这里只发个链接,感兴趣的朋友可自行研究。



Binary, compiled for .NET 2.0 (zip) - includes example code





下载基于.NET 3.0的版本源码之后,解压其中的projects/examples/wcf目录,可看到如下的项目:








几个文件夹分别对应如下应用场景:

OneWay: 单向通信(无返回值)

TwoWay: 双向通信(请求/响应)

Session:会话方式

Duplex: 双向通信(可以指定一个Callback回调函数)



下面逐一进行介绍:




OneWay



在OneWayTest示例中,演示了插入日志数据,因为日志操作一般只是单纯的写入操作,不考虑返回值,所以使用OneWay方式。



下面是其WCF接口声明和实例代码,如下:





[ServiceContract]


public

interface
ILogServiceContract

{

[OperationContract(IsOneWay
=
true
)]


void
Log(LogData entry);

}



[ServiceBehavior(InstanceContextMode
=
InstanceContextMode.Single)]


public

class
LogService : ILogServiceContract

{


public

int
m_i;


public

void
Log(LogData entry)

{

Util.WriteLine(ConsoleColor.Magenta,
"
[SVC] {3} [{0,-6}] {1, 12}: {2}
"
, entry.Level, entry.TimeStamp, entry.Message, m_i
++
);

}

}





其只包含一个方法:Log(LogData entry) ---用于添加日志记录,可以看出它与我们以往写WCF代码没什么两样。



不过这里要说明一下,在类属性InstanceContextMode枚举类型中,使用了“Single”模式,而该枚举提供了如下三种情况:



Single
- 为所有客户端调用分配一个服务实例。

PerCall
– 为每个客户端调用分配一个服务实例。

PerSession
– 为每个客户端会话分配一个服务实例。每个Session内多线程操作实例的话会有并发问题。



InstanceContextMode 的默认设置为 PerSession



这三个值通常是要与并发模式(ConcurrencyMode)搭配使用,以解决并发效率,共享资源等复杂场景下的问题的。下面是并发模式的说明:

ConcurrencyMode 控制一次允许多少个线程进入服务。ConcurrencyMode 可以设置为以下值之一:



Single
- 一次可以有一个线程进入服务。

Reentrant
- 一次可以有一个线程进入服务,但允许回调。

Multiple
- 一次可以有多个线程进入服务。



ConcurrencyMode 的默认设置为 Single




InstanceContextMode 和 ConcurrencyMode 设置会相互影响,因此为了提升并发效能,必须协调这两项设置。




例如,将 InstanceContextMode 设置为 PerCall 时,会忽略 ConcurrencyMode
设置。这是因为,每个客户端调用都将路由到新的服务实例,因此一次只会有一个线程在服务实例中运行。对于PerCall的实例模型,每个客户端请求都会与
服务端的一个独立的服务实例进行交互,就不会出现多个客户端请求争用一个服务实例的情况,也就不会出现并发冲突,不会影响吞吐量的问题。但对于实例内部的
共享变量(static)还是会可能出现冲突。



但对于当前Single设置,原因很多,可能包括:

1. 创建服务实例需要大量的处理工作。当多个客户端访问服务时,仅允许创建一个服务实例可以降低所需处理量。

2. 可以降低垃圾回收成本,因为不必为每个调用创建和销毁服务创建的对象。

3. 可以在多个客户端之间共享服务实例。

4. 避免对static静态属性的访问冲突。




但如果使用Single,问题也就出来了---就是性能,因为如果
ConcurrencyMode也同时设置成Single时,当前示例中的(唯一)服务实例不会同时处理多个(单线程客户端)请求。因为服务在处理请求时
会对当前服务加锁,如果再有其它请求需要该服务处理的时候,需要排队等候。如果有大量客户端访问,这可能会导致较大的瓶颈。

当然如果考虑到多线程客户端使用的情况,可能问题会更严重。



聊了这些,无非就是要结合具体应用场景来灵活搭配ConcurrencyMode,InstanceContextMode这两个枚举值。



下面言归正传,来看一下如何将该服务与RabbitMQ进行绑定,以实现以WCF方式访问RabbitMQ服务的效果。这里暂且略过LogData数据结构信息类,直接看一下如果绑定服务代码(位于OneWayTest.cs):





private
ServiceHost m_host;



public

void
StartService(Binding binding)

{

m_host
=

new
ServiceHost(
typeof
(LogService),
new
Uri(
"
soap.amqp:///
"
));

((RabbitMQBinding)binding).OneWayOnly
=

true
;

m_host.AddServiceEndpoint(
typeof
(ILogServiceContract), binding,
"
LogService
"
);

m_host.Open();

m_serviceStarted
=

true
;

}





StartService方法的主体与我们平时启动WCF服务的方法差不多,只不过是将其中的URL协议部分换成了“soap.amqp”形式,而其中的
传入参数binding则是RabbitMQBinding类型,该类型是rabbitmq客户端类库提供的用于对应Binding类的
RabbitMQBinding实现。下面就是其类实始化代码:






return

new
RabbitMQBinding(System.Configuration.ConfigurationManager.AppSettings[
"
manual-test-broker-uri
"
],RabbitMQ.Client.Protocols.FromConfiguration(
"
manual-test-broker-protocol
"
));



其包括两个参数,一个是rabbitmq服务地址,一个是所用的协议,其对应示例app.config文件中的如下结点:


<
add
key
="manual-test-broker-uri"
value
="amqp://10.0.4.79:5672/"
/>
<!--
本系列第一篇中的环境设置
-->

<
add
key
="manual-test-broker-protocol"
value
="AMQP_0_8"
/>





这样,我们就完成了初始化服务实例工作。



接着来构造客户端代码,如下:





private ChannelFactory
<
ILogServiceContract
>
m_factory;

private ILogServiceContract m_client;



public ILogServiceContract GetClient(Binding binding)

{

((RabbitMQBinding)binding).OneWayOnly = true;

m_factory = new ChannelFactory
<
ILogServiceContract
>
(binding, "soap.amqp:///LogService");

m_factory.Open();

return m_factory.CreateChannel();

}



与平时写的代码相似,但传入参数就是上面提到的那个RabbitMQBinding实例,这样通过下面代码访问WCF中的LOG方法:

m_client = GetClient(Program.GetBinding());

m_client.Log(new LogData(LogLevel.High, "Hello Rabbit"));

m_client.Log(new LogData(LogLevel.Medium, "Hello Rabbit"));

....



到这里,我们可以看出,它的实现还是很简单的。我们只要把10.0.4.79:5672上的rabbitmq的环境跑起来,就可以看出最终的效果了。



之后我将C#的服务端(startservice)与客户端(getclient)分开布署到不同IP的主机上,也实现了示例中的结果。






TwoWay



下面介绍一下 TwoWay双向通信示例,首先是WCF接口声明和实现:





[ServiceContract]

public interface ICalculator

{

[OperationContract]

int Add(int x, int y);



[OperationContract]

int Subtract(int x, int y);

}



[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerCall)] /*为每个客户端调用分配一个服务实例*/

public sealed class Calculator : ICalculator

{

public int Add(int x, int y)

{

return x + y;

}

public int Subtract(int x, int y)

{

return x - y;

}

}





因为其服务的启动startservice和客户端实例构造与oneway方法类似,为了节约篇幅,这时就略过了,下面是其最终调用代码(位于TwoWayTest.cs):





public void Run()

{

StartService(Program.GetBinding());

ICalculator calc = GetClient(Program.GetBinding());

int result = 0, x = 3, y = 4;

Util.WriteLine(ConsoleColor.Magenta, " {0} + {1} = {2}", x, y, result = calc.Add(x, y));

if (result != x + y)

throw new Exception("Test Failed");

......

}





与普通的WCF TWOWAY 返回调用方式相同,就不多说了。






Session



下面是基于Session会话方式的代码,WCF接口声明和实现:





[ServiceContract(SessionMode= SessionMode.Required)]

public interface ICart

{

[OperationContract]

void Add(CartItem item);

[OperationContract]

double GetTotal();



Guid Id { [OperationContract]get; }

}



[ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]

public class Cart : ICart

{

public Cart()

{

Items = new List
<
CartItem
>
();

m_id = Guid.NewGuid();

}



private Guid m_id;

private List
<
CartItem
>
m_items;

private List
<
CartItem
>
Items {

get { return m_items; }

set { m_items = value; }

}

public void Add(CartItem item)

{

Items.Add(item);

}



public double GetTotal()

{

double total = 0;

foreach (CartItem i in Items)

total += i.Price;

return total;

}

public Guid Id { get { return m_id; } }

}







该接口实现一个购物车功能,可以添加商品并计算总价,考虑到并发场景,这里将其实例为PerSession枚举类型,即为每个客户端会话分配一个服务实
例。这样就可以在用户点击购买一件商品里,为其购物车商品列表List<CartItem>添加一条信息,而不会与其它用户的购物车商品列表
相冲突。



其最终的调用方法如下:





public void Run()

{

StartService(Program.GetBinding());

ICart cart = GetClient(Program.GetBinding());

AddToCart(cart, "Beans", 0.49);//添加商品到购物车

AddToCart(cart, "Bread", 0.89);

AddToCart(cart, "Toaster", 4.99);

double total = cart.GetTotal();//计算总价

if (total != (0.49 + 0.89 + 4.99))

throw new Exception("Incorrect Total");

......

}








Duplex



最后,再介绍一下如何基于Duplex双向通信模式进行开发,DuplexTest这是个“PIZZA订单”的场景,用户下单之后,等待服务端将PIZZA加工完毕,然后服务端用callback方法通知客户端PIZZA已做好,相应WCF接口声明和实现如下:





[ServiceContract(CallbackContract=typeof(IPizzaCallback))] /*绑定回调接口*/

public interface IPizzaService

{

[OperationContract(IsOneWay=true)]

void PlaceOrder(Order order);

}

[ServiceContract]

public interface IPizzaCallback

{

[OperationContract(IsOneWay=true)]

void OrderReady(Guid id); /*用于通知客户端*/

}



public class PizzaService : IPizzaService

{

public void PlaceOrder(Order order)

{

foreach (Pizza p in order.Items)

{

Util.WriteLine(ConsoleColor.Magenta, " [SVC] Cooking a {0} {1} Pizza...", p.Base, p.Toppings);

}

Util.WriteLine(ConsoleColor.Magenta, " [SVC] Order {0} is Ready!", order.Id);

Callback.OrderReady(order.Id);

}

IPizzaCallback Callback

{

get { return OperationContext.Current.GetCallbackChannel
<
IPizzaCallback
>
(); } //当前上下文中调用客户端绑定的回调方法

}

}







这里要说明的是IPizzaCallback接口的OrderReady方法被绑定了IsOneWay=true属性,主要是因为如果使用“请求-响应”
模式,客户端必须等服务端“响应”完成上一次“请求”后才能发出下一步“请求”。因此虽然客户端可以使用多线程方式来调用服务,但最后的执行结果仍然表现
出顺序处理(效率低)。要想使服务端能够并行处理客户端请求的话,那我们就不能使用“请求-响应”的调用模式,所以这里使用One-Way的方式来调用服
务。



下面是客户端回调接口实现:





public class PizzaClient : DuplexClientBase
<
IPizzaService
>
, IPizzaService

{

public PizzaClient(InstanceContext context, Binding binding, EndpointAddress remoteAddress)

: base(context, binding, remoteAddress) { }

public void PlaceOrder(Order order)

{

Channel.PlaceOrder(order);

}

}





最终客户端实例化(startservice)略过,因与之前示例类似。





public IPizzaService GetClient(Binding binding)

{

PizzaClient client = new PizzaClient(new InstanceContext(this), binding, new EndpointAddress(serverUri.ToString()));

client.Open();

return client;

}








上面的方法中将当前客户端实例this(实现了IServiceTest<IPizzaService>,
IPizzaCallback接口)注册到上下文中,目的是为了将其方法的回传调用传递到服务端(还记得服务端的这行代码吗?=>
Callback.OrderReady(order.Id))

public void OrderReady(Guid id)

{

Util.WriteLine(ConsoleColor.Magenta, " [CLI] Order {0} has been delivered.",id);

mre.Set();

}



这样,服务端完成pizza时,就可以调用客户端的OrderReady方法来实现通知功能了。



下面就是一个整个的下单流程实现:





public void Run()

{

......

StartService(Program.GetBinding());

IPizzaService client = GetClient(Program.GetBinding());

Order lunch = new Order();

lunch.Items = new List
<
Pizza
>
();

lunch.Items.Add(new Pizza(PizzaBase.ThinCrust, "Meat Feast"));

client.PlaceOrder(lunch);

......

}





好了,今天的主要内容就先到这里了,在接下来的文章中,将会介绍一个rabbitmq的实际应用场景,也是我们Discuz!NT企业版中的一个功能:记录系统运行的错误日志





敬请观注!



原文链接:http://www.cnblogs.com/daizhj/archive/2010/10/22/1858284.html

Tags:Rabbitmq,NET

BLOG: http://daizhj.cnblogs.com/
作者:daizhj,代震军



相关WCF参考文档:

WCF开始使用6--并发1


WCF实例上下文模式与并发模式对性能的影响
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: