【译】RabbitMQ:远程过程调用(RPC)
2015-11-17 21:14
393 查看
在教程二中,我们学习了如何使用工作队列在多个工作线程中分发耗时的任务。但如果我们需要去执行远程机器上的方法并且等待结果会怎么样呢?那又是另外一回事了。这种模式通常被称为远程过程调用(RPC)。
本教程中我们将使用RabbitMQ构建一个远程过程调用系统:一个客户端和一个可扩展的服务器。由于没有什么耗时的任务值得分发,我们将创建一个虚拟的RPC服务用于返回斐波那契数列。
RPC注记
尽管RPC在计算机技术中是一种非常常见的模式,但是它却饱受批判,问题发生在程序员不知道一个调用是本地的还是一个耗时的RPC。这样的混乱,导致不可预知的系统,并将不必要的复杂性调价到调试过程中。误用RPC将导致不可维护的混乱的代码,而不是简化软件。
铭记这些限制,考虑下面的建议:
确保方法是本地调用还是远程调用能清晰明了
将系统归档备案,使组件间的依赖关系足够清晰
捕获异常,当RPC服务宕机很长时间客户端作何响应?
应该在不能确定的时候避免使用RPC,如果可以的话,你可以使用异步管道,而不是类RPC的阻塞,结果被异步推送到下一个计算阶段。
deliveryMode:将消息标记为持续(使用数值2)或瞬时(其他任意值)的,通过教程二你应该还记得这个属性。
contentType:用于描述媒体类型编码,例如:针对常用的JSON编码,最好的做法是把这个属性设置为:application/json。
relayTo:通常用于命名一个回调队列。
correlationId:关联RPC请求和响应的时候非常有用
这样引出了一个新问题,当收到一个响应的时候,它无法清楚的知道响应属于哪一个请求。这就是correlationId派上用场的时候。我们将为每一个请求设置一个唯一的关联ID,之后当我们从回调队列收到一个响应的时候,我们将检查这个属性,基于此,便能将响应和请求关联起来了。如果发现一个未知的关联ID值,我们可以安全的销毁消息,因为消息不属于任何一个请求。
你可能会奇怪,为什么我们忽略掉未知关联ID值得消息,而不是用错误来标记失败?这是因为在服务器端可能存在争用条件。尽管不太可能,但是RPC服务器可能在发送了响应消息而未发送消息确认的情况下出现故障,如果出现这样的情况,在RPC服务器重启之后将再次处理该请求。这就是为什么我们必须在客户端优雅的捕获重复的请求,并且RPC理论上应该是幂等的。
我们的RPC将这样工作:
当客户端启动时,它会创建一个匿名的独占回调队列。
对于一个RPC请求,客户端通过两个属性发送一条消息:relayTo,设置回调队列;correlationId,为每个请求设置一个唯一值。
消息将被发送到一个rpc_queue队列。
RPC工作线程(即,服务器)在该队列上等待请求。当请求出现,他将处理请求并把结果发回给客户端,使用的队列是在replayTo中设置的。
客户端在回调队列上等待响应,当消息出现,它检查关联ID,如果匹配来自请求的关联ID值,返回消息到该应用程序。
我们定义斐波那契函数,它只采用正整数作为输入。(别指望它能在大数值的情况下工作,而且这可能是最慢的一种递归实现)
RPC服务器RPCServer.cs中的代码看起来是这样的:
服务端代码相当简单:
通常情况下,我们都会以创建链接、信道和申明队列作为开始。
我们可能希望运行不止一个服务器进程。为了将加载均匀分布到多个服务器,我们需要将prefetchCount设置为channel.basicQos。
我们使用basicConsume来访问队列。之后进入While循环,等待请求消息,完成工作,然后发回响应。
RPC客户端RPCClient.cs中的代码:
客户端的代码要稍微复杂一些:
创建一个链接、信道、为响应申明独占的回调队列。
订阅回调队列,以便接收RPC响应。
call方法完成实际的RPC调用。
首先创建一个唯一的关联Id并且保存它,while循环使用它去匹配合适的应答。
接下来,我们发布请求消息,使用了两个属性:replyTo和correlationId。
这时我们就可以坐等正确的响应到达了。
While循环做的事情非常简单,检测每一个响应,如果correlactionId是我们需要的,就保存该响应。
最后,把响应返回给用户。
构建客户端请求:
现在是时候来看看完整示例的源代码了(包含基本的异常处理)。RPCClient.cs和RPCServer.cs。
编译(参见教程一):
现在RPC服务已经准备就绪,可以启动服务了:
运行客户端去请求斐波那契数列:
这里介绍的设计并非RPC服务的唯一实现方式,但是它有一些重要的优势:
如果RPC服务太慢,你可以通过运行另外一个实例来对其进行横向扩展,试着在一个新的控制台里面运行另一个服务器。
在客户端,RPC只要求发送和接收一条消息,没有如同declareQueue的同步调用被要求。作为结果,RPC客户端对于一个RPC请求,只需要一个网络往返。
我们的代码依然非常简单,并没有尝试去解决一些复杂(但是重要)的问题,比如:
如果没有运行中的服务器,客户端将作何响应?
客户端对于RPC是否可以有某种形式的超时?
如果服务器发生故障,引发异常,是否应当被转发给客户端?
在处理之前,避免无效的输入数据,比如:检查边界、类型等。
如果你想尝试,你可以找到有用的RabbitMQ管理插件去浏览队列。
原文链接:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
本教程中我们将使用RabbitMQ构建一个远程过程调用系统:一个客户端和一个可扩展的服务器。由于没有什么耗时的任务值得分发,我们将创建一个虚拟的RPC服务用于返回斐波那契数列。
客户端接口
为了阐释如何使用RPC服务我们将创建一个简单的客户端类。类中奖公开一个方法用于发送一个RPC请求,然后阻塞知道收到应答,方法名称叫做call:var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close();
RPC注记
尽管RPC在计算机技术中是一种非常常见的模式,但是它却饱受批判,问题发生在程序员不知道一个调用是本地的还是一个耗时的RPC。这样的混乱,导致不可预知的系统,并将不必要的复杂性调价到调试过程中。误用RPC将导致不可维护的混乱的代码,而不是简化软件。
铭记这些限制,考虑下面的建议:
确保方法是本地调用还是远程调用能清晰明了
将系统归档备案,使组件间的依赖关系足够清晰
捕获异常,当RPC服务宕机很长时间客户端作何响应?
应该在不能确定的时候避免使用RPC,如果可以的话,你可以使用异步管道,而不是类RPC的阻塞,结果被异步推送到下一个计算阶段。
回调队列
一般来说,在RabbitMQ之上构建RPC非常的容易,客户端发送请求消息,服务返回应答消息。为了能够接收到应答的消息,我们需要在请求时指定一个回调队列地址:var corrId = Guid.NewGuid().ToString(); var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); // ... 然后是从回调队列中读取消息的代码 ...
消息属性
AMQP协议预定义了一个包含14个属性的属性集作用于消息之上,大多数都很少使用,除了下面这些:deliveryMode:将消息标记为持续(使用数值2)或瞬时(其他任意值)的,通过教程二你应该还记得这个属性。
contentType:用于描述媒体类型编码,例如:针对常用的JSON编码,最好的做法是把这个属性设置为:application/json。
relayTo:通常用于命名一个回调队列。
correlationId:关联RPC请求和响应的时候非常有用
关联ID
在上面准备的方法中,我们建议为每一个RPC请求创建一个回调队列。这样相当低效,辛运的是有更好的方法,让我们为每一个客户端创建一个回调队列。这样引出了一个新问题,当收到一个响应的时候,它无法清楚的知道响应属于哪一个请求。这就是correlationId派上用场的时候。我们将为每一个请求设置一个唯一的关联ID,之后当我们从回调队列收到一个响应的时候,我们将检查这个属性,基于此,便能将响应和请求关联起来了。如果发现一个未知的关联ID值,我们可以安全的销毁消息,因为消息不属于任何一个请求。
你可能会奇怪,为什么我们忽略掉未知关联ID值得消息,而不是用错误来标记失败?这是因为在服务器端可能存在争用条件。尽管不太可能,但是RPC服务器可能在发送了响应消息而未发送消息确认的情况下出现故障,如果出现这样的情况,在RPC服务器重启之后将再次处理该请求。这就是为什么我们必须在客户端优雅的捕获重复的请求,并且RPC理论上应该是幂等的。
总结
我们的RPC将这样工作:
当客户端启动时,它会创建一个匿名的独占回调队列。
对于一个RPC请求,客户端通过两个属性发送一条消息:relayTo,设置回调队列;correlationId,为每个请求设置一个唯一值。
消息将被发送到一个rpc_queue队列。
RPC工作线程(即,服务器)在该队列上等待请求。当请求出现,他将处理请求并把结果发回给客户端,使用的队列是在replayTo中设置的。
客户端在回调队列上等待响应,当消息出现,它检查关联ID,如果匹配来自请求的关联ID值,返回消息到该应用程序。
组合在一起
斐波那契任务:private static int fib(int n) { if (n == 0 || n == 1) return n; return fib(n - 1) + fib(n - 2); }
我们定义斐波那契函数,它只采用正整数作为输入。(别指望它能在大数值的情况下工作,而且这可能是最慢的一种递归实现)
RPC服务器RPCServer.cs中的代码看起来是这样的:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class RPCServer { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", noAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); while(true) { string response = null; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); } catch(Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } } } } /// <summary> /// Assumes only valid positive integer input. /// Don't expect this one to work for big numbers, /// and it's probably the slowest recursive implementation possible. /// </summary> private static int fib(int n) { if(n == 0 || n == 1) { return n; } return fib(n - 1) + fib(n - 2); } }
服务端代码相当简单:
通常情况下,我们都会以创建链接、信道和申明队列作为开始。
我们可能希望运行不止一个服务器进程。为了将加载均匀分布到多个服务器,我们需要将prefetchCount设置为channel.basicQos。
我们使用basicConsume来访问队列。之后进入While循环,等待请求消息,完成工作,然后发回响应。
RPC客户端RPCClient.cs中的代码:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; class RPCClient { private IConnection connection; private IModel channel; private string replyQueueName; private QueueingBasicConsumer consumer; public RPCClient() { var factory = new ConnectionFactory() { HostName = "localhost" }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue: replyQueueName, noAck: true, consumer: consumer); } public string Call(string message) { var corrId = Guid.NewGuid().ToString(); var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); while(true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); if(ea.BasicProperties.CorrelationId == corrId) { return Encoding.UTF8.GetString(ea.Body); } } } public void Close() { connection.Close(); } } class RPC { public static void Main() { var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close(); } }
客户端的代码要稍微复杂一些:
创建一个链接、信道、为响应申明独占的回调队列。
订阅回调队列,以便接收RPC响应。
call方法完成实际的RPC调用。
首先创建一个唯一的关联Id并且保存它,while循环使用它去匹配合适的应答。
接下来,我们发布请求消息,使用了两个属性:replyTo和correlationId。
这时我们就可以坐等正确的响应到达了。
While循环做的事情非常简单,检测每一个响应,如果correlactionId是我们需要的,就保存该响应。
最后,把响应返回给用户。
构建客户端请求:
RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();
现在是时候来看看完整示例的源代码了(包含基本的异常处理)。RPCClient.cs和RPCServer.cs。
编译(参见教程一):
$ csc /r:"RabbitMQ.Client.dll" RPCClient.cs $ csc /r:"RabbitMQ.Client.dll" RPCServer.cs
现在RPC服务已经准备就绪,可以启动服务了:
$ RPCServer.exe [x] Awaiting RPC requests
运行客户端去请求斐波那契数列:
$ RPCClient.exe [x] Requesting fib(30)
这里介绍的设计并非RPC服务的唯一实现方式,但是它有一些重要的优势:
如果RPC服务太慢,你可以通过运行另外一个实例来对其进行横向扩展,试着在一个新的控制台里面运行另一个服务器。
在客户端,RPC只要求发送和接收一条消息,没有如同declareQueue的同步调用被要求。作为结果,RPC客户端对于一个RPC请求,只需要一个网络往返。
我们的代码依然非常简单,并没有尝试去解决一些复杂(但是重要)的问题,比如:
如果没有运行中的服务器,客户端将作何响应?
客户端对于RPC是否可以有某种形式的超时?
如果服务器发生故障,引发异常,是否应当被转发给客户端?
在处理之前,避免无效的输入数据,比如:检查边界、类型等。
如果你想尝试,你可以找到有用的RabbitMQ管理插件去浏览队列。
原文链接:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
相关文章推荐
- Poj 2184解题报告(01背包变种)
- 枚举
- cocos2d-x 植入广告(一) 百度横屏、插屏广告
- Cocos2dx-OpenGL ES 2.0教程:你的第一个三角形(1)
- Find the Duplicate Number
- UI基础-UIControl及其子类
- 【MDK调试经验】Undefined symbol exit (referred from jerror.o)
- RestServer 1.1发布
- (转载)Cocos2dx-OpenGL ES 2.0教程:你的第一个三角形(1)
- git 中的AutoCRLF与SafeCRLF换行符问题
- sync 库使用小结
- 配置hadoop各个节点之间免密码登录实践笔记
- Java实现最简单局域网QQ
- HDU 1402 A * B Problem Plus (FFT求高精度乘法)
- JS-Date对象
- Action层, Service层 和 Dao层的功能区分
- RestServer 1.1
- Hibernate(六)缓存
- Js event事件在IE、FF兼容性问题
- 剑指offer:二叉搜索树的后序遍历序列