zoukankan      html  css  js  c++  java
  • RabbitMQ学习(6) (远程过程调用(RPC))

    第二个教程中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。

    但是如果我们需要在远程计算机上运行一个函数并等待结果呢?那么,这是一个不同的故事。这种模式通常被称为远程过程调用RPC

    在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分发的耗时任务,我们将创建一个返回斐波那契数字的虚拟RPC服务。

    客户端界面

    为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法 ,它发送一个RPC请求并阻塞,直到收到答案:

    var rpcClient = new RPCClient();
    
    Console.WriteLine(" [x] Requesting fib(30)");
    var response = rpcClient.Call("30");
    Console.WriteLine(" [.] Got '{0}'", response);
    
    rpcClient.Close();

    有关RPC的说明

    尽管RPC在计算中是相当常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是慢的RPC时,就会出现问题。像这样的混乱会导致一个不可预知的系统,并增加调试的不必要的复杂性。而不是简化软件,滥用RPC会导致不可维护的意大利面代码。

    考虑到以下建议:

    • 确定哪个函数调用是本地的,哪个是远程的。
    • 记录你的系统。清楚组件之间的依赖关系。
    • 处理错误情况。当RPC服务器长时间关闭时,客户端应该如何反应?

    有疑问时避免RPC。如果可以的话,你应该使用一个异步管道 - 而不是像RPC一样的阻塞,结果被异步地推到下一个计算阶段。

    回调队列

    一般来说,使用RPC over RabbitMQ很容易。客户端发送请求消息,服务器回复响应消息。为了收到回应,我们需要发送一个“回叫”队列地址与请求:

    var corrId = Guid.NewGuid().ToString();
    var props = channel.CreateBasicProperties();
    props.ReplyTo = replyQueueName;
    props.CorrelationId = corrId;
    
    var messageBytes = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "",
                         routingKey: "rpc_queue",
                         basicProperties: props,
                         body: messageBytes); //然后从callback_queue读取响应消息的代码

    消息属性

    AMQP 0-9-1协议预定义了一组与消息一起的14个属性。大多数属性很少使用,但以下情况除外:

    • deliveryMode:将消息标记为持久(值为2)或瞬态(任何其他值)。您可能会从第二个教程中记住这个属性
    • contentType:用于描述编码的MIME类型。例如对于经常使用的JSON编码,将此属性设置为application / json是一个好习惯
    • replyTo:通常用来命名一个回调队列。
    • correlationId:用于将RPC响应与请求关联起来。

    相关标识

    在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,但幸运的是有一个更好的方法 - 让我们创建一个单一的客户端回调队列。

    这引发了一个新的问题,在该队列中收到回复,不清楚回复属于哪个请求。那是什么时候使用correlationId属性。我们将把它设置为每个请求的唯一值。稍后,当我们在回调队列中收到一条消息时,我们将查看这个属性,并基于这个属性,我们可以将响应与请求进行匹配。如果我们看到一个未知的 correlationId值,我们可以放心地丢弃这个消息 - 这不属于我们的请求。

    您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是失败?这是由于在服务器端的竞争条件的可能性。虽然不太可能,但在发送给我们答案之后,但在发送请求的确认消息之前,RPC服务器可能会死亡。如果发生这种情况,重新启动的RPC服务器将再次处理该请求。这就是为什么在客户端,我们必须优雅地处理重复的响应,理想情况下RPC应该是幂等的。

    概要

    我们的RPC将会像这样工作:

    • 当客户端启动时,它创建一个匿名排他回调队列。
    • 对于RPC请求,客户端会发送一个消息,其中包含两个属性: replyTo,它被设置为回调队列和correlationId,它被设置为每个请求的唯一值。
    • 该请求被发送到一个rpc_queue队列。
    • RPC worker(又名:服务器)正在等待该队列上的请求。当一个请求出现时,它执行这个工作,并使用replyTo字段中的队列将结果发送回客户端
    • 客户端在回调队列中等待数据。出现消息时,将检查correlationId属性。如果它匹配来自请求的值,则返回对应用程序的响应。

    把它放在一起

    斐波那契任务:

    private static int fib(int n)
    {
        if (n == 0 || n == 1) return n;
        return fib(n - 1) + fib(n - 2);
    }

    我们宣布我们的斐波那契函数。它只假定有效的正整数输入。(不要指望这个函数可以用于大数字,也可能是最慢的递归实现)。

    我们的RPC服务器RPCServer.cs的代码如下所示:

    using System;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    
    class RPCServer
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "rpc_queue", durable: false,
                  exclusive: false, autoDelete: false, arguments: null);
                channel.BasicQos(0, 1, false);
                var consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: "rpc_queue",
                  autoAck: false, consumer: consumer);
                Console.WriteLine(" [x] Awaiting RPC requests");
    
                consumer.Received += (model, ea) =>
                {
                    string response = null;
    
                    var body = ea.Body;
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;
    
                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString();
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                          basicProperties: replyProps, body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                          multiple: false);
                    }
                };
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    
        /// 
    
        /// Assumes only valid positive integer input.
        /// Don't expect this one to work for big numbers, and it's
        /// probably the slowest recursive implementation possible.
        /// 
    
        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }
    
            return fib(n - 1) + fib(n - 2);
        }
    }

    服务器代码非常简单:

    • 像往常一样,我们首先建立连接,通道和声明队列。
    • 我们可能想要运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要在channel.basicQos中设置 prefetchCount设置。
    • 我们使用basicConsume来访问队列。然后我们注册一个交付处理程序,在这个处理程序中我们完成这项工作,然后将回复发送回去

    我们的RPC客户端RPCClient.cs的代码

     

    using System;
    using System.Collections.Concurrent;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    public class RpcClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;
    
    public RpcClient()
    {
            var factory = new ConnectionFactory() { HostName = "localhost" };
    
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);
    
            props = channel.CreateBasicProperties();
            var correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId;
            props.ReplyTo = replyQueueName;
    
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var response = Encoding.UTF8.GetString(body);
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    respQueue.Add(response);
                }
            };
        }
    
        public string Call(string message)
        {
            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(
                exchange: "",
                routingKey: "rpc_queue",
                basicProperties: props,
                body: messageBytes);
    
            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
    
            return respQueue.Take(); ;
        }
    
        public void Close()
        {
            connection.Close();
        }
    }
    
    public class Rpc
    {
        public static void Main()
        {
            var rpcClient = new RpcClient();
    
            Console.WriteLine(" [x] Requesting fib(30)");
            var response = rpcClient.Call("30");
    
            Console.WriteLine(" [.] Got '{0}'", response);
            rpcClient.Close();
        }
    }

     

    客户端代码稍微涉及一些:

    • 我们建立一个连接和通道,并为回复声明一个独占的“回叫”队列。
    • 我们订阅“回调”队列,以便我们可以接收RPC响应。
    • 我们的调用方法会产生实际的RPC请求。
    • 在这里,我们首先生成一个唯一的correlationId 数字并保存它 - while循环将使用这个值来捕获适当的响应。
    • 接下来,我们发布请求消息,具有两个属性: replyTocorrelationId
    • 在这一点上,我们可以坐下来等待,直到正确的答复到达。
    • while循环做的非常简单,每一个响应消息都会检查correlationId 是否是我们正在寻找的。如果是这样,它保存了响应。
    • 最后,我们将回复返回给用户。

    提出客户要求:

     

     

    var rpcClient = new RPCClient();
    
    Console.WriteLine(" [x] Requesting fib(30)");
    var response = rpcClient.Call("30");
    Console.WriteLine(" [.] Got '{0}'", response);
    
    rpcClient.Close();

     

    现在是查看RPCClient.csRPCServer.cs的完整示例源代码(包括基本的异常处理)的 时机

    照常设置(参见教程一):

    我们的RPC服务已经准备就绪。我们可以启动服务器:

     

     

    cd RPCServer
    dotnet run
    # => [x] Awaiting RPC requests
    

     

      

    要申请一个斐波那契数字运行客户端:

    cd RPCClient
    dotnet run
    # => [x] Requesting fib(30)
    

      

    这里介绍的设计不是RPC服务的唯一可能的实现,但它有一些重要的优点:

    • 如果RPC服务器速度太慢,可以通过运行另一个来扩展。尝试在新的控制台中运行第二个RPCServer
    • 在客户端,RPC需要发送和接收一条消息。不需要像queueDeclare这样的同步调用 。因此,RPC客户端只需要一次网络往返就可以获得一个RPC请求。

    我们的代码仍然非常简单,并不试图解决更复杂(但重要)的问题,如:

    • 如果没有服务器运行,客户应该如何应对?
    • 客户端是否应该对RPC有某种超时?
    • 如果服务器发生故障并引发异常,是否应将其转发给客户端?
    • 在处理之前防止无效的传入消息(例如检查边界,类型)。

     

     

     




    远程过程调用(RPC)

    (使用.NET客户端)

     

    先决条件

    本教程假定RabbitMQ已安装在标准端口(5672上的本地主机运行如果您使用不同的主机,端口或凭据,连接设置将需要调整。

    在哪里得到帮助

    如果您在阅读本教程时遇到困难,可以 通过邮件列表与我们联系。

     

    第二个教程中,我们学习了如何使用工作队列在多个工作人员之间分配耗时的任务。

    但是如果我们需要在远程计算机上运行一个函数并等待结果呢?那么,这是一个不同的故事。这种模式通常被称为远程过程调用RPC

    在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个可扩展的RPC服务器。由于我们没有任何值得分发的耗时任务,我们将创建一个返回斐波那契数字的虚拟RPC服务。

    客户端界面

    为了说明如何使用RPC服务,我们将创建一个简单的客户端类。它将公开一个名为call的方法 ,它发送一个RPC请求并阻塞,直到收到答案:

    var rpcClient = new RPCClient(); 
    
    Console.WriteLine(“[x] Requesting fib(30)”);
    var response = rpcClient.Call(“30”); 
    Console.WriteLine(“[。]得到”{0}“,response); 
    
    rpcClient.Close();
    

    有关RPC的说明

    尽管RPC在计算中是相当常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是慢的RPC时,就会出现问题。像这样的混乱会导致一个不可预知的系统,并增加调试的不必要的复杂性。而不是简化软件,滥用RPC会导致不可维护的意大利面代码。

    考虑到以下建议:

    • 确定哪个函数调用是本地的,哪个是远程的。
    • 记录你的系统。清楚组件之间的依赖关系。
    • 处理错误情况。当RPC服务器长时间关闭时,客户端应该如何反应?

    有疑问时避免RPC。如果可以的话,你应该使用一个异步管道 - 而不是像RPC一样的阻塞,结果被异步地推到下一个计算阶段。

    回调队列

    一般来说,使用RPC over RabbitMQ很容易。客户端发送请求消息,服务器回复响应消息。为了收到回应,我们需要发送一个“回叫”队列地址与请求:

    var corrId = Guid.NewGuid()。ToString();
    var props = channel.CreateBasicProperties(); 
    props.ReplyTo = replyQueueName; 
    props.CorrelationId = corrId; var messageBytes = Encoding.UTF8.GetBytes(message); 
    channel.BasicPublish(exchange:“”
                         routingKey:“rpc_queue”
                         basicProperties:props,
                         body:messageBytes); // ...然后从callback_queue读取响应消息的代码...
    
    
    
    
    

    消息属性

    AMQP 0-9-1协议预定义了一组与消息一起的14个属性。大多数属性很少使用,但以下情况除外:

    • deliveryMode:将消息标记为持久(值为2)或瞬态(任何其他值)。您可能会从第二个教程中记住这个属性
    • contentType:用于描述编码的MIME类型。例如对于经常使用的JSON编码,将此属性设置为application / json是一个好习惯
    • replyTo:通常用来命名一个回调队列。
    • correlationId:用于将RPC响应与请求关联起来。

    相关标识

    在上面介绍的方法中,我们建议为每个RPC请求创建一个回调队列。这是非常低效的,但幸运的是有一个更好的方法 - 让我们创建一个单一的客户端回调队列。

    这引发了一个新的问题,在该队列中收到回复,不清楚回复属于哪个请求。那是什么时候使用correlationId属性。我们将把它设置为每个请求的唯一值。稍后,当我们在回调队列中收到一条消息时,我们将查看这个属性,并基于这个属性,我们可以将响应与请求进行匹配。如果我们看到一个未知的 correlationId值,我们可以放心地丢弃这个消息 - 这不属于我们的请求。

    您可能会问,为什么我们应该忽略回调队列中的未知消息,而不是失败?这是由于在服务器端的竞争条件的可能性。虽然不太可能,但在发送给我们答案之后,但在发送请求的确认消息之前,RPC服务器可能会死亡。如果发生这种情况,重新启动的RPC服务器将再次处理该请求。这就是为什么在客户端,我们必须优雅地处理重复的响应,理想情况下RPC应该是幂等的。

    概要

    我们的RPC将会像这样工作:

    • 当客户端启动时,它创建一个匿名排他回调队列。
    • 对于RPC请求,客户端会发送一个消息,其中包含两个属性: replyTo,它被设置为回调队列和correlationId,它被设置为每个请求的唯一值。
    • 该请求被发送到一个rpc_queue队列。
    • RPC worker(又名:服务器)正在等待该队列上的请求。当一个请求出现时,它执行这个工作,并使用replyTo字段中的队列将结果发送回客户端
    • 客户端在回调队列中等待数据。出现消息时,将检查correlationId属性。如果它匹配来自请求的值,则返回对应用程序的响应。

    把它放在一起

    斐波那契任务:

    private  static  int  fibint n {
         if(n == 0 || n == 1 return n;
        return fib(n  - 1)+ fib(n  - 2); 
    }
    

    我们宣布我们的斐波那契函数。它只假定有效的正整数输入。(不要指望这个函数可以用于大数字,也可能是最慢的递归实现)。

    我们的RPC服务器RPCServer.cs的代码如下所示:

    使用系统;
    使用 RabbitMQ.Client;
    使用 RabbitMQ.Client.Events;
    使用 System.Text; class RPCServer 
    { public static void Main
    
     
            {
             var factory = new ConnectionFactory(){HostName = “localhost” };
            使用VAR连接= factory.CreateConnection())
             使用VAR信道= connection.CreateModel())
            { 
                channel.QueueDeclare(队列:“rpc_queue” 耐用:
                  独家:,自动删除:,自变量:); 
                channel.BasicQos(01);
                var consumer = new EventingBasicConsumer(channel); 
                channel.BasicConsume(队列:“rpc_queue”
                  autoAck:false,consumer:consumer); 
                Console.WriteLine(“[x]等待RPC请求”); 
    
                consumer.Received + =(model,ea)=> 
                { string response = null ; var body = ea.Body;
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    
    
                    
                    replyProps.CorrelationId = props.CorrelationId; 尝试 
                    { var message = Encoding.UTF8.GetString(body);
                        int n = int .Parse(message); 
                        Console.WriteLine(“[。] fib({0})”,message); 
                        response = fib(n).ToString(); 
                    } 捕获(例外五)
                    { 
                        Console.WriteLine(“[]” + e.Message); 
                        response = “” ; 
                    } 终于 
                    { VAR
    
                    
                        
                    
                    
                        responseBytes = Encoding.UTF8.GetBytes(response); 
                        channel.BasicPublish(exchange:“”,routingKey:props.ReplyTo,
                          basicProperties:replyProps,body:responseBytes); 
                        channel.BasicAck(deliveryTag:ea.DeliveryTag,
                          multiple:false); 
                    } 
                }; 
    
                Console.WriteLine(“按[enter]退出”); 
                到Console.ReadLine(); 
            } 
        } ///
    
        
        ///只承担有效的正整数输入。
        ///不要指望这个函数可以用于大数字,而且
        ///可能是最慢的递归实现。
        /// 
        private  static  int  fibint n {
             if(n == 0 || n == 1
            { return n; 
            } return fib(n  - 1)+ fib(n  - 2); 
        }
    }
                
    
            
    

    服务器代码非常简单:

    • 像往常一样,我们首先建立连接,通道和声明队列。
    • 我们可能想要运行多个服务器进程。为了在多个服务器上平均分配负载,我们需要在channel.basicQos中设置 prefetchCount设置。
    • 我们使用basicConsume来访问队列。然后我们注册一个交付处理程序,在这个处理程序中我们完成这项工作,然后将回复发送回去

    我们的RPC客户端RPCClient.cs的代码

  • 相关阅读:
    java----session
    js封装成插件-------Canvas统计图插件编写
    js封装成插件
    js学习--变量作用域和作用域链
    学习js函数--自执行函数
    学习js函数--函数定义
    footer不满一屏时在最底部,超出一屏时在页面最下部
    ios 点击区域阴影问题
    提交表单后数据返回时间过长
    点击显示video
  • 原文地址:https://www.cnblogs.com/missliu/p/8081895.html
Copyright © 2011-2022 走看看