zoukankan      html  css  js  c++  java
  • rebbitmq-RPC(C#)

    RPC(Remote Procedure Call Protocol)——远程过程调用协议

    运行时,一次客户机对服务器的RPC调用,其内部操作大致有如下十步:
    1.调用客户端句柄;执行传送参数
    2.调用本地系统内核发送网络消息
    4.服务器句柄得到消息并取得参数
    5.执行远程过程
     
    6.执行的过程将结果返回服务器句柄
    7.服务器句柄返回结果,调用远程系统内核
    8.消息传回本地主机
    9.客户句柄由内核接收消息
    10.客户接收句柄返回的数据
     
    rebbitmq通过消息交互来实现远程方法的调用,
    主要过程包括两端:
    被调用端:
     1             //创建链接
     2             var factory = new ConnectionFactory() { HostName = "localhost" };
     3             using (var connection = factory.CreateConnection())
     4             {
     5                 //创建渠道
     6                 using (var channel = connection.CreateModel())
     7                 {
     8                     //创建消息队列,接收rpc请求
     9                     channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
    10 
    11                     //监听并消费消息,收到消息需要通知发送方
    12                     var consumer = new QueueingBasicConsumer(channel);
    13                     channel.BasicConsume(queue: "rpc_queue", noAck: false, consumer: consumer);
    14                     while (true)
    15                     {
    16                         Thread.Sleep(1000);
    17                         Console.WriteLine(string.Format("等待RPC请求..."));
    18                         string response = null;
    19                         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    20                         var body = ea.Body;
    21                         var props = ea.BasicProperties;
    22                         var replyProps = channel.CreateBasicProperties();
    23                         //约定ID
    24                         replyProps.CorrelationId = props.CorrelationId;
    25                         var message = Encoding.UTF8.GetString(body);
    26                         try
    27                         {
    28                             Console.WriteLine(string.Format("执行方法{0}", message));
    29                             response = Request(message);
    30                         }
    31                         catch (Exception ex)
    32                         {
    33                             Console.WriteLine(string.Format("执行{0}异常,异常信息:{1}", message, ex.Message));
    34                             response = "";
    35                         }
    36                         finally
    37                         {
    38                             var responseBytes = Encoding.UTF8.GetBytes(response);
    39                             //按照发送方的要求(等待返回的接收队列、约定ID),返回消息
    40                             channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes);
    41                             //收到消息需要通知发送方
    42                             channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    43                         }
    44                     }
    45                 }
    46             }

    调用端:

     1             //创建链接
     2             var factory = new ConnectionFactory() { HostName = "localhost" };
     3             using (var connection = factory.CreateConnection())
     4             {
     5                 //创建渠道
     6                 using (var channel = connection.CreateModel())
     7                 {
     8                     //创建接收返回消息的队列
     9                     string replyQueueName = channel.QueueDeclare().QueueName;
    10 
    11                     //监听并消费消息,收到消息无需通知发送方
    12                     var consumer = new QueueingBasicConsumer(channel);
    13                     channel.BasicConsume(queue: replyQueueName, noAck: true, consumer: consumer);
    14 
    15                     var props = channel.CreateBasicProperties();
    16                     props.ReplyTo = replyQueueName;
    17                     var corrId= Guid.NewGuid().ToString();
    18                     props.CorrelationId =corrId;
    19 
    20                     var messageByte = Encoding.UTF8.GetBytes(message);
    21                     //发送消息,并携带接收返回消息的相关内容(接收队列、约定ID)
    22                     channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageByte);
    23 
    24                     while (true)
    25                     {
    26                         Thread.Sleep(1000);
    27                         Console.WriteLine(string.Format("等待请求requst:{0}返回...", message));
    28                         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    29                         if (ea.BasicProperties.CorrelationId==corrId)
    30                         {
    31                             Console.WriteLine(string.Format("请求requst:{0};response:{1}", message, Encoding.UTF8.GetString(ea.Body)));
    32                             break;
    33                         }
    34                     }
    35                 }
    36             }
  • 相关阅读:
    OOP 三大特点:继承性,封装性,多态性
    PHP 知识点
    ELK安装和配置及常用插件安装
    istio1.2.2 安装及使用示例
    动态扩展磁盘(LVM)
    kuberadm集群升级
    nginx+nginx-upsync-module实现配置动态更新
    kubernetes资源优化
    ingress controller 和ingress使用实例
    helm安装及使用
  • 原文地址:https://www.cnblogs.com/lzzhang/p/4801452.html
Copyright © 2011-2022 走看看