zoukankan      html  css  js  c++  java
  • 谈谈RPC中的异步调用设计

    RPC(远过程调用)在分布式系统中是很常用的基础通讯手段,核心思想是将不同进程之间的通讯抽象为函数调用,基本的过程是调用端通过将参数序列化到流中并发送给服务端,服务端从流中反序列化出参数并完成实际的处理,然后将结果序列化后返回给调用端。通常的RPC由接口形式来定义,接口定义服务的名字,接口方法定义每个请求的输入参数和返回结果。RPC内部的序列化、网络通讯等实现细节则由框架来完成,对用户来说是完全透明的。之前我使用.net开发过一套轻量级的分布式框架(PPT看这里,视频看这里),这套框架经过2年多的持续开发和改进已经运用到数款产品中(包括网络游戏和分布式应用),取得了不错的效果,等未来框架成熟后会考虑开源,本文讨论的RPC基于这套框架展开。

    通常我们的函数调用都是同步的,也就是调用方在发起请求后就能得到结果(成功返回结果失败则抛出异常),中间不能去干其他事情,与这种模式对应的RPC称之为请求-响应式模式。请求-响应式的优点在于时序清晰,逻辑简单,和普通的函数调用完全等价。比如我们可以这样定义RPC接口:

    1 [Protocol(ID=1)]
    2 public interface ICalculate
    3 {
    4     [DispId(1)]
    5     int Add(int p1, int p2);
    6 }

    客户端就可以像这样使用接口:

    1 var calculate = new ICalculateProxy();//ICalculateProxy为框架生成的接口代理类
    2 calculate.Connect(url);
    3 var result = calculate.Add(1, 2);

     但是在分布式中这种模式的缺点也是非常的明显,第一个问题是网络通讯的延迟会严重的制约请求-响应式RPC的响应速度,使得请求吞吐量无法满足性能需要,大量的CPU时间会阻塞在等待请求的响应上;第二个问题是请求-响应式只有由客户端向服务端发起请求,服务端不能主动向客户端发送事件通知,也就是缺乏一种callback机制。

    针对请求-响应式的缺点我们可以用双向通讯机制来改进,首先去掉请求的返回值,所有的方法请求都定义为无返回结果,这样调用方在发出请求之后就可以继续干后面的事情了,而不需要再等待服务返回结果。同时针对服务接口定义一个Callback接口用于服务端向客户端发送请求结果和事件通知,这样服务器就可以主动向客户端发送消息了。这种RPC模式可以称之为双向会话式,接口可以这样定义:

     1 [Protocol(ID=1), Callback(typeof(ICalculateCallback))]
     2 public interface ICalculate
     3 {
     4     [DispId(1)]
     5     void Add(int p1, int p2);
     6 }
     7 
     8 public interface ICalculateCallback : IServiceCallback
     9 {
    10     [DispId(1)]
    11     void OnAdd(int result);
    12 }

     服务端可以这样实现服务接口:

    1 public class CaculateService : ICaculateImpl //这里ICaculateImpl为框架生成的服务实现接口
    2 {
    3     ICaculateImpl.Add(Session session, int p1, int p2)
    4     {
    5          var result = p1 + p2;
    6          session.Cllback.OnAdd(result);
    7     }
    8 }

    双向会话式解决了请求的异步处理以及服务器的双向通讯问题,但是也给调用者带来了一些不便,例如上例中如果调用方发起多个Add请求,在收到OnAdd消息后如何将结果与请求关联起来呢?一种解决方案是在Add请求中多加一个request id参数,服务器在处理完Add之后将request id放到OnAdd方法中和结果一起传给客户端,客户端根据request id来关联请求与结果。这种手工处理的方式代码写起来很麻烦,那么有没有一种更好的RPC模式来解决这个问题呢?这就是下面给大家介绍的支持异步调用的RPC设计。

    异步调用的主要设计思想是在双向会话式的基础上让调用方通过一个回调函数来获得请求的结果,而不再通过Callback接口来获得结果。采用回调函数的好处在于我们可以使用闭包来隐式的关联请求和响应之间的上下文,这样就不需要显式的传递request id来手工关联上下文了。并且服务器仍然可以通过Callback接口向客户端主动发送消息,保留了原来双向通讯的优点。但是需要注意的是由于请求在服务器上可能是异步执行的,所以服务器不保证请求的响应是按顺序返回的,这可能造成一些隐含的乱序问题,需要客户端在调用时特别注意。如果响应需要严格的按照请求顺序返回客户端,那么服务端需要同步处理请求,或者引入队列机制对异步的响应进行排队有序返回响应。

    之前的ICalculate就可以这样定义:

    [Protocol(ID=1), Callback(typeof(ICalculateCallback))]
     public interface ICalculate
     {
         [DispId(1), Async]
         void Add(int p1, int p2, Action<int> OnResult, Action<int,string> OnError = null);
     }

    用Async这个标签表示这个请求为异步请求,调用者用OnResult回调函数来接收请求的结果,OnError则为返回错误的回调函数,如果调用者不关心错误返回,那么可以不传递OnError,而在IServiceCallback的OnError方法中接收错误信息。
    调用者可以很方便的使用闭包来处理结果,同时隐藏异步的实现细节,像这样:

    1 void TestAdd(ICalculateProxy calculate, int p1, int p2)
    2 {
    3     calculate.Add(p1, p2, result => MessageBox.Show(string.Format("{0} + {1} = {2}", p1, p2, result), (errCode, errMsg) => MessageBox.Show("Add failed:" + errMsg));
    4 }

    服务器端的实现是这样的:

     1 public class CaculateService : ICaculateImpl
     2 {
     3     ICaculateImpl.Add(Session session, int p1, int p2, ICaculate_AddAsyncReply reply)
     4     {
     5         try
     6         {
     7             var result = p1 + p2;
     8             reply.OnResult(result);
     9         }
    10         catch(OverflowException e)
    11         {
    12             reply.OnError(-1, e.Message);
    13         }
    14     }
    15 }

    ICaculate_AddAsyncReply为框架生成的返回异步结果的对象,有一个OnResult和一个OnError方法。有了这个reply对象之后,服务器的请求处理也可以实现异步处理,客户端请求不需要在请求函数里一次完成,而是可以放到其他线程或者异步方法中处理,稍后在通过reply向客户端返回结果。

    下面我们来看看框架在背后为我们做的一些实现细节,首先是客户端的Proxy:

     1 //在Proxy中使用一个RequestContext结构保存请求的上下文信息,上下文中记录某个请求的唯一id,在调用时一起发送到服务器:
     2 struct RequestContext
     3 {
     4     public int reqId;
     5     public Delegate OnResult;
     6     public Action<int, string> OnError;
     7 
     8     public RequestContext(int id, Delegate onResult, Action<int, string> onError)
     9     {
    10         reqId = id;
    11         OnResult = onResult;
    12         OnError = onError;
    13     }
    14 }
    15 
    16 //服务器返回响应之后proxy就找出reqId对应的请求上下文,然后调用对应的回调函数传递结果
    17 void OnAddReply(BinaryStreamReader __reader) 18 { 19 int reqId; 20 int ret; 21 __reader.Read(out reqId); 22 __reader.Read(out ret); 23 if(ret == 0) 24 { 25 int p0; 26 __reader.Read(out p0); 27 RequestContext ctx = PopAsyncRequest(reqId); 28 var __onResult = ctx.OnResult as Action<int>; 29 __onResult(p0); 30 } 31 else 32 { 33 RequestContext ctx = PopAsyncRequest(reqId); 34 string msg; 35 __reader.Read(out msg); 36 if(ctx.OnError != null) 37 ctx.OnError(ret, msg); 38 else 39 _handler.OnError(ret, msg); 40 } 41 }

    服务端的一些实现细节:

     1 //框架生成请求对应的异步响应类
     2 public class ICaculate_AddAsyncReply  : AsyncReply
     3 {
     4     public ICaculate_AddAsyncReply(int reqId, Connection conn)
     5     {
     6         _reqId = reqId;
     7         _connection = conn;
     8     }
     9 
    10     public void OnError(int error, string msg)
    11     {
    12         var stream = new BinaryStreamWriter();
    13         stream.Write(1);
    14         stream.Write(_reqId);
    15         stream.Write(error);
    16         stream.Write(msg);
    17         _connection.Write(stream.BuildSendBuffer());
    18     }
    19     public void OnResult(int result)
    20     {
    21         var stream = new BinaryStreamWriter();
    22         stream.Write(1);
    23         stream.Write(_reqId);
    24         stream.Write(0);
    25         stream.Write(result);
    26         _connection.Write(stream.BuildSendBuffer());
    27     }
    28 }

    框架生成的Stub类将收到的请求数据进行解析然后调用具体服务类来处理请求:

     1 void AddInvoke(ICaculateImpl __service, Session __client, BinaryStreamReader __reader)
     2 {
     3     int p1;
     4     int p2;
     5     int __reqId;
     6     __reader.Read(out __reqId);
     7     __reader.Read(out p1);
     8     __reader.Read(out p2);
     9     var reply = new ICaculate_AddAsyncReply(__reqId, __client.Connection);
    10     try
    11     {
    12         __service.Add(__client, p1, p2, reply);
    13     }
    14     catch(ServiceException e)
    15     {
    16         reply.OnError(e.ErrCode, e.Message);
    17         Log.Info("Service Invoke Failed. clientId:{0} error message:{1}", __client.ID, e.Message);
    18     }
    19     catch(Exception e)
    20     {
    21         reply.OnError((int)ServiceErrorCode.Generic, "generic service error.");
    22         Log.Error("Generic Service Invoke Failed, clientId:{0} error message:{1}
    Call Stack: {2}", __client.ID, e.Message, e.StackTrace);
    23     }
    24 }


    由于完整的框架代码比较庞大,所以上面只贴了关键部分的实现细节。从实现细节我们可以看到,框架实际上也是通过request id来关联请求和响应函数之间的上下文的,但是通过代码生成机制隐藏了实现的细节,给使用者提供了一种优雅的抽象。

    总结:在双向会话式的RPC基础上,引入了一种新的异步请求调用模式,让调用者可以通过闭包来方便的异步处理请求的响应结果,同时服务器端的请求处理也可以实现异步处理。

  • 相关阅读:
    SseEmitter broken pipe
    沉淀vue相关知识(主要还是个人积累用)
    简述箭头函数基本使用和this指向的问题
    前端渲染和后端渲染,前端路由和后端路由
    .net工程师学习vue的心路历程(三)
    .net工程师学习vue的心路历程(一)
    初始mongodb(一)
    .net工程师学习vue的心路历程(二)
    MySQL自我保护参数
    mongodb副本集添加节点
  • 原文地址:https://www.cnblogs.com/Hybird3D/p/3346582.html
Copyright © 2011-2022 走看看