zoukankan      html  css  js  c++  java
  • dubbo入门之服务消费

    今天,我们来看看dubbo消费的执行过程
    首先,我们都知道dubbo是一个基于netty实现的RPC框架,底层通信是使用netty来实现的。在学习dubbo的时候,或许我们都会有下面的这些疑惑:
    1、服务消费者只持有服务接口,我们的消费端在执行接口请求的时候获取到的接口实现是什么?
    2、消费者是如何通过netty建立同服务端的通信的?
    3、服务是怎么注册到注册中心的?
    4、消费端怎么拉取服务?
    5、服务的负载均衡是如何体现的?
    等等这些问题都会困扰着我们,今天我们先来聊聊dubbo消费端的实现原理
    现在,你可能已经自己通过官网的教程搭建了自己的dubbo demo服务,你在执行demo的时候会发现,服务消费者只持有服务接口,你是通过@Reference注解去获取的实现,你已经知道spring bean工厂会自动为用户创建代理实例,那么dubbo为我们的消费者创建的代理实现是什么呢?只要开启idea的调试模式,你就可以看到我们得到的实现其实是:com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler。该Handler实现了InvocationHandler,InvocationHandler是JDK动态代理实现的核心接口,如果你不了解动态代理,那建议你自己去了解一下。
    回到正题,我们通过接口调用的方法都会被该Handler代理,该Handler源码如下:

    public class InvokerInvocationHandler implements InvocationHandler {
    
    	private final Invoker<?> invoker;
    
    	public InvokerInvocationHandler(Invoker<?> handler) {
    		this.invoker = handler;
    	}
    
    	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    		String methodName = method.getName();
    		Class<?>[] parameterTypes = method.getParameterTypes();
    		if (method.getDeclaringClass() == Object.class) {
    			return method.invoke(invoker, args);
    		}
    		if ("toString".equals(methodName) && parameterTypes.length == 0) {
    			return invoker.toString();
    		}
    		if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
    			return invoker.hashCode();
    		}
    		if ("equals".equals(methodName) && parameterTypes.length == 1) {
    			return invoker.equals(args[0]);
    		}
    		return invoker.invoke(new RpcInvocation(method, args)).recreate();
    	}
    
    }
    

    源码很简单,只有一个invoke方法,它是代理类和接口之间的桥梁。如果你再细心一点,会发现InvokerInvocationHandler中的Invoker实现类是com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker。它是dubbo invoker的默认实现,里面封装了服务降级等功能。到这里,你基本已经知道消费者究竟是怎么去调用服务的了,后面你只要继续跟着源码调试,服务是如何和netty建立联系的

    消费者请求调用链:
    
    proxy0#sayHello(String)
      —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    	—> MockClusterInvoker#invoke(Invocation)
    	  —> AbstractClusterInvoker#invoke(Invocation)
    		—> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
    		  —> Filter#invoke(Invoker, Invocation)  // 包含多个 Filter 调用
    			—> ListenerInvokerWrapper#invoke(Invocation) 
    			  —> AbstractInvoker#invoke(Invocation) 
    				—> DubboInvoker#doInvoke(Invocation)
    				  —> ReferenceCountExchangeClient#request(Object, int)
    					—> HeaderExchangeClient#request(Object, int)
    					  —> HeaderExchangeChannel#request(Object, int)
    						—> AbstractPeer#send(Object)
    						  —> AbstractClient#send(Object, boolean)
    							—> NettyChannel#send(Object, boolean)
    							  —> NioClientSocketChannel#write(Object)
    

    在MockClusterInvoker是一个抽象类,它的默认实现是FailoverClusterInvoker,在MockClusterInvoker中,通过服务目录Directory列举服务列表,核心方法invoke如下:

    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;
        //列举服务列表
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }
    

    服务目录Directory已知的实现有RegistryDirectory和StaticDirectiry,默认使用的是RegistryDirectory,在进行服务调用的时候会从这里面去获取可用的服务列表,如果想要了解更多,推荐阅读dubbo官网服务列表一章,里面有非常详细的介绍。
    可以看到,获取服务列表之后会从系统扩展中加载默认的负载均衡实现,然后继续往下执行到子类FailoverClusterInvoker的模板方法doInvoke,该方法会重新执行服务列举并检查服务的可用性,之后通过负载均衡策略选择具体服务,dubbo默认负载均衡策略是随机RandomLoadBalance。关键代码如下:

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        // 服务检查
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                // i>0,说明第一次服务调用失败,需要重新检查服务列表
                checkWhetherDestroyed();    //如果服务已经销毁,抛出异常
                copyinvokers = list(invocation);//重新列举服务
                // check again
                checkInvokers(copyinvokers, invocation);//检查服务是否为空
            }
            //负载均衡获取服务
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);//将获取的服务添加到已执行列表
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                //服务请求
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
               
            }
            // 异常处理部分省略。。。
        }
     
    }
    

    通过负载均衡获取到具体服务后,执行服务调用,到AbstractInvoker的invoke方法,主要设置一些attachment的信息。重点来看看实现类DubboInvoke的doInvoke方法,如下:

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        //设置路径和版本
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
    
        //ExchangeClient
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            //从配置中获取是否同步执行
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            //是否单向执行
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            //请求超时时间
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                //如果单向执行,发起调用后立即返回一个空RpcResult
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                //如果异步,调用后立即返回一个空的RpcResult
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                //否则,同步等待
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    

    ExchangeClient接口的实现是HeaderExchangeClient,request方法很简单,如下

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        return channel.request(request, timeout);
    }
    

    就是做了一次请求转发,channel是ExchangeChannel,默认实现是HeaderExchangeChannel,HeaderExchangeChannel的request方法如下:

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");//版本号
        req.setTwoWay(true);//双向通信
        req.setData(request);//请求数据
        DefaultFuture future = new DefaultFuture(channel, req, timeout);//Future
        try {
            channel.send(req);//Dubbo封装的通信Channel
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
    

    这里就是构造Request请求体,然后传递给NettyChannel,NettyChannel封装了netty的channel,通过该channel将数据请求写入到TCP请求中传递给服务端。NettyChannel的send方法如下

    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
    
        boolean success = true;
        int timeout = 0;
        try {
            //数据请求
            ChannelFuture future = channel.write(message);//此处的channel才是org.jboss.netty.channel.Channel
            if (sent) {
                //等待请求结果
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
    
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
    

    到这里,关于dubbo消费请求的基本流程已经走完,继续往下就是netty层面的东西了,有兴趣的童鞋可以自行寻找netty相关教程

  • 相关阅读:
    C语言 · 递归求二项式系数值
    C语言 · 错误票据
    C语言 · 色盲的民主
    C语言 · 分苹果
    C语言 · Quadratic Equation
    C语言 · 企业奖金发放
    C语言 · 最长单词
    C语言 · 高精度加法
    C语言 · 判断回文
    C语言 · 简单计算器
  • 原文地址:https://www.cnblogs.com/canmeng-cn/p/10541708.html
Copyright © 2011-2022 走看看