zoukankan      html  css  js  c++  java
  • dubbo源码—service invoke

    dubbo的远程调用过程是怎么样的?

    dubbo远程过程调用经过了那些处理?

    发起远程调用的时候究竟传了什么数据给provider?

    要解决这些问题,欢迎一起探讨走进dubbo源码栏目。

    在service reference中说了consumer端发起调用的时候使用的是远程服务的本地代理,发起调用的堆栈是

    (上面调用堆栈中的filter链先不介绍了,留在后面service reply中介绍,因为consumer和provider的filter链构造过程是类似的)

    前面说过consumer在引用服务的时候最终会生成一个proxy,该proxy是实现了对应的服务接口(比如:com.test.service.TestDubboService),而且包含一个InvokerInvocationHandler属性,在proxy的服务接口方法中调用InvokerInvocationHandler.invoke

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      String methodName = method.getName();
      Class<?>[] parameterTypes = method.getParameterTypes();
      // 判断是否是Object的方法,如果是则直接执行本地调用,不发起远程调用
      if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
      }
      // toString、hashCode、equals执行本地调用
      if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
      }
      if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
      }
      if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
      }
    
      // RpcInvocation是一个很重要的类,是真正在consumer和provider之间通过网络传输的实体,里面包含了服务调用需要的必要信息
      RpcInvocation invocation = new RpcInvocation(method, args);
      Result r = null;
      try {
        r = invoker.invoke(invocation);
        Object o = r.recreate();
        return o;
      } catch (Throwable e) {
        // ... 省略中间代码
      }
    }
    
    public class RpcInvocation implements Invocation, Serializable {
    
        private static final long serialVersionUID = -4355285085441097045L;
    	// 调用的方法全限定名
        private String               methodName;
    	// 方法的参数类型
        private Class<?>[]           parameterTypes;
    	// 入参
        private Object[]             arguments;
    	// 附件,可以传递一些其他信息
        private Map<String, String>  attachments;
    	// invoker
        private transient Invoker<?> invoker;
      	// ... 省略中间代码
    }
    

    上面通过invoke调用发起invoker的调动链,依次调用invoker.invoke方法

    MockClusterInvoker#invoke

    invoke方法里面会判断是否配置了mock参数

    1. 如果没有配置则直接继续调用invoke
    2. 如果配置的值以force开头,表明直接mock调用
    3. 如果mock配置了但是不以force开头,要先试试正常调用,如果正常调用失败了才会使用mock

    AbstractClusterInvoker#invoke

    这里继续正常情况下的非mock调用。其实上面接下来调用的是FailbackClusterInvoker#invoke,但是FailbackClusterInvoker继承了AbstractClusterInvoker,而且FailbackClusterInvoker没有实现invoke方法,所以直接调用了超类的invoke方法

    1. 判断当前Invoker是否被销毁,如果销毁直接抛出异常
    2. 然后调用directory.list找出所有的invoker
    3. 通过SPI加载负载均衡的扩展
    4. 调用实现类的doInvoke

    FailbackClusterInvoker#doInvoke

    这个类是负责dubbo调用失败重试的类

    1. 首先检查是否有可以调用的invoker
    2. 调用AbstractClusterInvoker.select,通过路由到一个指定invoker
    3. 通过路由到的invoker发起调用
    4. 如果调用抛出异常了,将请求加入失败列表然后定时重试

    上面invoker链调动完之后,会调用dubbo的filter链

    // dubbo有一个调用上下文RpcContext,这个filter就是负责往context写信息
    com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
    // dubbo支持事件通知(oninvoke,onreturn,onthrow),通过该filter实现,
    com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
    // 负责调用过程的监控,调用耗时、并发数等
    com.alibaba.dubbo.monitor.support.MonitorFilter
    

    前面部分都是前置处理,开始通过网络向远程发起调用是DubboInvoker.doInvoke方法

    protected Result doInvoke(final Invocation invocation) throws Throwable {
      RpcInvocation inv = (RpcInvocation) invocation;
      final String methodName = RpcUtils.getMethodName(invocation);
      // 往invocation的attachments中写入path和version参数
      inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
      inv.setAttachment(Constants.VERSION_KEY, version);
    
      ExchangeClient currentClient;
      if (clients.length == 1) {
        currentClient = clients[0];
      } else {
        // 因为dubbo支持多个client连接同一个provider,也就是是同一个provider多个连接connections,所以可能是有多个client
        currentClient = clients[index.getAndIncrement() % clients.length];
      }
      try {
        // 是否是异步调用,异步调用通过RpcContext.getContext().getFuture().get()来获取结果
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // Oneway:单向调用,调用方只管发起调用,不需要知道返回值,直接返回
    	// Twoway:双向调用,调用方发起调用后需要知道返回值,需要返回值的情况又分为同步等待或者是异步通知
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 方法调用的超时时间
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
          boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
          currentClient.send(inv, isSent);
          RpcContext.getContext().setFuture(null);
          // oneWay只管发送,不关心结果,直接返回
          return new RpcResult();
        } else if (isAsync) {
          ResponseFuture future = currentClient.request(inv, timeout) ;
          // 异步调用,直接返回,由future来接收结果,需要结果的时候从future中get
          RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
          return new RpcResult();
        } else {
          // 同步调用
          RpcContext.getContext().setFuture(null);
          return (Result) currentClient.request(inv, timeout).get();
        }
      } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
      } catch (RemotingException e) {
        // 这儿的异常是dubbo调用过程中dubbo本身的异常,并不是应用抛出的异常
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
      }
    }
    

    那究竟最后consumer通过网络向provider传输过去的是什东西呢?下面这个方法属于HeaderExchangeChannel类

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
      if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
      }
      // create request.
      // 这儿的Request就是consumer发送出去的内容,Request.data就是RpcInvocation,也就是说provider通过netty接收到的对象就是Request
      Request req = new Request();
      req.setVersion("2.0.0");
      req.setTwoWay(true);
      req.setData(request);
      DefaultFuture future = new DefaultFuture(channel, req, timeout);
      try{
        //
        channel.send(req);
      }catch (RemotingException e) {
        future.cancel();
        throw e;
      }
      return future;
    }
    

    至于dubbo中的网络层netty这里就不深入介绍了,netty又是一个很强大的框架,以后专门介绍吧。

    总结

    至此,前面说的三个问题都解决了。consumer已经向provider发出了请求,接下来就是provider响应请求了。

  • 相关阅读:
    字符串函数使用与 Culture
    学习 Monitor使用
    Extjs的ajax实现
    linux下tomcat的安装及部署
    使用jquery插件实现打印指定区域功能
    hibernate的各种保存方式的区别 (save,persist,update,saveOrUpdte,merge,flush,lock)等
    html 树形菜单
    Ext4 修复对话框按钮翻译
    spring aop expression简单说明
    tomcat内存溢出的解决方法(java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError:)
  • 原文地址:https://www.cnblogs.com/sunshine-2015/p/8325992.html
Copyright © 2011-2022 走看看