zoukankan      html  css  js  c++  java
  • Dubbo Filter详解

    转载:https://www.jianshu.com/p/c5ebe3e08161

    Dubbo的Filter在使用的过程中是我们扩展最频繁的内容,而且Dubbo的很多特性实现也都离不开Filter的工作,今天一起来看一下Filter的具体实现。

    Filter(过滤器)在很多框架中都有使用过这个概念,基本上的作用都是类似的,在请求处理前或者处理后做一些通用的逻辑,而且Filter可以有多个,支持层层嵌套。
    Dubbo的Filter概念基本上符合我们正常的预期理解,而且Dubbo官方针对Filter做了很多的原生支持,目前大致有20来个吧,包括我们熟知的RpcContext,accesslog功能都是通过filter来实现了,下面一起详细看一下Filter的实现。
    Dubbo的Filter实现入口是在ProtocolFilterWrapper,因为ProtocolFilterWrapper是Protocol的包装类,所以会在加载的Extension的时候被自动包装进来(理解这里的前提是理解Dubbo的SPI机制),然后我们看一下这个Filter链是如何构造的。

        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            //向注册中心引用服务的时候并不会进行filter调用链
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url);
            }
            return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
        }
        
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            //获得所有激活的Filter(已经排好序的)
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (filters.size() > 0) {
                for (int i = filters.size() - 1; i >= 0; i --) {
                    final Filter filter = filters.get(i);
                    //复制引用,构建filter调用链
                    final Invoker<T> next = last;
                    //这里只是构造一个最简化的Invoker作为调用链的载体Invoker
                    last = new Invoker<T>() {
    
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        public Result invoke(Invocation invocation) throws RpcException {
                            return filter.invoke(next, invocation);
                        }
    
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
            return last;
        }
        
    

    看到上面的内容,我们大致能明白实现是这样子的,通过获取所有可以被激活的Filter链,然后根据一定顺序构造出一个Filter的调用链,最后的调用链大致是这样子:Filter1->Filter2->Filter3->......->Invoker,这个构造Filter链的逻辑非常简单,重点就在于如何获取被激活的Filter链。

        //将key在url中对应的配置值切换成字符串信息数组
        public List<T> getActivateExtension(URL url, String key, String group) {
            String value = url.getParameter(key);
            return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
        }
        
        public List<T> getActivateExtension(URL url, String[] values, String group) {
            List<T> exts = new ArrayList<T>();
            //所有用户自己配置的filter信息(有些Filter是默认激活的,有些是配置激活的,这里这里的names就指的配置激活的filter信息)
            List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
    
            //如果这些名称里不包括去除default的标志(-default),换言之就是加载Dubbo提供的默认Filter
            if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
                //加载extension信息
                getExtensionClasses();
                for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                    //name指的是SPI读取的配置文件的key
                    String name = entry.getKey();
                    Activate activate = entry.getValue();
                    //group主要是区分实在provider端生效还是consumer端生效
                    if (isMatchGroup(group, activate.group())) {
                        T ext = getExtension(name);
                        //这里以Filter为例:三个判断条件的含义依次是:
                        //1.用户配置的filter列表中不包含当前ext
                        //2.用户配置的filter列表中不包含当前ext的加-的key
                        //3.如果用户的配置信息(url中体现)中有可以激活的配置key并且数据不为0,false,null,N/A,也就是说有正常的使用
                        if (! names.contains(name)
                                && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
                                && isActive(activate, url)) {
                            exts.add(ext);
                        }
                    }
                }
                //根据Activate注解上的order排序
                Collections.sort(exts, ActivateComparator.COMPARATOR);
            }
            //进行到此步骤的时候Dubbo提供的原生的Filter已经被添加完毕了,下面处理用户自己扩展的Filter
            List<T> usrs = new ArrayList<T>();
            for (int i = 0; i < names.size(); i ++) {
                String name = names.get(i);
                //如果单个name不是以-开头并且所有的key里面并不包含-'name'(也就是说如果配置成了"dubbo,-dubbo"这种的可以,这个if是进不去的)
                if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                        && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                    //可以通过default关键字替换Dubbo原生的Filter链,主要用来控制调用链顺序
                    if (Constants.DEFAULT_KEY.equals(name)) {
                        if (usrs.size() > 0) {
                            exts.addAll(0, usrs);
                            usrs.clear();
                        }
                    } else {
                        //加入用户自己定义的扩展Filter
                        T ext = getExtension(name);
                        usrs.add(ext);
                    }
                }
            }
            if (usrs.size() > 0) {
                exts.addAll(usrs);
            }
            return exts;
        }
        
    

    基本上到这里就能看到Filter链是如何被加载进来的,这里设计的非常灵活,忍不住要感叹一下:通过简单的配置‘-’可以手动剔除Dubbo原生的一定加载Filter,通过default来代替Dubbo原生的一定会加载的Filter从而来控制顺序。这些设计虽然都是很小的功能点,但是总体的感觉是十分灵活,考虑的比较周到,非常值得我这种菜鸟学习。

    知道了Filter构造的过程之后,我们就详细看几个比较重要的Filter信息。
    Filter在作用端区分的话主要是区分为consumer和provider,下面我们就以这个为区分来分别介绍一些重点的Filter。

    Cunsumer

    ConsumerContextFilter (默认触发)

        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            //在当前的RpcContext中记录本地调用的一次状态信息
            RpcContext.getContext()
                    .setInvoker(invoker)
                    .setInvocation(invocation)
                    .setLocalAddress(NetUtils.getLocalHost(), 0)
                    .setRemoteAddress(invoker.getUrl().getHost(), 
                                      invoker.getUrl().getPort());
            if (invocation instanceof RpcInvocation) {
                ((RpcInvocation)invocation).setInvoker(invoker);
            }
            try {
                return invoker.invoke(invocation);
            } finally {
                RpcContext.getContext().clearAttachments();
            }
        }
    

    其实简单来看这个Filter的话是十分简单,它又是怎么将客户端设置的隐式参数传递给服务端呢?载体就是Invocation对象,在客户端调用Invoker.invoke方法时候,会去取当前状态记录器RpcContext中的attachments属性,然后设置到RpcInvocation对象中,在RpcInvocation传递到provider的时候会通过另外一个过滤器ContextFilter将RpcInvocation对象重新设置回RpcContext中供服务端逻辑重新获取隐式参数。这就是为什么RpcContext只能记录一次请求的状态信息,因为在第二次调用的时候参数已经被新的RpcInvocation覆盖掉,第一次的请求信息对于第二次执行是不可见的。

    ActiveLimitFilter (当配置了actives并且值不为0的时候触发)

    ActiveLimitFilte主要用于限制同一个客户端对于一个服务端方法的并发调用量。(客户端限流)

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
            //主要记录每台机器针对某个方法的并发数量
            RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
            if (max > 0) {
                long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
                long start = System.currentTimeMillis();
                long remain = timeout;
                int active = count.getActive();
                if (active >= max) {
                    synchronized (count) {
                        //这个while循环是必要的,因为在一次wait结束后,可能线程调用已经结束了,腾出来consumer的空间
                        while ((active = count.getActive()) >= max) {
                            try {
                                count.wait(remain);
                            } catch (InterruptedException e) {
                            }
                            //如果wait方法被中断的话,remain这时候有可能大于0
                            //如果其中一个线程运行结束自后调用notify方法的话,也有可能remain大于0
                            long elapsed = System.currentTimeMillis() - start;
                            remain = timeout - elapsed;
                            if (remain <= 0) {
                                throw new RpcException("...");
                            }
                        }
                    }
                }
            }
            try {
                //调用开始和结束后增减并发数量
                long begin = System.currentTimeMillis();
                RpcStatus.beginCount(url, methodName);
                try {
                    Result result = invoker.invoke(invocation);
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                    return result;
                } catch (RuntimeException t) {
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                    throw t;
                }
            } finally {
                //这里很关键,因为一个调用完成后要通知正在等待执行的队列
                if(max>0){
                    synchronized (count) {
                        count.notify();
                    } 
                }
            }
        }
    

    FutureFilter

    Future主要是处理事件信息,主要有以下几个事件:

    • oninvoke 在方法调用前触发(如果调用出现异常则会直接触发onthrow方法)
    • onreturn 在方法返回会触发(如果调用出现异常则会直接触发onthrow方法)
    • onthrow 调用出现异常时候触发
        public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
            final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
            // 这里主要处理回调逻辑,主要区分三个时间:oninvoke:调用前触发,onreturn:调用后触发 onthrow:出现异常情况时候触发
            fireInvokeCallback(invoker, invocation);
            //需要在调用前配置好是否有返回值,已供invoker判断是否需要返回future.
            Result result = invoker.invoke(invocation);
            if (isAsync) {
                asyncCallback(invoker, invocation);
            } else {
                syncCallback(invoker, invocation, result);
            }
            return result;
        }
        
        private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
            final Method onInvokeMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
            final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
            
            if (onInvokeMethod == null  &&  onInvokeInst == null ){
                return ;
            }
            if (onInvokeMethod == null  ||  onInvokeInst == null ){
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() +" has a onreturn callback config , but no such "+(onInvokeMethod == null ? "method" : "instance")+" found. url:"+invoker.getUrl());
            }
            //由于JDK的安全检查耗时较多.所以通过setAccessible(true)的方式关闭安全检查就可以达到提升反射速度的目的
            if (onInvokeMethod != null && ! onInvokeMethod.isAccessible()) {
                onInvokeMethod.setAccessible(true);
            }
            //从之类可以看出oninvoke的方法参数要与调用的方法参数一致
            Object[] params = invocation.getArguments();
            try {
                onInvokeMethod.invoke(onInvokeInst, params);
            } catch (InvocationTargetException e) {
                fireThrowCallback(invoker, invocation, e.getTargetException());
            } catch (Throwable e) {
                fireThrowCallback(invoker, invocation, e);
            }
        }
        
        //fireReturnCallback的逻辑与fireThrowCallback基本一样,所以不用看了
        private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
            final Method onthrowMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
            final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));
    
            if (onthrowMethod == null  &&  onthrowInst == null ){
                return ;
            }
            if (onthrowMethod == null  ||  onthrowInst == null ){
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() +" has a onthrow callback config , but no such "+(onthrowMethod == null ? "method" : "instance")+" found. url:"+invoker.getUrl());
            }
            if (onthrowMethod != null && ! onthrowMethod.isAccessible()) {
                onthrowMethod.setAccessible(true);
            }
            Class<?>[] rParaTypes = onthrowMethod.getParameterTypes() ;
            if (rParaTypes[0].isAssignableFrom(exception.getClass())){
                try {
                    //因为onthrow方法的参数第一个值必须为异常信息,所以这里需要构造参数列表
                    Object[] args = invocation.getArguments();
                    Object[] params;
                    
                    if (rParaTypes.length >1 ) {
                        //原调用方法只有一个参数而且这个参数是数组(单独拎出来计算的好处是这样可以少复制一个数组)
                        if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)){
                            params = new Object[2];
                            params[0] = exception;
                            params[1] = args ;
                        }else {//原调用方法有多于一个参数
                            params = new Object[args.length + 1];
                            params[0] = exception;
                            System.arraycopy(args, 0, params, 1, args.length);
                        }
                    } else {//原调用方法没有参数
                        params = new Object[] { exception };
                    }
                    onthrowMethod.invoke(onthrowInst,params);
                } catch (Throwable e) {
                    logger.error(invocation.getMethodName() +".call back method invoke error . callback method :" + onthrowMethod + ", url:"+ invoker.getUrl(), e);
                } 
            } else {
                logger.error(invocation.getMethodName() +".call back method invoke error . callback method :" + onthrowMethod + ", url:"+ invoker.getUrl(), exception);
            }
        }
        
    

    同步异步的主要处理区别就是同步调用的话,事件触发是直接调用的,没有任何逻辑;异步的话就是首先获取到调用产生的Future对象,然后复写Future的done()方法,将fireThrowCallback和fireReturnCallback逻辑引入即可。

    Provider

    ContextFilter

    ContextFilter和ConsumerContextFilter是结合使用的,之前的介绍中已经看了ConsumerContextFilter,下面再简单看一下ContextFilter,来验证刚才讲到的逻辑。

        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            Map<String, String> attachments = invocation.getAttachments();
            if (attachments != null) {
            //隐式参数重剔除一些核心消息
                attachments = new HashMap<String, String>(attachments);
                attachments.remove(Constants.PATH_KEY);
                attachments.remove(Constants.GROUP_KEY);
                attachments.remove(Constants.VERSION_KEY);
                attachments.remove(Constants.DUBBO_VERSION_KEY);
                attachments.remove(Constants.TOKEN_KEY);
                attachments.remove(Constants.TIMEOUT_KEY);
            }
            //这里又重新将invocation和attachments信息设置到RpcContext,这里设置以后provider的代码就可以获取到consumer端传递的一些隐式参数了
            RpcContext.getContext()
                    .setInvoker(invoker)
                    .setInvocation(invocation)
                    .setAttachments(attachments)
                    .setLocalAddress(invoker.getUrl().getHost(), 
                                     invoker.getUrl().getPort());
            if (invocation instanceof RpcInvocation) {
                ((RpcInvocation)invocation).setInvoker(invoker);
            }
            try {
                return invoker.invoke(invocation);
            } finally {
                RpcContext.removeContext();
            }
        }
    

    EchoFilter

    回响测试主要用来检测服务是否正常(网络状态),单纯的检测网络情况的话其实不需要执行真正的业务逻辑的,所以通过Filter验证一下即可.

        public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
            if(inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1 )
                return new RpcResult(inv.getArguments()[0]);
            return invoker.invoke(inv);
        }
    

    ExecuteLimitFilter

    服务端接口限制限流的具体执行逻辑就是在ExecuteLimitFilter中,因为服务端不需要考虑重试等待逻辑,一旦当前执行的线程数量大于指定数量,就直接返回失败了,所以实现逻辑相对于ActiveLimitFilter倒是简便了不少。

        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
            if (max > 0) {
                RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
                if (count.getActive() >= max) {
                    throw new RpcException("...");
                }
            }
            long begin = System.currentTimeMillis();
            boolean isException = false;
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (Throwable t) {
                isException = true;
                if(t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                }
                else {
                    throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
                }
            }
            finally {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isException);
            }
        }
    

    ExceptionFilter

    Dubbo 对于异常的处理有自己的一套规则:

    • 如果是checked异常则直接抛出
    • 如果是unchecked异常但是在接口上有声明,也会直接抛出
    • 如果异常类和接口类在同一jar包里,直接抛出
    • 如果是JDK自带的异常,直接抛出
    • 如果是Dubbo的异常,直接抛出
    • 其余的都包装成RuntimeException然后抛出(避免异常在Client出不能反序列化问题)
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            try {
                Result result = invoker.invoke(invocation);
                if (result.hasException() && GenericService.class != invoker.getInterface()) {
                    try {
                        Throwable exception = result.getException();
    
                        // 如果是checked异常,直接抛出
                        if (! (exception instanceof RuntimeException) && (exception instanceof Exception)) {
                            return result;
                        }
                        // 运行时异常,并且在方法签名上有声明,直接抛出
                        try {
                            Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                            Class<?>[] exceptionClassses = method.getExceptionTypes();
                            for (Class<?> exceptionClass : exceptionClassses) {
                                if (exception.getClass().equals(exceptionClass)) {
                                    return result;
                                }
                            }
                        } catch (NoSuchMethodException e) {
                            return result;
                        }
    
                        // 未在方法签名上定义的异常,在服务器端打印ERROR日志
                        logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                                + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                                + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
    
                        // 异常类和接口类在同一jar包里,直接抛出
                        String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                        String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                        if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)){
                            return result;
                        }
                        // 是JDK自带的异常,直接抛出
                        String className = exception.getClass().getName();
                        if (className.startsWith("java.") || className.startsWith("javax.")) {
                            return result;
                        }
                        // 是Dubbo本身的异常,直接抛出
                        if (exception instanceof RpcException) {
                            return result;
                        }
    
                        // 否则,包装成RuntimeException抛给客户端
                        return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
                    } catch (Throwable e) {
                        logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                                + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                                + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                        return result;
                    }
                }
                return result;
            } catch (RuntimeException e) {
                logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                        + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                        + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                throw e;
            }
        }
    

    基本到这里的话把比较重要的Filter内容都有讲解到了,我们可以根据自己的需求非常轻易地扩展适合自己业务使用的Filter。

    本文最后,还是习惯性的撒花~~~~

  • 相关阅读:
    83. Remove Duplicates from Sorted List
    35. Search Insert Position
    96. Unique Binary Search Trees
    94. Binary Tree Inorder Traversal
    117. Populating Next Right Pointers in Each Node II
    116. Populating Next Right Pointers in Each Node
    111. Minimum Depth of Binary Tree
    169. Majority Element
    171. Excel Sheet Column Number
    190. Reverse Bits
  • 原文地址:https://www.cnblogs.com/ceshi2016/p/12016531.html
Copyright © 2011-2022 走看看