zoukankan      html  css  js  c++  java
  • dubbo源码—service invoke

    原地址:http://www.cnblogs.com/sunshine-2015/p/8325992.html

    dubbo的远程调用过程是怎么样的?

    dubbo远程过程调用经过了那些处理?

    发起远程调用的时候究竟传了什么数据给provider?

    要解决这些问题,欢迎一起探讨走进dubbo源码栏目。

    在service reference中说了consumer端发起调用的时候使用的是远程服务的本地代理,发起调用的堆栈是

    (上面调用堆栈中的filter链先不介绍了,留在后面service reply中介绍,因为consumer和provider的filter链构造过程是类似的)

    前面说过consumer在引用服务的时候最终会生成一个proxy,该proxy是实现了对应的服务接口(比如:com.test.service.TestDubboService),而且包含一个InvokerInvocationHandler属性,在proxy的服务接口方法中调用InvokerInvocationHandler.invoke

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      String methodName = method.getName();
      Class<?>[] parameterTypes = method.getParameterTypes();
      // 判断是否是Object的方法,如果是则直接执行本地调用,不发起远程调用
      if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
      }
      // toString、hashCode、equals执行本地调用
      if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
      }
      if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
      }
      if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
      }
    
      // RpcInvocation是一个很重要的类,是真正在consumer和provider之间通过网络传输的实体,里面包含了服务调用需要的必要信息
      RpcInvocation invocation = new RpcInvocation(method, args);
      Result r = null;
      try {
        r = invoker.invoke(invocation);
        Object o = r.recreate();
        return o;
      } catch (Throwable e) {
        // ... 省略中间代码
      }
    }
    
    public class RpcInvocation implements Invocation, Serializable {
    
        private static final long serialVersionUID = -4355285085441097045L;
        // 调用的方法全限定名
        private String               methodName;
        // 方法的参数类型
        private Class<?>[]           parameterTypes;
        // 入参
        private Object[]             arguments;
        // 附件,可以传递一些其他信息
        private Map<String, String>  attachments;
        // invoker
        private transient Invoker<?> invoker;
        // ... 省略中间代码
    }

    上面通过invoke调用发起invoker的调动链,依次调用invoker.invoke方法

    MockClusterInvoker#invoke

    invoke方法里面会判断是否配置了mock参数

    1. 如果没有配置则直接继续调用invoke
    2. 如果配置的值以force开头,表明直接mock调用
    3. 如果mock配置了但是不以force开头,要先试试正常调用,如果正常调用失败了才会使用mock

    AbstractClusterInvoker#invoke

    这里继续正常情况下的非mock调用。其实上面接下来调用的是FailbackClusterInvoker#invoke,但是FailbackClusterInvoker继承了AbstractClusterInvoker,而且FailbackClusterInvoker没有实现invoke方法,所以直接调用了超类的invoke方法

    1. 判断当前Invoker是否被销毁,如果销毁直接抛出异常
    2. 然后调用directory.list找出所有的invoker
    3. 通过SPI加载负载均衡的扩展
    4. 调用实现类的doInvoke

    FailbackClusterInvoker#doInvoke

    这个类是负责dubbo调用失败重试的类

    1. 首先检查是否有可以调用的invoker
    2. 调用AbstractClusterInvoker.select,通过路由到一个指定invoker
    3. 通过路由到的invoker发起调用
    4. 如果调用抛出异常了,将请求加入失败列表然后定时重试

    上面invoker链调动完之后,会调用dubbo的filter链

    // dubbo有一个调用上下文RpcContext,这个filter就是负责往context写信息
    com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
    // dubbo支持事件通知(oninvoke,onreturn,onthrow),通过该filter实现,
    com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
    // 负责调用过程的监控,调用耗时、并发数等
    com.alibaba.dubbo.monitor.support.MonitorFilter

    前面部分都是前置处理,开始通过网络向远程发起调用是DubboInvoker.doInvoke方法

    protected Result doInvoke(final Invocation invocation) throws Throwable {
      RpcInvocation inv = (RpcInvocation) invocation;
      final String methodName = RpcUtils.getMethodName(invocation);
      // 往invocation的attachments中写入path和version参数
      inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
      inv.setAttachment(Constants.VERSION_KEY, version);
    
      ExchangeClient currentClient;
      if (clients.length == 1) {
        currentClient = clients[0];
      } else {
        // 因为dubbo支持多个client连接同一个provider,也就是是同一个provider多个连接connections,所以可能是有多个client
        currentClient = clients[index.getAndIncrement() % clients.length];
      }
      try {
        // 是否是异步调用,异步调用通过RpcContext.getContext().getFuture().get()来获取结果
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // Oneway:单向调用,调用方只管发起调用,不需要知道返回值,直接返回
        // Twoway:双向调用,调用方发起调用后需要知道返回值,需要返回值的情况又分为同步等待或者是异步通知
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 方法调用的超时时间
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
          boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
          currentClient.send(inv, isSent);
          RpcContext.getContext().setFuture(null);
          // oneWay只管发送,不关心结果,直接返回
          return new RpcResult();
        } else if (isAsync) {
          ResponseFuture future = currentClient.request(inv, timeout) ;
          // 异步调用,直接返回,由future来接收结果,需要结果的时候从future中get
          RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
          return new RpcResult();
        } else {
          // 同步调用
          RpcContext.getContext().setFuture(null);
          return (Result) currentClient.request(inv, timeout).get();
        }
      } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
      } catch (RemotingException e) {
        // 这儿的异常是dubbo调用过程中dubbo本身的异常,并不是应用抛出的异常
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
      }
    }

    那究竟最后consumer通过网络向provider传输过去的是什东西呢?下面这个方法属于HeaderExchangeChannel类

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
      if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
      }
      // create request.
      // 这儿的Request就是consumer发送出去的内容,Request.data就是RpcInvocation,也就是说provider通过netty接收到的对象就是Request
      Request req = new Request();
      req.setVersion("2.0.0");
      req.setTwoWay(true);
      req.setData(request);
      DefaultFuture future = new DefaultFuture(channel, req, timeout);
      try{
        //
        channel.send(req);
      }catch (RemotingException e) {
        future.cancel();
        throw e;
      }
      return future;
    }

    至于dubbo中的网络层netty这里就不深入介绍了,netty又是一个很强大的框架,以后专门介绍吧。

    总结

    至此,前面说的三个问题都解决了。consumer已经向provider发出了请求,接下来就是provider响应请求了。

     
     
  • 相关阅读:
    修复 Visual Studio Error “No exports were found that match the constraint”
    RabbitMQ Config
    Entity Framework Extended Library
    Navisworks API 简单二次开发 (自定义工具条)
    NavisWorks Api 简单使用与Gantt
    SQL SERVER 竖表变成横表
    SQL SERVER 多数据导入
    Devexpress GridControl.Export
    mongo DB for C#
    Devexress XPO xpPageSelector 使用
  • 原文地址:https://www.cnblogs.com/xingzc/p/9101214.html
Copyright © 2011-2022 走看看