zoukankan      html  css  js  c++  java
  • 【译】RabbitMQ:远程过程调用(RPC)

    在教程二中,我们学习了如何使用工作队列在多个工作线程中分发耗时的任务。但如果我们需要去执行远程机器上的方法并且等待结果会怎么样呢?那又是另外一回事了。这种模式通常被称为远程过程调用(RPC)。

    本教程中我们将使用RabbitMQ构建一个远程过程调用系统:一个客户端和一个可扩展的服务器。由于没有什么耗时的任务值得分发,我们将创建一个虚拟的RPC服务用于返回斐波那契数列。

    客户端接口

    为了阐释如何使用RPC服务我们将创建一个简单的客户端类。类中奖公开一个方法用于发送一个RPC请求,然后阻塞知道收到应答,方法名称叫做call:

    1 var rpcClient = new RPCClient();
    2 
    3 Console.WriteLine(" [x] Requesting fib(30)");
    4 var response = rpcClient.Call("30");
    5 Console.WriteLine(" [.] Got '{0}'", response);
    6 
    7 rpcClient.Close();

     

    RPC注记

    尽管RPC在计算机技术中是一种非常常见的模式,但是它却饱受批判,问题发生在程序员不知道一个调用是本地的还是一个耗时的RPC。这样的混乱,导致不可预知的系统,并将不必要的复杂性调价到调试过程中。误用RPC将导致不可维护的混乱的代码,而不是简化软件。

    铭记这些限制,考虑下面的建议:

    • 确保方法是本地调用还是远程调用能清晰明了
    • 将系统归档备案,使组件间的依赖关系足够清晰
    • 捕获异常,当RPC服务宕机很长时间客户端作何响应?

    应该在不能确定的时候避免使用RPC,如果可以的话,你可以使用异步管道,而不是类RPC的阻塞,结果被异步推送到下一个计算阶段。

    回调队列

    一般来说,在RabbitMQ之上构建RPC非常的容易,客户端发送请求消息,服务返回应答消息。为了能够接收到应答的消息,我们需要在请求时指定一个回调队列地址:

     1 var corrId = Guid.NewGuid().ToString();
     2 var props = channel.CreateBasicProperties();
     3 props.ReplyTo = replyQueueName;
     4 props.CorrelationId = corrId;
     5 
     6 var messageBytes = Encoding.UTF8.GetBytes(message);
     7 channel.BasicPublish(exchange: "",
     8                      routingKey: "rpc_queue",
     9                      basicProperties: props,
    10                      body: messageBytes);
    11 
    12 // ... 然后是从回调队列中读取消息的代码 ... 

    消息属性

    AMQP协议预定义了一个包含14个属性的属性集作用于消息之上,大多数都很少使用,除了下面这些:

    • deliveryMode:将消息标记为持续(使用数值2)或瞬时(其他任意值)的,通过教程二你应该还记得这个属性。
    • contentType:用于描述媒体类型编码,例如:针对常用的JSON编码,最好的做法是把这个属性设置为:application/json
    • relayTo:通常用于命名一个回调队列。
    • correlationId:关联RPC请求和响应的时候非常有用

    关联ID

    在上面准备的方法中,我们建议为每一个RPC请求创建一个回调队列。这样相当低效,辛运的是有更好的方法,让我们为每一个客户端创建一个回调队列。

    这样引出了一个新问题,当收到一个响应的时候,它无法清楚的知道响应属于哪一个请求。这就是correlationId派上用场的时候。我们将为每一个请求设置一个唯一的关联ID,之后当我们从回调队列收到一个响应的时候,我们将检查这个属性,基于此,便能将响应和请求关联起来了。如果发现一个未知的关联ID值,我们可以安全的销毁消息,因为消息不属于任何一个请求。

    你可能会奇怪,为什么我们忽略掉未知关联ID值得消息,而不是用错误来标记失败?这是因为在服务器端可能存在争用条件。尽管不太可能,但是RPC服务器可能在发送了响应消息而未发送消息确认的情况下出现故障,如果出现这样的情况,在RPC服务器重启之后将再次处理该请求。这就是为什么我们必须在客户端优雅的捕获重复的请求,并且RPC理论上应该是幂等的。

    总结

                                     

    我们的RPC将这样工作:

    • 当客户端启动时,它会创建一个匿名的独占回调队列。
    • 对于一个RPC请求,客户端通过两个属性发送一条消息:relayTo,设置回调队列;correlationId,为每个请求设置一个唯一值。
    • 消息将被发送到一个rpc_queue队列。
    • RPC工作线程(即,服务器)在该队列上等待请求。当请求出现,他将处理请求并把结果发回给客户端,使用的队列是在replayTo中设置的。
    • 客户端在回调队列上等待响应,当消息出现,它检查关联ID,如果匹配来自请求的关联ID值,返回消息到该应用程序。

    组合在一起

    斐波那契任务:

    1 private static int fib(int n)
    2 {
    3     if (n == 0 || n == 1) return n;
    4     return fib(n - 1) + fib(n - 2);
    5 }

    我们定义斐波那契函数,它只采用正整数作为输入。(别指望它能在大数值的情况下工作,而且这可能是最慢的一种递归实现)

    RPC服务器RPCServer.cs中的代码看起来是这样的:

     1 using System;
     2 using RabbitMQ.Client;
     3 using RabbitMQ.Client.Events;
     4 using System.Text;
     5 
     6 class RPCServer
     7 {
     8     public static void Main()
     9     {
    10         var factory = new ConnectionFactory() { HostName = "localhost" };
    11         using(var connection = factory.CreateConnection())
    12         using(var channel = connection.CreateModel())
    13         {
    14             channel.QueueDeclare(queue: "rpc_queue",
    15                                  durable: false,
    16                                  exclusive: false,
    17                                  autoDelete: false,
    18                                  arguments: null);
    19             channel.BasicQos(0, 1, false);
    20             var consumer = new QueueingBasicConsumer(channel);
    21             channel.BasicConsume(queue: "rpc_queue",
    22                                  noAck: false,
    23                                  consumer: consumer);
    24             Console.WriteLine(" [x] Awaiting RPC requests");
    25 
    26             while(true)
    27             {
    28                 string response = null;
    29                 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    30 
    31                 var body = ea.Body;
    32                 var props = ea.BasicProperties;
    33                 var replyProps = channel.CreateBasicProperties();
    34                 replyProps.CorrelationId = props.CorrelationId;
    35 
    36                 try
    37                 {
    38                     var message = Encoding.UTF8.GetString(body);
    39                     int n = int.Parse(message);
    40                     Console.WriteLine(" [.] fib({0})", message);
    41                     response = fib(n).ToString();
    42                 }
    43                 catch(Exception e)
    44                 {
    45                     Console.WriteLine(" [.] " + e.Message);
    46                     response = "";
    47                 }
    48                 finally
    49                 {
    50                     var responseBytes = Encoding.UTF8.GetBytes(response);
    51                     channel.BasicPublish(exchange: "",
    52                                          routingKey: props.ReplyTo,
    53                                          basicProperties: replyProps,
    54                                          body: responseBytes);
    55                     channel.BasicAck(deliveryTag: ea.DeliveryTag,
    56                                      multiple: false);
    57                 }
    58             }
    59         }
    60     }
    61 
    62     /// <summary>
    63     /// Assumes only valid positive integer input.
    64     /// Don't expect this one to work for big numbers,
    65     /// and it's probably the slowest recursive implementation possible.
    66     /// </summary>
    67     private static int fib(int n)
    68     {
    69         if(n == 0 || n == 1)
    70         {
    71             return n;
    72         }
    73 
    74         return fib(n - 1) + fib(n - 2);
    75     }
    76 }

    服务端代码相当简单:

    • 通常情况下,我们都会以创建链接、信道和申明队列作为开始。
    • 我们可能希望运行不止一个服务器进程。为了将加载均匀分布到多个服务器,我们需要将prefetchCount设置为channel.basicQos
    • 我们使用basicConsume来访问队列。之后进入While循环,等待请求消息,完成工作,然后发回响应。

    RPC客户端RPCClient.cs中的代码:

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.Text;
     5 using System.Threading.Tasks;
     6 using RabbitMQ.Client;
     7 using RabbitMQ.Client.Events;
     8 
     9 class RPCClient
    10 {
    11     private IConnection connection;
    12     private IModel channel;
    13     private string replyQueueName;
    14     private QueueingBasicConsumer consumer;
    15 
    16     public RPCClient()
    17     {
    18         var factory = new ConnectionFactory() { HostName = "localhost" };
    19         connection = factory.CreateConnection();
    20         channel = connection.CreateModel();
    21         replyQueueName = channel.QueueDeclare().QueueName;
    22         consumer = new QueueingBasicConsumer(channel);
    23         channel.BasicConsume(queue: replyQueueName,
    24                              noAck: true,
    25                              consumer: consumer);
    26     }
    27 
    28     public string Call(string message)
    29     {
    30         var corrId = Guid.NewGuid().ToString();
    31         var props = channel.CreateBasicProperties();
    32         props.ReplyTo = replyQueueName;
    33         props.CorrelationId = corrId;
    34 
    35         var messageBytes = Encoding.UTF8.GetBytes(message);
    36         channel.BasicPublish(exchange: "",
    37                              routingKey: "rpc_queue",
    38                              basicProperties: props,
    39                              body: messageBytes);
    40 
    41         while(true)
    42         {
    43             var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    44             if(ea.BasicProperties.CorrelationId == corrId)
    45             {
    46                 return Encoding.UTF8.GetString(ea.Body);
    47             }
    48         }
    49     }
    50 
    51     public void Close()
    52     {
    53         connection.Close();
    54     }
    55 }
    56 
    57 class RPC
    58 {
    59     public static void Main()
    60     {
    61         var rpcClient = new RPCClient();
    62 
    63         Console.WriteLine(" [x] Requesting fib(30)");
    64         var response = rpcClient.Call("30");
    65         Console.WriteLine(" [.] Got '{0}'", response);
    66 
    67         rpcClient.Close();
    68     }
    69 }

    客户端的代码要稍微复杂一些:

    • 创建一个链接、信道、为响应申明独占的回调队列。
    • 订阅回调队列,以便接收RPC响应。
    • call方法完成实际的RPC调用。
    • 首先创建一个唯一的关联Id并且保存它,while循环使用它去匹配合适的应答。
    • 接下来,我们发布请求消息,使用了两个属性:replyTocorrelationId
    • 这时我们就可以坐等正确的响应到达了。
    • While循环做的事情非常简单,检测每一个响应,如果correlactionId是我们需要的,就保存该响应。
    • 最后,把响应返回给用户。

    构建客户端请求:

    1 RPCClient fibonacciRpc = new RPCClient();
    2 
    3 System.out.println(" [x] Requesting fib(30)");
    4 String response = fibonacciRpc.call("30");
    5 System.out.println(" [.] Got '" + response + "'");
    6 
    7 fibonacciRpc.close();

    现在是时候来看看完整示例的源代码了(包含基本的异常处理)。RPCClient.csRPCServer.cs

    编译(参见教程一):

    1 $ csc /r:"RabbitMQ.Client.dll" RPCClient.cs
    2 $ csc /r:"RabbitMQ.Client.dll" RPCServer.cs

    现在RPC服务已经准备就绪,可以启动服务了:

    1 $ RPCServer.exe
    2  [x] Awaiting RPC requests

    运行客户端去请求斐波那契数列:

    1 $ RPCClient.exe
    2  [x] Requesting fib(30)

    这里介绍的设计并非RPC服务的唯一实现方式,但是它有一些重要的优势:

    • 如果RPC服务太慢,你可以通过运行另外一个实例来对其进行横向扩展,试着在一个新的控制台里面运行另一个服务器。
    • 在客户端,RPC只要求发送和接收一条消息,没有如同declareQueue的同步调用被要求。作为结果,RPC客户端对于一个RPC请求,只需要一个网络往返。

    我们的代码依然非常简单,并没有尝试去解决一些复杂(但是重要)的问题,比如:

    • 如果没有运行中的服务器,客户端将作何响应?
    • 客户端对于RPC是否可以有某种形式的超时?
    • 如果服务器发生故障,引发异常,是否应当被转发给客户端?
    • 在处理之前,避免无效的输入数据,比如:检查边界、类型等。

    如果你想尝试,你可以找到有用的RabbitMQ管理插件去浏览队列。

    原文链接:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

  • 相关阅读:
    UVa532 Dungeon Master 三维迷宫
    6.4.2 走迷宫
    UVA 439 Knight Moves
    UVa784 Maze Exploration
    UVa657 The die is cast
    UVa572 Oil Deposits DFS求连通块
    UVa10562 Undraw the Trees
    UVa839 Not so Mobile
    327
    UVa699 The Falling Leaves
  • 原文地址:https://www.cnblogs.com/chen108/p/4972875.html
Copyright © 2011-2022 走看看