zoukankan      html  css  js  c++  java
  • RabbitMQ学习系列(五): RPC 远程过程调用

    前面讲过一些RabbitMQ的安装和用法,也说了说RabbitMQ在一般的业务场景下如何使用。不知道的可以看我前面的博客,http://www.cnblogs.com/zhangweizhong/category/855479.html

    不过,最近有朋友问我,RabbitMQ RPC 是干嘛的,有什么用。

    其实,RabbitMQ RPC 就是通过消息队列(Message Queue)来实现rpc的功能,就是,客户端向服务端发送定义好的Queue消息,其中携带的消息就应该是服务端将要调用的方法的参数 ,并使用Propertis告诉服务端将结果返回到指定的Queue

    1.RabbitMQ RPC的特点

    • Message Queue把所有的请求消息存储起来,然后处理,和客户端解耦。
    • Message Queue引入新的结点,系统的可靠性会受Message Queue结点的影响。
    • Message Queue是异步单向的消息。发送消息设计成是不需要等待消息处理的完成。

    所以对于有同步返回需求,Message Queue是个不错的方向。

    2.普通PRC的特点

    • 同步调用,对于要等待返回结果/处理结果的场景,RPC是可以非常自然直觉的使用方式。当然RPC也可以是异步调用。
    • 由于等待结果,客户端会有线程消耗。

    如果以异步RPC的方式使用,客户端线程消耗可以去掉。但不能做到像消息一样暂存消息请求,压力会直接传导到服务端。

    3.适用场合说明

    • 希望同步得到结果的场合,RPC合适。
    • 希望使用简单,则RPC;RPC操作基于接口,使用简单,使用方式模拟本地调用。异步的方式编程比较复杂。
    • 不希望客户端受限于服务端的速度等,可以使用Message Queue。

    4.RabbitMQ RPC工作流程:

     

    基本概念:

    Callback queue 回调队列客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。

    Correlation id 关联标识客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

    流程说明

    • 当客户端启动的时候,它创建一个匿名独享的回调队列。
    • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
    • 将请求发送到一个 rpc_queue 队列中。
    • 服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
    • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用

     5.完整代码:

      1. 创建两个控制台程序,作为RPC Server和RPC Client, 引用 RabbitMQ.Client,

      2. RPC Server

        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "rpc_queue",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    channel.BasicQos(0, 1, false);
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queue: "rpc_queue",
                                         noAck: false,
                                         consumer: consumer);
                    Console.WriteLine(" [x] Awaiting RPC requests");
    
                    while (true)
                    {
                        string response = null;
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    
                        var body = ea.Body;
                        var props = ea.BasicProperties;
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;
    
                        try
                        {
                            var message = Encoding.UTF8.GetString(body);
                            int n = int.Parse(message);
                            Console.WriteLine(" [.] fib({0})", message);
                            response = fib(n).ToString();
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(" [.] " + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(exchange: "",
                                                 routingKey: props.ReplyTo,
                                                 basicProperties: replyProps,
                                                 body: responseBytes);
                            channel.BasicAck(deliveryTag: ea.DeliveryTag,
                                             multiple: false);
                        }
                    }
                }
            }
    
            /// <summary>
            /// Assumes only valid positive integer input.
            /// Don't expect this one to work for big numbers,
            /// and it's probably the slowest recursive implementation possible.
            /// </summary>
            private static int fib(int n)
            {
                if (n == 0 || n == 1)
                {
                    return n;
                }
    
                Thread.Sleep(1000 * 10);
    
                return n;
            }
        }

      3. RPC Client

        class Program
        {
            static void Main(string[] args)
            {
                for (int i = 0; i < 10; i++)
                {
                    Stopwatch watch = new Stopwatch();
    
                    watch.Start();
    
                    var rpcClient = new RPCClient();
    
                    Console.WriteLine(string.Format(" [x] Requesting fib({0})", i));
    
                    var response = rpcClient.Call(i.ToString());
    
                    Console.WriteLine(" [.] Got '{0}'", response);
    
                    rpcClient.Close();
    
                    watch.Stop();
    
                    Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));
                }
    
                Console.WriteLine(" complete!!!! ");
    
    
                Console.ReadLine();
            }
        }
    
        class RPCClient
        {
            private IConnection connection;
            private IModel channel;
            private string replyQueueName;
            private QueueingBasicConsumer consumer;
    
            public RPCClient()
            {
                var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "OrderQueue", UserName = "zhangweizhong", Password = "weizhong1988", Port = 5672 };
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                replyQueueName = channel.QueueDeclare().QueueName;
                consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: replyQueueName,
                                     noAck: true,
                                     consumer: consumer);
            }
    
            public string Call(string message)
            {
                var corrId = Guid.NewGuid().ToString();
                var props = channel.CreateBasicProperties();
                props.ReplyTo = replyQueueName;
                props.CorrelationId = corrId;
    
                var messageBytes = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "",
                                     routingKey: "rpc_queue",
                                     basicProperties: props,
                                     body: messageBytes);
    
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    if (ea.BasicProperties.CorrelationId == corrId)
                    {
                        return Encoding.UTF8.GetString(ea.Body);
                    }
                }
            }
    
            public void Close()
            {
                connection.Close();
            }
        }

      4.分别运行Server和Client

    6.最后

      1.参照RabbitMQ官方教程的RPC,地址:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

      2.本文源代码下载,http://files.cnblogs.com/files/zhangweizhong/Weiz.RabbitMQ.RPC.rar

      3.博客原地址:http://fpeach.com/post/2016/12/01/RabbitMQ%E5%AD%A6%E4%B9%A0%E7%B3%BB%E5%88%97%EF%BC%88%E4%BA%94%EF%BC%89-RPC-%E8%BF%9C%E7%A8%8B%E8%BF%87%E7%A8%8B%E8%B0%83%E7%94%A8.aspx

  • 相关阅读:
    sqlplus时报Linux-x86_64 Error: 13: Permission denied
    thrift之TTransport层的缓存传输类TBufferedTransport和缓冲基类TBufferBase
    Java实现 蓝桥杯 算法提高 新建Microsoft world文档
    Java实现 蓝桥杯 算法提高 新建Microsoft world文档
    Java实现 蓝桥杯 算法提高 快乐司机
    Java实现 蓝桥杯 算法提高 快乐司机
    Java实现 蓝桥杯 算法提高 队列操作
    Java实现 蓝桥杯 算法提高 队列操作
    Java实现 蓝桥杯 算法提高 文本加密
    Java实现 蓝桥杯 算法提高 合并石子
  • 原文地址:https://www.cnblogs.com/zhangweizhong/p/6117371.html
Copyright © 2011-2022 走看看