zoukankan      html  css  js  c++  java
  • RabbitMQ 实现RPC

    NMIV$QS2F)`R0Y38S03W}DU

    实现RPC

    首先要弄明白,RPC是个什么东西。

    (RPC) Remote Procedure Call Protocol 远程过程调用协议

    在一个大型的公司,系统由大大小小的服务构成,不同的团队维护不同的代码,部署在不同的机器。但是在做开发时候往往要用到其它团队的方法,因为已经有了实现。但是这些服务部署不同的机器上,想要调用就需要网络通信,这些代码繁琐且复杂,一不小心就会写的很低效。RPC协议定义了规划,其它的公司都给出了不同的实现。比如微软的wcf,以及现在火热的WebApi。

    在RabbitMQ中RPC的实现也是很简单高效的,现在我们的客户端、服务端都是消息发布者与消息接收者。

    首先客户端通过RPC向服务端发出请求

    我这里有一堆东西需要你给我处理一下,correlation_id:这是我的请求标识,erply_to:你处理完过后把结果返回到这个队列中。

    服务端拿到了请求,开始处理并返回

    correlation_id:这是你的请求标识 ,原封不动的给你。 这时候客户端用自己的correlation_id与服务端返回的id进行对比。是我的,就接收。

    image

    一些繁琐的细节rabbitmq已经为我们封装了,简单的SimpleRpcServer与SimpleRpcClient让Rpc实现的更为方便,这里可以先看一下server端

    使用默认的交换机,创建一个SimpleRpcServer的实例,这里需要注意的是,SimpleRpcServer的处理应该是根据业务来的,也就是自己的。在给出的类中没有任何的实现,如果我们创建一个自己的RpcServer并且给出实现

              //创建返回一个新的频道
                using (var channel = RabbitMqHelper.GetConnection().CreateModel())
                {
    
                    //创建一个rpc queue
                    channel.QueueDeclare("RpcQueue", true, false, false, null);
    
                    SimpleRpcServer rpc = new SmsSimpleRpcServer(new Subscription(channel, "RpcQueue"));
    
                    Console.WriteLine("服务端启动成功");
    
       rpc.MainLoop();
                    Console.ReadKey();
    
                }

    这里是自己的一个RpcServer,在HandleSimpleCall方法里返回对处理的回调消息,在ProcessRequest中做出具体的处理逻辑

    /// <summary>
        /// 发送短信的Rpc
        /// </summary>
        public class SmsSimpleRpcServer : SimpleRpcServer
        {
            public SmsSimpleRpcServer(Subscription subscription) : base(subscription)
            {
            }
    
            /// <summary>
            /// 执行完成后进行h回调
            /// </summary>
            /// <param name="isRedelivered"></param>
            /// <param name="requestProperties"></param>
            /// <param name="body"></param>
            /// <param name="replyProperties"></param>
            /// <returns></returns>
            public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
            {
                replyProperties = null;
                return Encoding.UTF8.GetBytes($"给{Encoding.UTF8.GetString(body)}发送短信成功");
            }
    
    
            /// <summary>
            /// 进行处理
            /// </summary>
            /// <param name="evt"></param>
            public override void ProcessRequest(BasicDeliverEventArgs evt)
            {
                // todo.....
                base.ProcessRequest(evt);
            }
        }

    回到client端,这里的代码也是非常容易的。创建一个SimpleRpcClient,然后指定了交换机类型,因为用的是默认的,所以exchange传的是null, routingkey是我们的rpcqueue。最后调用call方法

    using (var channel = RabbitMqHelper.GetConnection().CreateModel())
               {
                   //创建client的rpc
                   SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "RpcQueue"));
                   bool flag = true;
                   var sendmsg = "";
                   while (flag)
                   {
                       Console.WriteLine("请输入要发送的消息");
                       sendmsg = Console.ReadLine();
                       if (string.IsNullOrWhiteSpace(sendmsg))
                       {
                           Console.Write("请输入消息");
                           continue;
                       }
    
                       var msg = client.Call(Encoding.UTF8.GetBytes(sendmsg));
    
                       Console.WriteLine(Encoding.UTF8.GetString(msg));
    
    
                   }
    
                   Console.ReadKey();
    
               }

    把程序运行起来

    image

    后面说一些内部的东西,其实上在创建一次SimpleRpcClient的时候都会创建一个回调队列,这个队列在程序关闭后会自动消失,所以这些建议创建一次就够了,都使用这个。如果创建多次会影响性能

    image

    image

    在回调的时候,通过源码也可以看到判断了correlation_id的一致性

    NMIV$QS2F)`R0Y38S03W}DU[4]

    在server端也可以看到在执行Process后会发布消息到回调队列

    image

  • 相关阅读:
    【算法微解读】浅谈01分数规划
    【算法微解读】浅谈线段树
    近期目标
    【洛谷P5008 逛庭院】tarjan缩点+贪心
    【洛谷P1061 Jam的计数法】搜索
    【洛谷P1140 相似基因】动态规划
    【建兰普及模拟赛四】20181026
    【建兰普及模拟赛第三场】20181035
    【洛谷P2800又上锁妖塔】动态规划
    【建兰普及模拟赛第二场】20181024
  • 原文地址:https://www.cnblogs.com/LiangSW/p/6216537.html
Copyright © 2011-2022 走看看