MassTransit 学习记录(一) 初探
2016-03-24 18:11
316 查看
MassTransit 是Net下一个开源给予消息队列的ESB,其官方网址为 http://masstransit-project.com/,你可以在上面找到相关的源代码下载地址,nuget链接地址,以及开发文档。
本文采用的是当前最新的版本:3.2.4,如果你发现本文例子与官网不符,代表开发者已经修改了相关设计,所以此时以官方为准。
本文例子基于RabbitMQ,但本文的重点是MassTransit的使用入门,所以RabbitMQ相关的内容需自行查阅。
本文从四个方面讲解MassTransit
1、契约
数据契约是消息传递的基础,在MassTransit中,与WCF的数据契约不同,MassTransit的契约官方建议定义为interface,当然你定义成class也不会错,所以这里我们分别定义接口和类两种契约
2、Bus
在这个版本里面,Bus都是统一通过Factory扩展来创建实际的IBusControl,简单的创建代码如下,注意这里并没有创建接收消息的代码
3、Receive
MassTransit支持多种方式来进行消息的接收
a) Consumer,该方式需要实现接口IConsumer<T>
b) Observer,该方式需实现接口IObserver<ConsumeContext<T>>
4、Send / Publish
发送消息分全部发送(不指定EndPoint)和指定Endpoint发送
a) 全部发送(不指定EndPoint)
PS:因为消息是成对的,当你Send或者Publish时,指定的契约必须与接收方一致,否则接收方不会接收到消息,比如可以将上面方法中的rbBus.Publish<IMessageContract>改为rbBus.Publish<MessageContract>,这时候接收端会因为契约不一致而收不到消息
下面是除契约外的完整代码,注意这里用的是控制台程序做demo
本文采用的是当前最新的版本:3.2.4,如果你发现本文例子与官网不符,代表开发者已经修改了相关设计,所以此时以官方为准。
本文例子基于RabbitMQ,但本文的重点是MassTransit的使用入门,所以RabbitMQ相关的内容需自行查阅。
本文从四个方面讲解MassTransit
1、契约
数据契约是消息传递的基础,在MassTransit中,与WCF的数据契约不同,MassTransit的契约官方建议定义为interface,当然你定义成class也不会错,所以这里我们分别定义接口和类两种契约
public interface IMessageContract { string Value { get; set; } } public class MessageContract { public string Value { get; set; } }
2、Bus
在这个版本里面,Bus都是统一通过Factory扩展来创建实际的IBusControl,简单的创建代码如下,注意这里并没有创建接收消息的代码
var rbBus = Bus.Factory.CreateUsingRabbitMq(configure => { var host = configure.Host(new Uri("rabbitmq://192.168.5.136/"), h => { h.Username("guest"); h.Password("guest"); }); }在创建完IBusControl之后,就可以通过Start方法来进行启动
3、Receive
MassTransit支持多种方式来进行消息的接收
a) Consumer,该方式需要实现接口IConsumer<T>
public class MessageContractConsumer : IConsumer<IMessageContract> { public Task Consume(ConsumeContext<IMessageContract> context) { return Task.Run(() => { Console.WriteLine($"Recevied By Consumer:{context.Message.Value}"); }); } }然后你就可以在CreateUsingRabbitMq方法里面通过IRabbitMqBusFactoryConfigurator的ReceiveEndpoint来从指定的消息队列获取消息,下面分别通过Consumer方法和Instance方法来实现消息接收
configure.ReceiveEndpoint(host, "consumer", e => { e.Consumer(() => new MessageContractConsumer()); }); //var consumer = new MessageContractConsumer(); //configure.ReceiveEndpoint(host, "consumer", e => //{ // e.Instance(consumer); //});
b) Observer,该方式需实现接口IObserver<ConsumeContext<T>>
public class MessageContractObserver : IObserver<ConsumeContext<IMessageContract>> { public void OnCompleted() { throw new NotImplementedException(); } public void OnError(Exception error) { throw new NotImplementedException(); } public void OnNext(ConsumeContext<IMessageContract> value) { Console.WriteLine("Received By Observer:{0}", value.Message.Value); } }然后通过Observer来进行注册
configure.ReceiveEndpoint(host, "observer", e => { e.Observer(new MessageContractObserver()); });b) Handler,该方法是最简单的方式
configure.ReceiveEndpoint(host, "handler", e => { e.Handler<IMessageContract>(async context => { await Task.Run(() => { Console.WriteLine("Received By Handler:{0}", context.Message.Value); }); }); });
4、Send / Publish
发送消息分全部发送(不指定EndPoint)和指定Endpoint发送
a) 全部发送(不指定EndPoint)
rbBus.Publish<IMessageContract>(new { Value = text });b)指定EndPoint发送,这是只有指定的ReceiveEndpoint才能接收到消息
var point = rbBus.GetSendEndpoint(new Uri("rabbitmq://192.168.5.136/handler")).Result; point.Send<IMessageContract>(new MessageContract { Value = $"Send To handler { text}" });
PS:因为消息是成对的,当你Send或者Publish时,指定的契约必须与接收方一致,否则接收方不会接收到消息,比如可以将上面方法中的rbBus.Publish<IMessageContract>改为rbBus.Publish<MessageContract>,这时候接收端会因为契约不一致而收不到消息
下面是除契约外的完整代码,注意这里用的是控制台程序做demo
var rbBus = Bus.Factory.CreateUsingRabbitMq(configure => { var host = configure.Host(new Uri("rabbitmq://192.168.5.136/"), h => { h.Username("guest"); h.Password("guest"); }); configure.ReceiveEndpoint(host, "consumer", e => { e.Consumer(() => new MessageContractConsumer()); }); //var consumer = new MessageContractConsumer(); //configure.ReceiveEndpoint(host, "consumer", e => //{ // e.Instance(consumer); //}); configure.ReceiveEndpoint(host, "observer", e => { e.Observer(new MessageContractObserver()); }); configure.ReceiveEndpoint(host, "handler", e => { e.Handler<IMessageContract>(async context => { await Task.Run(() => { Console.WriteLine("Received By Handler:{0}", context.Message.Value); }); }); }); }); //Console.WriteLine(rbBus.Address); using (rbBus.Start()) { string text = string.Empty; do { text = Console.ReadLine(); var point = rbBus.GetSendEndpoint(new Uri("rabbitmq://192.168.5.136/handler")).Result; point.Send<IMessageContract>(new MessageContract { Value = $"Send To handler { text}" }); rbBus.Publish<IMessageContract>(new { Value = text }); } while (text != "E"); }
相关文章推荐
- 基础练习 矩阵乘法
- MySQL的limit是针对结果集进行分页。
- 使用cropbox实现图片剪切上传
- highchart堆叠柱图带峰值
- 个人对理想团队模式构建的设想以及对软件流程的理解
- 数据库读写分离
- stm32f103c8t6 错误处理
- Red Hat 成为第一家20亿美元收入的开源公司
- 编程执行scp
- GDAL实现读写ESRI ArcGIS的shapfile文件
- iOS的Objective-C的工厂设计模式详解
- 各种基本算法实现小结(三)—— 树与二叉树
- js父窗口和子窗口之间传值
- 图像处理工具V1.0
- 逼真打字机效果;
- JavaScript中常见的字符串操作函数及用法
- 基础练习 矩形面积交
- [Android] AndroidManifest.xml出现问题
- Linux 打开默认串口支持数量超过4个的方法
- linux文本处理三剑客之sed