zoukankan      html  css  js  c++  java
  • RabbitMQ九:远程过程调用RPC

    定义

    RPC(Remote Procedure Call Protocol)——远程过程调用协议:它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。

    PRC采用客户端/服务端模式,请求程序就是一个客户机,而服务提供就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息,在服务器,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复消息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续前进。(整个过程有点类似:你到某大医院看病,你先到柜台交钱拿卡(医师费),拿卡去找医生(卡代表你的认证相当参数),医生根据卡给你把脉看病进行详谈沟通,医诊结束后给你开药,下一位患者进入。。。。。,医生有是服务端,患者是客户端,举例可能有点牵强,就是表达那个意思,)

    RPC是在计算机中一种常见的模式,是通常我要用消息队列3个关键点:

    1、服务的寻址;
    
    2、消息的接受;
    
    3、消息的关联。

    RPC调用的顺序简述:

     

     1、当客户端启动时,它会创建一个匿名的独占会回调队列;
    
     2、对于一个RPC请求,客户端通过两个属性发送一条消息(从图中我们也可以看到):relayTo 设置回调队列;correlationId,为每个请求设置唯一的标识ID;
    
     3、消息将发送到一个Rpc_queue  队列;
    
     4、RPC工程线程(服务器)在该队列上等待请求,当请求出现,他将处理请求并把结果发回到客户端,使用队列在replayTo中设置;
    
     5、客户端在回调队列上等待响应,当消息出现,它检查关联ID,如果匹配来自请求的关联ID值,返回队列消息到该应用程序。

    重点解释

    correlationId 和 relayTo 参数

    首先客户端通过RPC向服务端发送请求

    我这里有一堆东西需要你给我处理一下,correlationId :这是我的请求标识,relayTo :你处理完过后把结果返回到这个队列中。

    服务端拿到请求,并开始处理并返回结果

    correlationId :这是你的请求标识 ,原封不动的给你。 这时候客户端用自己的correlationId 与服务端返回的id进行对比。是我的,就接收。

    适合RPC场合说明

    希望同步得到数据的场合,RPC合适;
    
    希望使用简单,则RPC;RPC操作基于接口,使用简单,使用方式模式本地调用。异步的方式编程比较复杂。
    
    不希望客户端受限于服务端的速度等,可以使用Message Queue

    RabbitMQ RPC的特点

    Message Queue  把所有的请求消息存储起来,然后处理,和客户端解耦;
    
    Message Queue  引用新的结点,系统的可靠性会受Message Queue 结点的影响;
    
    Meaage  Queue 是异步单向的消息,发送消息设计成是不需要等待消息处理的完成。
    
    所以对于有同步返回需求,Message Queue 是个不错的方向

    普通RPC的特点

    同步调用,对于要等待返回结果、处理结果的场景,RPC是可以非常自然直觉的使用方式,当然RPC也可以异步调用。
    
    由于等待结果,客户端会有线程消耗。
    
    如果以异步RPC的方式使用,客户端线程消耗可以去掉,但不能做到像消息一样暂存消息请求,压力会直接传导到服务端。

    代码块

    备注(创建两个解决方案:服务端和客户端)

     服务端

     static void Main(string[] args)
            {
                using (var channel = GetConnection().CreateModel())
                {
                    channel.QueueDeclare("rpc_queue", true, false, false, null);
                    channel.BasicQos(0, 1, false);
                    var consumer = new EventingBasicConsumer(channel);
                    // var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume("rpc_queue", false, consumer);
                    Console.WriteLine("等待 RPC 队列");
                    consumer.Received += (model, ea) =>
                    {
                        // while (true)
                        // {
                        string response = null;
                        //出列
                        // var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        var body = ea.Body;
                        var props = ea.BasicProperties;
    //内容的基本属性
    var replyProps = channel.CreateBasicProperties();
    //注意这里的correlationId replyProps.CorrelationId
    = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine("显示内容" + message); response = fib(n).ToString(); } catch (Exception e) { Console.WriteLine("报错" + e.ToString()); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes); channel.BasicAck(ea.DeliveryTag, false); } // }; }; Console.WriteLine("发布成功!!!"); Console.ReadLine(); } } /// <summary> /// 私有方法 /// </summary> /// <param name="n"></param> /// <returns></returns> private static int fib(int n) { if (n == 0 || n == 1) { return n; } //Thread.Sleep()方法用于将当前线程休眠一定时间 时间单位是毫秒 1000毫秒= 1秒 //System.Threading.Thread.Sleep(2000);当前休眠2秒 //suspen()挂起当前线程。也可以指定挂起时间。 //close() 关闭当前线程。 Thread.Sleep(100 * 10); return n; // return fib(n - 1) + fib(n - 2); }

    客户端(两个类:Consumer,HelpConnection)

    Consumer代码块:

      static void Main(string[] args)
            {
                for (int i = 0; i < 30; i++)
                {
                    Stopwatch watch = new Stopwatch();
                    watch.Start();
                    var rpcClient = new HelpConnection();
                    Console.WriteLine("显示内容" + i.ToString());
                    var response = rpcClient.Call(i);
                    Console.WriteLine("显示内容" + response);
                    //当前连接关闭
                    rpcClient.Close();
                    watch.Stop();
                    Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));
                }
                Console.WriteLine(" complete!!!! ");
                Console.ReadLine();
            }

    HelpConnection代码块:

            /// <summary>
            /// 成员变量
            /// </summary>
            private static IConnection connection { get; set; }
            private IModel channel { get; set; }
            private string replyQueueName { get; set; }
            private QueueingBasicConsumer consumer { get; set; }
    
            /// <summary>
            /// 构造方法:连接配置
            /// </summary>
            public HelpConnection()
            {
                var factory = new ConnectionFactory()
                {
                    //计算机名称,账号,密码,
                    HostName = "localhost",
                    UserName = "zhangguangpo",
                    Password = "guangpo1992",
                    //RequestedHeartbeat = 60,
                    AutomaticRecoveryEnabled = true   //要启用自动连接恢复
                };
                //创建连接
                connection = factory.CreateConnection();
                channel = connection.CreateModel();
                //而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to
                replyQueueName = channel.QueueDeclare().QueueName;
                consumer = new QueueingBasicConsumer(channel);
                channel.BasicConsume(queue: replyQueueName,
                                     noAck: true,
                                     consumer: consumer);
                //  return Connection;
            }
    
            /// <summary>
            /// 消息判断
            /// </summary>
            /// <param name="message"></param>
            /// <returns></returns>
            public string Call(int message)
            {
                var corrId = Guid.NewGuid().ToString();
                var props = channel.CreateBasicProperties();
                props.ReplyTo = replyQueueName;
                props.CorrelationId = corrId;
                var messageBates = Encoding.UTF8.GetBytes(message.ToString());
                channel.BasicPublish("", "rpc_queue", props, messageBates);
                while (true)
                {
                    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                    if (ea.BasicProperties.CorrelationId == corrId)
                    {
                        var body = Encoding.UTF8.GetString(ea.Body);
                        return body;
                    }
                }
            }
    
            /// <summary>
            /// 当前连接关闭
            /// </summary>
            public void Close()
            {
                connection.Close();
            }

    效果图

     

    • 博主是利用读书、参考、引用、抄袭、复制和粘贴等多种方式打造成自己的纯镀 24k 文章,请原谅博主成为一个无耻的文档搬运工!
    • 小弟刚迈入博客编写,文中如有不对,欢迎用板砖扶正,希望给你有所帮助。
  • 相关阅读:
    new delate he typedef的含义
    Importing the multiarray numpy extension module failed
    QT socket相关
    CMake的一些使用
    CMake undefined reference to `QTcpServer::QTcpServer(QObject*)'的解决
    MFC操作excel
    dsview
    phyton 相关学习
    面试相关
    远程连接
  • 原文地址:https://www.cnblogs.com/lrzr/p/7397132.html
Copyright © 2011-2022 走看看