在第三篇文章中, 我们学习了怎么使用队列在多了消息消费者当中进行耗时任务轮询。
但是如果我们想要在远程电脑上运行一个方法,然后等待其执行结果,这就是一个不同的场景,这种就是我们一般讲的RPC(远程过程调用)。
在这篇文章当中我们将会使用RabbitMQ构建一个简单的RPC系统,一个客户端和一个服务端,由于我们没有耗时任务需要轮询,因此我们创建一个假的返回斐波那契数字的服务端。
客户端接口
为了说明RPC服务是运行方式,我们创建一个了简单的Client类,该类有一个Call的方法用来发送RPC请求道服务端然后阻塞直到请求结果的返回。
var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close();
注意:虽然RPC在计算处理中是一种非常常见的模型,但是经常有非常多的争议,当编程人员不注意一个方法函数是本地的还是较慢的RPC呼叫就会出现问题,例如当一个不可预测的系统在返回结果的调试信息当中添加一些非必要的复杂信息时会使我们感到迷惑。
代替简单的软件,错误的使用RPC导致代码变得不可维护。
请记住一下建议:
>确保清楚哦知道那个函数是本地的那个函数是RPC函数。
>文档化你的系统,确保组件之间的依赖更加清晰。
>处理异常场景,当远程RPC服务长时间中断时,客户端应该怎么处理。
当存在疑问时避免使用RPC,而应该使用异步管道--代替RPC的功能--如阻塞,可以将结果带入到下一个计算阶段。
回调队列
通常情况下,在不使用RabbitMQ进行RPC调用时非常简单的,客户端发起请求等待服务处返回结果。在RabbitMQ当中为了能够接受处理结果,我们需要把回调队列的地址随着请求一起发送给服务器。
var corrId = Guid.NewGuid().ToString(); var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes);
Message properties
AMQP预定义了14个属性可以随消息一起发送,其中大多数的属性很少使用,除了下面几个
DeliveryMode:标记消息是否是持久化(值为2)或者是非持久化(值为其他:1)
contentType:用来描述编码的的mime-type,例如对于经常使用的json,把其值设为application/json是一个非常不错的案例。
replyTo:通常被用来作为回调的队列名。
correlationId:匹配RPC请求消息和响应消息之间的关联是非常有用的。
Correlation Id
在之前呈现的方法中无门为每一个RPC请求都创建了一个回调队列--这是非常低效的,幸运的是有一个更好的方式--我们为每一个客户端创建一个回调队列。
这样就带来了一个新的问题,当接收到一个结果的时候我们不知道改结果对应于哪个RPC请求,这就是为什么使用correlationId 属性。对于每一个RPC请求来说,correlationId 都是全局唯一的,之后,当我们从回调队列当中接收到消息的时候我们就可以根据干属性值跟PRC请求匹配起来。如果我们发现了一个未知的correlationId值,我们可以放心的把它丢弃--它不属于我们的系统。
你可能会问问什么要丢弃一个未知的correlationId,而不是产生一个失败错误?这是由于可能的竞争机制,虽然这种情况是非常少见的,但是存在这种可能RPC服务刚刚把计算结果放入回调队列就挂了,这时还没有来的及进行对Request进行Ack确认,这种情况下当RPC服务器冲新启动的时候就会把该条消息重新再处理一次,这就是为什么客户端需要平滑的处理重复的correlationId结果,RPC服务在理想情况下是幂等的。
总结
我们的RPC系统将会这样工作:
>当客户端启动的时候它会创建一个匿名的排他队列
>对于RPC请求,客户端在发送消息上设置两个属性,replyTo--用作回调队列,correlationId--每一个请求都是唯一值。
>请求被发送到rpc_queue 队列
>RPC服务端等待rpc_queue 上面的请求,当请求出现时,它处理请求然后把结果发送到replyTo标示的回调队列上去。
>客户端在回调队列上等待结果,当消息出现时,它匹配消息的correlationId值,如果匹配成功,就会将该消息返回给应用程序。
整合如下:
斐波那契任务:
private static int fib(int n) { if (n == 0 || n == 1) return n; return fib(n - 1) + fib(n - 2); }
我们声明一个斐波那契函数(假定输入是一个可用的正数),不要期望这时一个非常大的数字,虽然这个这个可能是最慢的递归算法。
RPCServer.cs

using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class RPCServer { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", noAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); while(true) { string response = null; var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); } catch(Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } } } } /// <summary> /// Assumes only valid positive integer input. /// Don't expect this one to work for big numbers, /// and it's probably the slowest recursive implementation possible. /// </summary> private static int fib(int n) { if(n == 0 || n == 1) { return n; } return fib(n - 1) + fib(n - 2); } }
服务端的代买非常的直接
>开始非常普通,我们创建连接,会话,声明一个队列
>我们可能希望有多个服务器来处理请求,为了在多个服务器之间实现负载我们需要在设置channel.basicQos的prefetchCount
>我们使用basicConsume 来接收队列消息,然后今日一个while循环来接收消息,处理消息,发送响应信息。
PRCClient.cs

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; class RPCClient { private IConnection connection; private IModel channel; private string replyQueueName; private QueueingBasicConsumer consumer; public RPCClient() { var factory = new ConnectionFactory() { HostName = "localhost" }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue: replyQueueName, noAck: true, consumer: consumer); } public string Call(string message) { var corrId = Guid.NewGuid().ToString(); var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); while(true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); if(ea.BasicProperties.CorrelationId == corrId) { return Encoding.UTF8.GetString(ea.Body); } } } public void Close() { connection.Close(); } } class RPC { public static void Main() { var rpcClient = new RPCClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close(); } }
客户端的代码稍微复杂一点:
>创建连接,会话,然后创建一个回调用的排他消息队列
>我们订阅这个回调的队列,使我们可以收到RPC的相应信息
>call方法发起真实的 RPC呼叫
>我们创建了唯一的correlationId ,然后保存它,在下面的while循环中使用它来匹配相应的响应消息。
>然后我们发送包含replyTo 和correlationId属性的消息
>这时我们可以等待直到响应消息到达
>while循环做的事情非常简单,它匹配每一个到底的消息,然后检查其correlationId 是否是我们需要的如果是保存这个结果
>最后我们把结果返回给用户
发起客户端请求
RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();