zoukankan      html  css  js  c++  java
  • rabbitmq (五)RPC

    Remote Procedure Call or RPC(远程函数调用)

    当我们需要在远程计算机上运行一个函数,并且等待结果的时候,我们用到RPC

    在rabbitmq客户端使用call函数,发送RPC请求并阻塞等待结果返回.

    提示:虽然RPC是一个很好计算处理的常见模式,但是有时程序员无法判断

    一个函数调用时一个本地调用还是一个缓慢的RPC调用.所以有很多错误的不可预知的结果.

    并且增加调试的复杂性.

    有三个建议:

    1.确定函数调用时本地还是远程调用.

    2.给系统添加文档,确定各个组件之间的依赖

    3.处理每个RPC调用过程中的异常,

    callback queue

    因为通常订阅/发布是一个单向过程,只需要一个队列,但是在RPC调用的时候需要制定两个队列,一个发送,一个接受结果.所以用ReplyTo属性指定接受结果队列名称.

    correlation Id

    为每一个RPC调用创建一个callback queue是低效的,所以可以使用一个公用的callback queue使用关联id来标识每个请求.

    源码:

    客户端

    public class RpcClient
        {
            private readonly IConnection connection;
            private readonly IModel channel;
            private readonly string replyQueueName;
            private readonly EventingBasicConsumer consumer;
            private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
            private readonly IBasicProperties props;
    
            public RpcClient()
            {
                var factory = new ConnectionFactory() { HostName = "your ip", UserName = "your name", Password = "your pwd" };
    
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                replyQueueName = channel.QueueDeclare().QueueName;
                consumer = new EventingBasicConsumer(channel);
    
                props = channel.CreateBasicProperties();
                var correlationId = Guid.NewGuid().ToString();
                props.CorrelationId = correlationId;
                props.ReplyTo = replyQueueName;
    
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var response = Encoding.UTF8.GetString(body);
                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {
                        respQueue.Add(response);
                    }
                };
            }
    
            public string Call(string message)
            {
                var messageBytes = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(
                    exchange: "",
                    routingKey: "rpc_queue",
                    basicProperties: props,
                    body: messageBytes);
    
                channel.BasicConsume(
                    consumer: consumer,
                    queue: replyQueueName,
                    autoAck: true);
    
                return respQueue.Take(); ;
            }
    
            public void Close()
            {
                connection.Close();
            }
        }
        class Program
        {
            static void Main(string[] args)
            {
                var rpcClient = new RpcClient();
    
                Console.WriteLine(" [x] Requesting fib(30)");
                var response = rpcClient.Call("30");
    
                Console.WriteLine(" [.] Got '{0}'", response);
                rpcClient.Close();
            }
        }

    服务端:

    public static void Main()
            {
                var factory = new ConnectionFactory() { HostName = "your ip", UserName = "your name", Password = "your pwd" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "rpc_queue", durable: false,
                      exclusive: false, autoDelete: false, arguments: null);
                    channel.BasicQos(0, 1, false);
                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume(queue: "rpc_queue",
                      autoAck: false, consumer: consumer);
                    Console.WriteLine(" [x] Awaiting RPC requests");
    
                    consumer.Received += (model, ea) =>
                    {
                        string response = null;
    
                        var body = ea.Body;
                        var props = ea.BasicProperties;
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;
    
                        try
                        {
                            var message = Encoding.UTF8.GetString(body);
                            int n = int.Parse(message);
                            Console.WriteLine(" [.] fib({0})", message);
                            response = fib(n).ToString();
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(" [.] " + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                              basicProperties: replyProps, body: responseBytes);
                            channel.BasicAck(deliveryTag: ea.DeliveryTag,
                              multiple: false);
                        }
                    };
    
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
    
            /// 
    
            /// Assumes only valid positive integer input.
            /// Don't expect this one to work for big numbers, and it's
            /// probably the slowest recursive implementation possible.
            /// 
    
            private static int fib(int n)
            {
                if (n == 0 || n == 1)
                {
                    return n;
                }
    
                return fib(n - 1) + fib(n - 2);
            }
  • 相关阅读:
    jquery操作select(取值,设置选中)
    jQuery懒加载插件 – jquery.lazyload.js简单调用
    js获取URL中的参数
    数据结构之队列C++版
    数据结构之堆栈java版
    数据结构之堆栈C++版
    c++操作符重载
    QT状态机
    c++/c关于函数指针
    学习Qt的一点小感想
  • 原文地址:https://www.cnblogs.com/weichao975/p/8203379.html
Copyright © 2011-2022 走看看