zoukankan      html  css  js  c++  java
  • RabbitMQ学习之RPC(6)

     在第二个教程中,我们了解到如何在多个worker中使用Work Queues分发费时的任务。

    但是,如果我们需要在远程运行一个函数并且等待结果该怎么办呢?这个时候,我们需要另外一个模式了。这种模式通常被叫做Remote Procedure Call 或者RPC.

    在这个教程中,我们将使用RabbitMQ来建立一个RPC系统:a clienta scalable RPC server.

    Client interface

     为了说明RPC服务怎样被使用,我们将创建一个简单的Client class(客户端类)。它会暴露一个发送RPC请求的名叫Call的方法并且会阻塞到接收到answer. 

    var rpcClient = new RPCClient();
    
    Console.WriteLine(" [x] Requesting fib(30)");var response = rpcClient.Call("30");
    Console.WriteLine(" [.] Got '{0}'", response);
    
    rpcClient.Close();

    Callback queue

    一般来说,通过RabbitMQ来做RPC是简单的。客户端(client)发送request message并且服务端(server)返回response message. 为了接收到response,我们需要在request上发送一个callback queue address(回调队列地址). 

    var props = channel.CreateBasicProperties();
    props.ReplyTo = replyQueueName;  //设置callback queue name
    var messageBytes = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "",
                         routingKey: "rpc_queue",
                         basicProperties: props,
                         body: messageBytes);
    // ... then code to read a response message from the callback_queue ...
    Message properties

    AMQP协议在message上预定义了14个属性的集合。 大部分属性很少使用,下面的使用比较多:

    • Persistent : 使message持久化

    • DeliveryMode : 那些熟悉这个协议的可能会使用这个属性而不是Persistent.来做持久化。

    • ContentType : 用来描述编码类型。例如,经常使用的JSON编码,通常设置属性为:application/json

    • ReplyTo : 用来命名callback queue(回调队列)

    • CorrelationId : 用来关联RPC Response 和request

    Correlation Id

    在之前我们讲的方法中,我们建议为每一个RPC request建立一个callback queue. 那样很没有效率,幸运的,还有一种更好的方法:我们为每个client创建单独的一个callback queue.

    这个时候我们需要CorrelationId属性来关联responserequest. 每个request都有唯一的correlationId. 当我们在队列中收到一个message,我们看下这个属性,并且根据它我们来匹配responserequest. 如果我们看到一个不知道的CorrelationId值,我们会安全的丢掉这个message. 它不属于我们的requests. 

    你可能会问,为什么我们忽视callback queue中不知道的messageunknow messages),而不是报错呢?那是服务端有 竞态资源 的可能性。尽管不太可能,但它是可能的,RPC服务器在发送给我们answer之后,但还没有发送an acknowledgement message之前死掉了。如果这种情况发生了,重启的RPC服务器将会再处理这个request. 那就是客户端为什么要优雅的处理两次responses. (可以对比第二个教程,会在接收端确认,如果接收端没有确认,之后队列会再次发送request,服务端需要再次处理)

    Summary(总结)

    我们的RPC像图中这样工作:

    • 当一个client启动时,它创建一个匿名的专用的callback queue.
    • 对于一个RPC request,client将会发送带有两个属性的messageReplyTo,用来设置callback queue;并且CorrelationId,用来为每个request设置唯一的值。
    • Request会被发送到rpc_queue.
    • RPC worker(这里就是server)将会等待接收rpc_queue队列里的requests。当request出现时,它会做这个job,并且发送一个带结果的messageclient,使用被ReplyTo属性命名的队列。
    • Client会等待callback queue的数据。当一个message出现时,它会检查CorrealtionId属性。如果它匹配request里的这个值(CorrealationId),将会返回response到这个应用(client)

    代码

    The Fibonacci task:

    private static int fib(int n)
    {
        if (n == 0 || n == 1) return n;
        return fib(n - 1) + fib(n - 2);
    }

    RPCServer.cs

    using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
    class RPCServer
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "rpc_queue", durable: false,  //声明queue
                  exclusive: false, autoDelete: false, arguments: null);
                channel.BasicQos(0, 1, false); //公平调度策略
                var consumer = new EventingBasicConsumer(channel);
                channel.BasicConsume(queue: "rpc_queue",  //接收消息
                  autoAck: false, consumer: consumer);
                Console.WriteLine(" [x] Awaiting RPC requests");
    
                consumer.Received += (model, ea) =>
                {
                    string response = null;
    
                    var body = ea.Body;
                    var props = ea.BasicProperties;
                    var replyProps = channel.CreateBasicProperties();
                    replyProps.CorrelationId = props.CorrelationId;//设置返回的CorrealationId
    
                    try
                    {
                        var message = Encoding.UTF8.GetString(body);
                        int n = int.Parse(message);
                        Console.WriteLine(" [.] fib({0})", message);
                        response = fib(n).ToString(); //设置响应数据
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(" [.] " + e.Message);
                        response = "";
                    }
                    finally
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(response);
                        channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,//发送响应到callback queue
                          basicProperties: replyProps, body: responseBytes);
                        channel.BasicAck(deliveryTag: ea.DeliveryTag,
                          multiple: false);
                    }
                };
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    
        /// 
        /// Assumes only valid positive integer input.
        /// Don't expect this one to work for big numbers, and it's
        /// probably the slowest recursive implementation possible.
        /// 
        private static int fib(int n)
        {
            if (n == 0 || n == 1)
            {
                return n;
            }
    
            return fib(n - 1) + fib(n - 2);
        }
    }

    RPCClient.cs

    using System;using System.Collections.Concurrent;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;
    public class RpcClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;
    public RpcClient()
    {
            var factory = new ConnectionFactory() { HostName = "localhost" };
    
            connection = factory.CreateConnection();
            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);
    
            props = channel.CreateBasicProperties();
            var correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId;
            props.ReplyTo = replyQueueName;
    
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var response = Encoding.UTF8.GetString(body);
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    respQueue.Add(response);
                }
            };
        }
    
        public string Call(string message)
        {
            var messageBytes = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(
                exchange: "",
                routingKey: "rpc_queue",
                basicProperties: props,
                body: messageBytes);
    
            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
    
            return respQueue.Take(); ;
        }
    
        public void Close()
        {
            connection.Close();
        }
    }
    public class Rpc
    {
        public static void Main()
        {
            var rpcClient = new RpcClient();
    
            Console.WriteLine(" [x] Requesting fib(30)");
            var response = rpcClient.Call("30");
    
            Console.WriteLine(" [.] Got '{0}'", response);
            rpcClient.Close();
        }
    }

     参考网址:

    https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

  • 相关阅读:
    十三、Sleuth分布式请求链路追踪
    十二、SpringCloud Stream消息驱动
    十一、SpringCloud Bus 消息总线
    Linux命令(权限管理)
    Linux命令(文件管理)
    Linux的文件和目录
    Linux介绍及安装
    Docker
    Nginx
    13、SpringBoot整合Mybatis-Plus
  • 原文地址:https://www.cnblogs.com/Vincent-yuan/p/10952618.html
Copyright © 2011-2022 走看看