  • Dubbo source reading - service subscription (9): how Filter is implemented (dubbo)

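    I recently finished a requirement: to use Alibaba ARMS, the traceId has to be added to the logs. However, when a Dubbo exception is caught and logged by ExceptionFilter, the traceId is missing from the output, so I searched Baidu for how to override a Filter.

    I referred to this article: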

    https://www.jianshu.com/p/7e7076212bd0

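    Overriding ExceptionFilter

    1. Add a DubboExceptionFilter class (named ArmsDubboExceptionFilter below)

    My changes are the setSpan call, the finally block that calls ArmsUtils.remove(), and the setSpan method (marked in red in the original post); everything else is copied from the original ExceptionFilter.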

    @Activate(
            group = {"provider"}
    )
    public class ArmsDubboExceptionFilter implements Filter {
        private final Logger logger;
    
        public ArmsDubboExceptionFilter() {
            this(LoggerFactory.getLogger(ArmsDubboExceptionFilter.class));
        }
    
        public ArmsDubboExceptionFilter(Logger logger) {
            this.logger = logger;
        }
    
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            try {
                setSpan(invocation);
                Result result = invoker.invoke(invocation);
                if (result.hasException() && GenericService.class != invoker.getInterface()) {
                    try {
                        Throwable exception = result.getException();
                        if (!(exception instanceof RuntimeException) && exception instanceof Exception) {
                            return result;
                        } else {
                            try {
                                Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                                Class<?>[] exceptionClassses = method.getExceptionTypes();
                                Class[] arr$ = exceptionClassses;
                                int len$ = exceptionClassses.length;
    
                                for (int i$ = 0; i$ < len$; ++i$) {
                                    Class<?> exceptionClass = arr$[i$];
                                    if (exception.getClass().equals(exceptionClass)) {
                                        return result;
                                    }
                                }
                            } catch (NoSuchMethodException var11) {
                                return result;
                            }
    
                            this.logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
                            String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                            String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                            if (serviceFile != null && exceptionFile != null && !serviceFile.equals(exceptionFile)) {
                                String className = exception.getClass().getName();
                                if (!className.startsWith("java.") && !className.startsWith("javax.")) {
                                    return (Result) (exception instanceof RpcException ? result : new RpcResult(new RuntimeException(StringUtils.toString(exception))));
                                } else {
                                    return result;
                                }
                            } else {
                                return result;
                            }
                        }
                    } catch (Throwable var12) {
                        this.logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + var12.getClass().getName() + ": " + var12.getMessage(), var12);
                        return result;
                    }
                } else {
                    return result;
                }
            } catch (RuntimeException var13) {
                this.logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + var13.getClass().getName() + ": " + var13.getMessage(), var13);
                throw var13;
            } finally {
                ArmsUtils.remove();
            }
        }
    
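        /*
         * The traceId cannot be obtained through the ARMS SDK (after talking to an Alibaba engineer, this appears to be a bug),
         * so for now we read it from the protocol headers ourselves and store it in a thread-local cache for the log appender to use.
         * @param invocation
         */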
        public void setSpan(Invocation invocation) {
            try {
                String sampled = invocation.getAttachment("X-B3-Sampled");
                ArmsUtils.setSpan(invocation.getAttachment("X-B3-TraceId"), invocation.getAttachment("EagleEye-RpcID"), invocation.getAttachment("X-B3-SpanId"), sampled != null && sampled.equals("1"));
            } catch (Exception e) {
                logger.error("Failed to write span", e);
            }
        }
    }
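
    The post does not show ArmsUtils. Going only by the comment above (the trace ID is read from the attachments and kept in a thread-local cache for the log appender), a holder of that kind might look roughly like the sketch below. The class name TraceContextSketch and all of its methods are hypothetical; none of this is ARMS SDK code.

```java
/**
 * Hypothetical stand-in for the ArmsUtils helper used by the filter.
 * It only illustrates the thread-local idea; it is not ARMS SDK code.
 */
final class TraceContextSketch {

    /** Immutable snapshot of the tracing attachments of the current call. */
    static final class Span {
        final String traceId;
        final String rpcId;
        final String spanId;
        final boolean sampled;

        Span(String traceId, String rpcId, String spanId, boolean sampled) {
            this.traceId = traceId;
            this.rpcId = rpcId;
            this.spanId = spanId;
            this.sampled = sampled;
        }
    }

    // One slot per thread: the worker thread that handles the call sees its own span.
    private static final ThreadLocal<Span> CURRENT = new ThreadLocal<>();

    private TraceContextSketch() {
    }

    /** Called at the start of the filter, before the service method runs. */
    static void setSpan(String traceId, String rpcId, String spanId, boolean sampled) {
        CURRENT.set(new Span(traceId, rpcId, spanId, sampled));
    }

    /** What a log appender could call; returns null when no span is set on this thread. */
    static String currentTraceId() {
        Span span = CURRENT.get();
        return span == null ? null : span.traceId;
    }

    /** Called from the filter's finally block so the value does not outlive the call. */
    static void remove() {
        CURRENT.remove();
    }

    public static void main(String[] args) throws InterruptedException {
        setSpan("abc123", "0.1", "def456", true);
        System.out.println(currentTraceId()); // abc123

        // Another thread has its own (empty) slot.
        Thread other = new Thread(() -> System.out.println(currentTraceId())); // null
        other.start();
        other.join();

        remove();
        System.out.println(currentTraceId()); // null
    }
}
```

    Because provider threads come from a pool and are reused between calls, clearing the slot in the finally block (as the filter above does with ArmsUtils.remove()) keeps one request's traceId from leaking into the next.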

    2. Under /src/main/resources/META-INF/dubbo/, add a file named com.alibaba.dubbo.rpc.Filter

    Contents:

    DubboExceptionFilter=com.biz.core.armslog.ArmsDubboExceptionFilter

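    3. Configuration in soa-provider.xml

       <!-- Delayed export: the service is exported only after the Spring container has finished initializing; no retries; filter registers the custom DubboExceptionFilter, and -exception removes the default ExceptionFilter, so that the ARMS fields are added to the log output -->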
        <dubbo:provider delay="-1" retries="0" filter="DubboExceptionFilter,-exception"/>

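    When ProtocolFilterWrapper is invoked

    This relies on Dubbo SPI; see the earlier posts "dubbo source reading - how dubbo SPI is implemented (5)" and "When ProtocolFilterWrapper is invoked".

    Reading the source to understand why it is written this way

    ProtocolFilterWrapper

    This is the wrapper class for Dubbo filters; it builds the filter execution chain.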

    /**
         * Build the filter call chain for the invoker; the invoker is the object that is finally executed
         * @param invoker the object to execute
         * @param key the parameter name
         * @param group matches @Activate(group = {"provider"})
         * @param <T>
         * @return
         */
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            // Not final: last is reassigned on every iteration so that it always points at the current head of the chain
            Invoker<T> last = invoker;
            /**
             * Use Dubbo SPI to obtain the default and the user-defined filters
             * url is the address of the exported service, e.g.
             * injvm://127.0.0.1/com.biz.soa.service.promotion.backend.OfflineExtend.PromotionOfflineService?anyhost=true&application=soa-promotion-provider&bind.ip=10.37.129.2&bind.port=23888&default.delay=-1&default.retries=0&default.service.filter=DubboExceptionFilter,-exception&delay=-1&dispatcher=message&dubbo=2.6.2&generic=false&interface=com.biz.soa.service.promotion.backend.OfflineExtend.PromotionOfflineService&methods=downLoadPromotion,updaPromotion,queryPromotionOffline,getOfflineProducts,SavePromotion&pid=81391&register=false&side=provider&threadpool=fixed&threads=500&timestamp=1574842487417
             * group is provider (or consumer on the consumer side)
             * */
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (!filters.isEmpty()) {
                /**
                 * Build the filter call chain in a loop: filter1 -> filter2 -> invoker
                 */
                for(int i = filters.size() - 1; i >= 0; --i) {
                    final Filter filter = (Filter)filters.get(i);
                    // Capture the current head in a final local; the anonymous class below cannot reference the reassigned variable
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        public Result invoke(Invocation invocation) throws RpcException {
                            return filter.invoke(next, invocation);
                        }
    
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
    
            return last;
        }
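
    To see why the loop runs backwards, here is a small dependency-free sketch of the same wrapping technique. Invoker and Filter below are simplified stand-ins (a String in, a String out), not the Dubbo interfaces, and the filter names are made up for the demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified illustration of the chain built by buildInvokerChain; not Dubbo code. */
final class FilterChainSketch {

    /** Stand-in for Dubbo's Invoker: handles a request and returns a result. */
    interface Invoker {
        String invoke(String request);
    }

    /** Stand-in for Dubbo's Filter: may act before and after delegating to next. */
    interface Filter {
        String invoke(Invoker next, String request);
    }

    /** Wraps target so that filters.get(0) runs first and target runs last. */
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        // Walk backwards: the last filter wraps the target, the first filter wraps everything.
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last; // current head of the chain
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    /** Demo filter that records when it is entered and left. */
    static Filter named(String name, List<String> log) {
        return (next, request) -> {
            log.add(name + " before");
            String result = next.invoke(request);
            log.add(name + " after");
            return result;
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Invoker target = request -> {
            log.add("target");
            return "echo:" + request;
        };
        Invoker chain = buildChain(target, Arrays.asList(named("filter1", log), named("filter2", log)));

        System.out.println(chain.invoke("hi")); // echo:hi
        System.out.println(log); // [filter1 before, filter2 before, target, filter2 after, filter1 after]
    }
}
```

    Each filter receives the rest of the chain as next, which is why a filter such as the exception filter above can wrap the call in try/finally and see both the request and the result.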

    ExtensionLoader

    ExtensionLoader.getExtensionLoader implementation

     /**
         * Get the ExtensionLoader that corresponds to the given class
         * @param type 
         * @param <T>
         * @return
         */
        public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
            if (type == null) {
                throw new IllegalArgumentException("Extension type == null");
            } else if (!type.isInterface()) {
                throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
            } else if (!withExtensionAnnotation(type)) {
                throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
            } else {
                //One ExtensionLoader is cached per extension point type (created on first use), so here we get the ExtensionLoader<Filter> instance
                ExtensionLoader<T> loader = (ExtensionLoader)EXTENSION_LOADERS.get(type);
                if (loader == null) {
                    EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader(type));
                    loader = (ExtensionLoader)EXTENSION_LOADERS.get(type);
                }
    
                return loader;
            }
        }
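
    The caching part of this method is a plain get / putIfAbsent / get sequence on a concurrent map. A minimal sketch of just that part follows, with a hypothetical LoaderCacheSketch class in place of ExtensionLoader and without the @SPI annotation check.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical miniature of the per-type loader cache; the @SPI check is left out. */
final class LoaderCacheSketch<T> {

    // One loader per extension point interface, shared by all callers.
    private static final ConcurrentMap<Class<?>, LoaderCacheSketch<?>> LOADERS = new ConcurrentHashMap<>();

    private final Class<T> type;

    private LoaderCacheSketch(Class<T> type) {
        this.type = type;
    }

    @SuppressWarnings("unchecked")
    static <T> LoaderCacheSketch<T> getLoader(Class<T> type) {
        if (type == null) {
            throw new IllegalArgumentException("Extension type == null");
        }
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        LoaderCacheSketch<T> loader = (LoaderCacheSketch<T>) LOADERS.get(type);
        if (loader == null) {
            // putIfAbsent keeps the first instance if two threads race here;
            // reading the map again makes both threads return that same instance.
            LOADERS.putIfAbsent(type, new LoaderCacheSketch<T>(type));
            loader = (LoaderCacheSketch<T>) LOADERS.get(type);
        }
        return loader;
    }

    Class<T> getType() {
        return type;
    }

    public static void main(String[] args) {
        LoaderCacheSketch<Runnable> a = getLoader(Runnable.class);
        LoaderCacheSketch<Runnable> b = getLoader(Runnable.class);
        Object c = getLoader(CharSequence.class);

        System.out.println(a == b); // true: same type, same loader
        System.out.println(a == c); // false: a different type gets its own loader
    }
}
```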

    ExtensionLoader.getActivateExtension

     /**
         * Get all filters, both the default ones and the user-configured ones
         * @param url
         * @param values
         * @param group
         * @return
         */
        public List<T> getActivateExtension(URL url, String[] values, String group) {
            List<T> exts = new ArrayList();
            List<String> names = values == null ? new ArrayList(0) : Arrays.asList(values);
            String name;
            /**
             * Load the default filters here; if -default is configured, all default filters are skipped
             */
            if (!((List)names).contains("-default")) {
                this.getExtensionClasses();
                Iterator i$ = this.cachedActivates.entrySet().iterator();
    
                while(i$.hasNext()) {
                    Map.Entry<String, Activate> entry = (Map.Entry)i$.next();
                    /**
                     * The key name from the SPI configuration, e.g. the Dubbo SPI entry: exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
                     */
                    name = (String)entry.getKey();
                    Activate activate = (Activate)entry.getValue();
                    /**
                     * Match against the group configured on @Activate; here only the provider ones are picked
                     */
                    if (this.isMatchGroup(group, activate.group())) {
                        T ext = this.getExtension(name);
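                        /**
                         * Because we configured -exception, the default ExceptionFilter is not added here
                         * isActive: if the annotation declares a value, e.g. @Activate(group={"provider"},value={"token"}), the URL parameters are checked for a token parameter, and the filter is added only if it is present
                         */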
                        if (!((List)names).contains(name) && !((List)names).contains("-" + name) && this.isActive(activate, url)) {
                            exts.add(ext);
                        }
                    }
                }
                /**
                 * Sort; see the ActivateComparator.COMPARATOR implementation, which orders by the order value of the annotation
                 */
                Collections.sort(exts, ActivateComparator.COMPARATOR);
            }
            /**
             * Below, the user-configured filters are loaded
             * <dubbo:provider delay="-1" retries="0" filter="DubboExceptionFilter,-exception"/>
             */
            List<T> usrs = new ArrayList();
    
            for(int i = 0; i < ((List)names).size(); ++i) {
                name = (String)((List)names).get(i);
                //Names starting with "-" mean removal; they are skipped here
                if (!name.startsWith("-") && !((List)names).contains("-" + name)) {
                    if ("default".equals(name)) {
                        if (!usrs.isEmpty()) {
                            exts.addAll(0, usrs);
                            usrs.clear();
                        }
                    } else {
                        T ext = this.getExtension(name);
                        usrs.add(ext);
                    }
                }
            }
    
            if (!usrs.isEmpty()) {
                exts.addAll(usrs);
            }
    
            return exts;
        }

     private boolean isActive(Activate activate, URL url) {
            //Get the value of the annotation
            String[] keys = activate.value();
            //No value configured: return true directly
            if (keys.length == 0) {
                return true;
            }
            //Iterate over the annotation values
            for (String key : keys) {
                //Iterate over the URL parameters; they come from the XML configuration
                for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {
                    String k = entry.getKey();
                    String v = entry.getValue();
                    //A parameter named key (or ending in "." + key) with a non-empty value activates the filter
                    if ((k.equals(key) || k.endsWith("." + key)) && ConfigUtils.isNotEmpty(v)) {
                        return true;
                    }
                }
            }
            return false;
        }
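
    The selection rules above (group match, exclusion with "-name", activation by URL parameter, sorting, then user filters) can be tried out without Dubbo. The sketch below works on filter names only. The Candidate class, the order values, and the ascending sort are assumptions made for illustration. The emptiness check is also simplified to null or empty string, whereas Dubbo's ConfigUtils.isNotEmpty may treat further values as empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Name-only sketch of the selection logic in getActivateExtension; not Dubbo code. */
final class ActivateSelectionSketch {

    /** Stand-in for an extension annotated with @Activate(group, order, value). */
    static final class Candidate {
        final String name;
        final String group;
        final int order;
        final String[] keys;

        Candidate(String name, String group, int order, String... keys) {
            this.name = name;
            this.group = group;
            this.order = order;
            this.keys = keys;
        }
    }

    /** Active if no keys are declared, or some parameter matches a key and has a non-empty value. */
    static boolean isActive(String[] keys, Map<String, String> params) {
        if (keys.length == 0) {
            return true;
        }
        for (String key : keys) {
            for (Map.Entry<String, String> entry : params.entrySet()) {
                String k = entry.getKey();
                String v = entry.getValue();
                if ((k.equals(key) || k.endsWith("." + key)) && v != null && !v.isEmpty()) {
                    return true;
                }
            }
        }
        return false;
    }

    static List<String> select(List<Candidate> defaults, List<String> names, String group, Map<String, String> params) {
        List<String> exts = new ArrayList<>();
        if (!names.contains("-default")) {
            List<Candidate> matched = new ArrayList<>();
            for (Candidate c : defaults) {
                if (c.group.equals(group) && !names.contains(c.name)
                        && !names.contains("-" + c.name) && isActive(c.keys, params)) {
                    matched.add(c);
                }
            }
            // Assumption: a smaller order value sorts first.
            matched.sort(Comparator.comparingInt(c -> c.order));
            for (Candidate c : matched) {
                exts.add(c.name);
            }
        }
        List<String> usrs = new ArrayList<>();
        for (String name : names) {
            if (!name.startsWith("-") && !names.contains("-" + name)) {
                if ("default".equals(name)) {
                    // User filters listed before "default" go in front of the default ones.
                    exts.addAll(0, usrs);
                    usrs.clear();
                } else {
                    usrs.add(name);
                }
            }
        }
        exts.addAll(usrs);
        return exts;
    }

    public static void main(String[] args) {
        List<Candidate> defaults = Arrays.asList(
                new Candidate("exception", "provider", 0),
                new Candidate("echo", "provider", -110000),
                new Candidate("token", "provider", 0, "token"),
                new Candidate("consumercontext", "consumer", -10000));
        List<String> names = Arrays.asList("DubboExceptionFilter", "-exception");

        System.out.println(select(defaults, names, "provider", Collections.emptyMap()));
        // [echo, DubboExceptionFilter]
        System.out.println(select(defaults, names, "provider", Collections.singletonMap("token", "abc")));
        // [echo, token, DubboExceptionFilter]
    }
}
```

    With filter="DubboExceptionFilter,-exception", the default exception entry is dropped in the first phase and the custom filter is appended in the second, which matches the configuration used in step 3.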

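    After reading this, everything should be clear: our com.alibaba.dubbo.rpc.Filter file is an SPI configuration, and that is how the custom filter gets injected dynamically.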
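
    As a rough picture of what "SPI configuration" means here: the file maps a short name to an implementation class, and the loader later instantiates the class for a requested name. The sketch below parses name=class lines and creates instances by reflection. It is a simplified illustration, not Dubbo's loader: JDK classes stand in for filter implementations, and lines without a name are simply skipped.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified illustration of resolving name=class entries; not Dubbo's ExtensionLoader. */
final class SpiFileSketch {

    /** Parses "name=fully.qualified.ClassName" lines; '#' starts a comment. */
    static Map<String, Class<?>> parse(String fileContent) throws IOException, ClassNotFoundException {
        Map<String, Class<?>> classes = new LinkedHashMap<>();
        BufferedReader reader = new BufferedReader(new StringReader(fileContent));
        String line;
        while ((line = reader.readLine()) != null) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            int eq = line.indexOf('=');
            if (line.isEmpty() || eq <= 0) {
                continue; // blank line, comment, or an entry without a name
            }
            String name = line.substring(0, eq).trim();
            String className = line.substring(eq + 1).trim();
            classes.put(name, Class.forName(className));
        }
        return classes;
    }

    /** Instantiates the class registered under name and checks it against the expected type. */
    static <T> T newInstance(Map<String, Class<?>> classes, String name, Class<T> type)
            throws ReflectiveOperationException {
        Class<?> clazz = classes.get(name);
        if (clazz == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return type.cast(clazz.getDeclaredConstructor().newInstance());
    }

    public static void main(String[] args) throws Exception {
        // JDK classes are used as implementations so the example runs without Dubbo.
        String file = "# name=implementation class\n"
                + "arrayList=java.util.ArrayList\n"
                + "linkedList=java.util.LinkedList\n";

        Map<String, Class<?>> classes = parse(file);
        List<?> list = newInstance(classes, "linkedList", List.class);
        System.out.println(list.getClass().getName()); // java.util.LinkedList
    }
}
```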

  • Original article: https://www.cnblogs.com/LQBlog/p/12502639.html