zoukankan      html  css  js  c++  java
  • dubbo源码阅读-Filter默认实现(十)

    SPI配置的默认实现

    cache=com.alibaba.dubbo.cache.filter.CacheFilter
    validation=com.alibaba.dubbo.validation.filter.ValidationFilter
    echo=com.alibaba.dubbo.rpc.filter.EchoFilter
    generic=com.alibaba.dubbo.rpc.filter.GenericFilter
    genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
    token=com.alibaba.dubbo.rpc.filter.TokenFilter
    accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
    activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
    classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
    context=com.alibaba.dubbo.rpc.filter.ContextFilter
    consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
    exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
    executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
    deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
    compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
    timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
    trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
    future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
    monitor=com.alibaba.dubbo.monitor.support.MonitorFilter

    Consumer

    ConsumerContextFilter

    记录一些基础信息到当前线程的RpcContext

    /**
     * Consumer-side filter (order -10000) that records basic information about the
     * outgoing call on the current thread's {@code RpcContext}: the invoker, the
     * invocation, the local address and the remote address taken from the invoker URL.
     * The context attachments are cleared once the call has finished.
     */
    @Activate(group = {"consumer"}, order = -10000)
    public class ConsumerContextFilter implements Filter {
        public ConsumerContextFilter() {
        }

        /**
         * Populates the thread's {@code RpcContext}, delegates to the next invoker and
         * clears the context attachments afterwards.
         *
         * @param invoker    the next invoker in the chain
         * @param invocation the call being made
         * @return whatever the wrapped invoker returns
         * @throws RpcException propagated from the wrapped invoker
         */
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            RpcContext context = RpcContext.getContext();
            context.setInvoker(invoker)
                    .setInvocation(invocation)
                    // Port 0: only the local host is recorded here.
                    .setLocalAddress(NetUtils.getLocalHost(), 0)
                    .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

            if (invocation instanceof RpcInvocation) {
                RpcInvocation rpcInvocation = (RpcInvocation)invocation;
                rpcInvocation.setInvoker(invoker);
            }

            try {
                // The invocation object carries the consumer-side parameters to the next
                // invoker; custom data (for example a trace id) can travel the same way.
                return invoker.invoke(invocation);
            } finally {
                // Runs on both success and failure.
                RpcContext.getContext().clearAttachments();
            }
        }
    }

    ActiveLimitFilter

    例子

    同时只支持1的并发量

    <dubbo:method actives="1" ... />

    源码

    ActiveLimitFilter主要用于 限制同一个客户端对于一个服务端方法的并发调用量(客户端限流)。

    /**
     * Consumer-side concurrency limiter, activated when the {@code actives} parameter
     * is present. It bounds how many calls to one service method may be in flight at
     * the same time from this consumer; callers over the limit wait on the method's
     * {@code RpcStatus} until a slot is released or the configured timeout elapses.
     */
    @Activate(
            group = {"consumer"},
            value = {"actives"}
    )
    public class ActiveLimitFilter implements Filter {
        public ActiveLimitFilter() {
        }

        /**
         * Waits for a free slot when a limit is configured, then performs the call
         * while keeping the active counter of the method up to date.
         *
         * @param invoker    the next invoker in the chain
         * @param invocation the call being made
         * @return the result of the wrapped invoker
         * @throws RpcException if no slot became free before the timeout elapsed, or
         *                      propagated from the wrapped invoker
         */
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            // Value of the "actives" setting for this method; 0 means "no limit".
            int max = url.getMethodParameter(methodName, "actives", 0);
            // Per-service, per-method status object; it is also the wait/notify monitor.
            RpcStatus count = RpcStatus.getStatus(url, methodName);
            if (max > 0) {
                awaitFreeSlot(invoker, invocation, count, max);
            }

            long begin = System.currentTimeMillis();
            boolean succeeded = false;
            // Slot obtained: increment the active counter.
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                succeeded = true;
                return result;
            } finally {
                // Always decrement the active counter, whatever was thrown. Doing this
                // only for RuntimeException would leak a slot on any other Throwable.
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, succeeded);
                if (max > 0) {
                    // Wake up one caller waiting for a slot.
                    synchronized (count) {
                        count.notify();
                    }
                }
            }
        }

        /**
         * Blocks while the number of active calls is at or above {@code max}.
         *
         * NOTE(review): with a timeout of 0, {@code wait(0)} blocks until notified and
         * the following check then throws immediately because {@code remain <= 0}.
         * Confirm whether a timeout is always present on the URL.
         *
         * @throws RpcException when the method's "timeout" elapses before a slot is free
         */
        private void awaitFreeSlot(Invoker<?> invoker, Invocation invocation, RpcStatus count, int max) {
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            // Cheap unsynchronized check first; the monitor is taken only when full.
            if (count.getActive() < max) {
                return;
            }
            synchronized (count) {
                int active;
                // The loop is required: after a notify another thread may have taken the
                // freed slot before this one re-acquires the monitor, so re-check.
                while ((active = count.getActive()) >= max) {
                    try {
                        count.wait(remain);
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored, as in the original: the loop re-checks the
                        // slot and the remaining time.
                        // NOTE(review): the interrupt status is not restored here.
                    }

                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    // Woken up (or timed out): give up once the waiting budget is spent.
                    if (remain <= 0L) {
                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }
    }

    FutureFilter

    用于处理事件 异步回调 同步回调  异常回调

    例子

    1.自定义一个event接口

    /**
     * Demo callback contract whose methods are referenced by the
     * {@code onthrow} / {@code onreturn} / {@code oninvoke} attributes of a
     * {@code <dubbo:method>} element.
     *
     * @author liqiang
     * @date 2019/11/28 10:34
     */
    public interface IEvent {
        /**
         * Exception callback. With this two-parameter shape (second parameter
         * {@code Object[]}), FutureFilter passes the exception and the original call
         * arguments.
         *
         * @param throwable the exception reported for the call
         * @param args      the arguments of the original call
         */
        void onThrow(Throwable throwable, Object[] args);

        /**
         * Return callback, fired from the async response callback when the call is
         * async and right after the call otherwise.
         *
         * NOTE(review): for a one-parameter onreturn method FutureFilter passes only
         * the call result, so {@code params} receives the result itself rather than
         * the call arguments — confirm this matches the service's return type.
         *
         * @param params value supplied by FutureFilter (see note above)
         */
        void onReturn(Object[] params);

        /**
         * Pre-invocation callback. FutureFilter invokes it reflectively with the
         * arguments of the call, so the parameter list must match the service method.
         * The parameter list is method-specific; it lives here only because this is a
         * demo.
         *
         * @param prizeDrawReqDto the argument of the service call
         */
        void onInvoke(PrizeDrawReqDto prizeDrawReqDto);
    }

    2.实现类

    /**
     * Demo implementation of {@link IEvent} that prints each callback to stdout.
     *
     * @author liqiang
     * @date 2019/11/28 10:46
     */
    public class PagePromotionServicePrizeDrawEvent implements IEvent {
        /**
         * Called when the call fails; prints the original call arguments.
         *
         * @param throwable the exception reported for the call (not printed)
         * @param args      the arguments of the original call, may be {@code null}
         */
        @Override
        public void onThrow(Throwable throwable, Object[] args) {
            // Arrays.toString prints the elements (and "null" for a null array);
            // Object[].toString() would only print the array's identity hash.
            System.out.println(String.format("异常回调,参数:%s", java.util.Arrays.toString(args)));
        }

        /**
         * Return callback; prints the value it receives.
         *
         * @param params value supplied by the framework, may be {@code null}
         */
        @Override
        public void onReturn(Object[] params) {
            // The format string needs a %s placeholder, otherwise the argument is dropped.
            System.out.println(String.format("onReturn回调:%s", java.util.Arrays.toString(params)));
        }

        /**
         * Called before the remote call; the parameter list mirrors the service method.
         *
         * @param prizeDrawReqDto the argument of the service call, may be {@code null}
         */
        @Override
        public void onInvoke(PrizeDrawReqDto prizeDrawReqDto) {
            // %s formats a null argument as "null" instead of throwing.
            System.out.println(String.format("onInvoke执行前回调:%s", prizeDrawReqDto));
        }
    }

    3.consumer配置

        <!-- Bean that implements the callback methods referenced by onreturn / oninvoke / onthrow below -->
        <bean id="pagePromotionServicePrizeDrawEvent" class="com.bozhi.notify.PagePromotionServicePrizeDrawEvent"></bean>
        <dubbo:reference id="frontendPagePromotionService"
                         url="dubbo://127.0.0.1:23888/com.biz.soa.service.pagepromotion.frontend.PagePromotionService"
                         interface="com.biz.soa.service.pagepromotion.frontend.PagePromotionService" check="false">
            <dubbo:method  async="true"  name="prizeDraw"
                          onreturn="pagePromotionServicePrizeDrawEvent.onReturn"
                          oninvoke="pagePromotionServicePrizeDrawEvent.onInvoke"
                          onthrow="pagePromotionServicePrizeDrawEvent.onThrow"></dubbo:method>
        </dubbo:reference>

    源码

    @Activate(
            group = {"consumer"}//消费端的过滤器
    )
    public class FutureFilter implements Filter {
        protected static final Logger logger = LoggerFactory.getLogger(com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.class);
    
        public FutureFilter() {
        }
    
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            //在url后面获取是否是异步
            boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
            //执行调用前时间 onInvoker
            this.fireInvokeCallback(invoker, invocation);
            //执行invoke
            Result result = invoker.invoke(invocation);
            //判断是同步还是异步
            if (isAsync) {
                //处理异步调用回调
                this.asyncCallback(invoker, invocation);
            } else {
                //处理同步调用回调
                this.syncCallback(invoker, invocation, result);
            }
            return result;
        }
    
        private void syncCallback(Invoker<?> invoker, Invocation invocation, Result result) {
            //是否发生异常 如果发生调用
            if (result.hasException()) {
                //调用异常回调
                this.fireThrowCallback(invoker, invocation, result.getException());
            } else {
                //触发onReturn
                this.fireReturnCallback(invoker, invocation, result.getValue());
            }
    
        }
    
        private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
            //获取异步调用的Future
            Future<?> f = RpcContext.getContext().getFuture();
            //判断是否是FutureAdapter适配器
            if (f instanceof FutureAdapter) {
                ResponseFuture future = ((FutureAdapter)f).getFuture();
                //设置异步回调
                future.setCallback(new ResponseCallback() {
                    public void done(Object rpcResult) {
                        //获得返回结果 并不是provider返回值哦
                        if (rpcResult == null) {
                            com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                        } else if (!(rpcResult instanceof Result)) {
                            com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                        } else {
                            Result result = (Result)rpcResult;
                            //如果有异常触发异常回调
                            if (result.hasException()) {
                                //当前类是匿名类  com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.this 为匿名类访问上层类对象的实例 就是FutureFilter实例
                                com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.this.fireThrowCallback(invoker, invocation, result.getException());
                            } else {
                                //当前类是匿名类  com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.this 为匿名类访问上层类对象的实例 就是FutureFilter实例
                                com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.this.fireReturnCallback(invoker, invocation, result.getValue());
                            }
    
                        }
                    }
    
                    public void caught(Throwable exception) {
                        //异常回调
                        com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.this.fireThrowCallback(invoker, invocation, exception);
                    }
                });
            }
    
        }
    
        private void fireInvokeCallback(Invoker<?> invoker, Invocation invocation) {
            /**
             * 获取我们配置的oinvoke的方法
             * key为key为com.biz.soa.service.pagepromotion.frontend.PagePromotionService.prizeDraw.oninvoke.method
             */
            Method onInvokeMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "oninvoke.method"));
            /**
             * 获取我们配置的oninvoke实例
             * key为com.biz.soa.service.pagepromotion.frontend.PagePromotionService.prizeDraw.oninvoke.instance
             */
            Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "oninvoke.instance"));
            if (onInvokeMethod != null || onInvokeInst != null) {
                if (onInvokeMethod != null && onInvokeInst != null) {
                    if (!onInvokeMethod.isAccessible()) {
                        onInvokeMethod.setAccessible(true);
                    }
    
                    Object[] params = invocation.getArguments();
    
                    try {
                        //反射执行调用
                        onInvokeMethod.invoke(onInvokeInst, params);
                    } catch (InvocationTargetException var7) {
                        //调用失败会触发异常回调
                        this.fireThrowCallback(invoker, invocation, var7.getTargetException());
                    } catch (Throwable var8) {
                        //调用失败会触发异常回调
                        this.fireThrowCallback(invoker, invocation, var8);
                    }
    
                } else {
                    throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
                }
            }
        }
    
        private void fireReturnCallback(Invoker<?> invoker, Invocation invocation, Object result) {
            Method onReturnMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "onreturn.method"));
            Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "onreturn.instance"));
            if (onReturnMethod != null || onReturnInst != null) {
                if (onReturnMethod != null && onReturnInst != null) {
                    if (!onReturnMethod.isAccessible()) {
                        onReturnMethod.setAccessible(true);
                    }
    
                    Object[] args = invocation.getArguments();
                    Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
                    Object[] params;
                    if (rParaTypes.length > 1) {
                        if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                            params = new Object[]{result, args};
                        } else {
                            params = new Object[args.length + 1];
                            params[0] = result;
                            System.arraycopy(args, 0, params, 1, args.length);
                        }
                    } else {
                        params = new Object[]{result};
                    }
    
                    try {
                        //反射调用
                        onReturnMethod.invoke(onReturnInst, params);
                    } catch (InvocationTargetException var10) {
                        this.fireThrowCallback(invoker, invocation, var10.getTargetException());
                    } catch (Throwable var11) {
                        this.fireThrowCallback(invoker, invocation, var11);
                    }
    
                } else {
                    throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
                }
            }
        }
    
        private void fireThrowCallback(Invoker<?> invoker, Invocation invocation, Throwable exception) {
            Method onthrowMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "onthrow.method"));
            Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), "onthrow.instance"));
            if (onthrowMethod != null || onthrowInst != null) {
                if (onthrowMethod != null && onthrowInst != null) {
                    if (!onthrowMethod.isAccessible()) {
                        onthrowMethod.setAccessible(true);
                    }
    
                    Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
                    if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
                        try {
                            Object[] args = invocation.getArguments();
                            Object[] params;
                            if (rParaTypes.length > 1) {
                                if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                                    params = new Object[]{exception, args};
                                } else {
                                    params = new Object[args.length + 1];
                                    params[0] = exception;
                                    System.arraycopy(args, 0, params, 1, args.length);
                                }
                            } else {
                                params = new Object[]{exception};
                            }
    
                            onthrowMethod.invoke(onthrowInst, params);
                        } catch (Throwable var9) {
                            logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), var9);
                        }
                    } else {
                        logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
                    }
    
                } else {
                    throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
                }
            }
        }
    }

    原理就是 获得我们对应的配置类 反射调用

    DubboInvoker

    但是有个疑惑：异步回调是怎么实现的？那么我们看看invoker的实现 DubboInvoker

     /**
      * Sends the invocation through one of the exchange clients in one of three
      * modes: one-way (send only), async (a future is stored in the
      * {@code RpcContext} and an empty result returned) or sync (block on the
      * response future).
      *
      * @param invocation the call to perform; must be an {@code RpcInvocation}
      * @return an empty {@code RpcResult} for one-way/async calls, otherwise the
      *         result obtained from the response future
      * @throws RpcException with code 2 on timeout and code 1 on remoting failure
      */
     protected Result doInvoke(Invocation invocation) throws Throwable {
            RpcInvocation rpcInvocation = (RpcInvocation)invocation;
            String methodName = RpcUtils.getMethodName(invocation);
            rpcInvocation.setAttachment("path", this.getUrl().getPath());
            rpcInvocation.setAttachment("version", this.version);

            // A single client is used directly; several clients are used in rotation.
            ExchangeClient client = this.clients.length == 1
                    ? this.clients[0]
                    : this.clients[this.index.getAndIncrement() % this.clients.length];

            try {
                boolean async = RpcUtils.isAsync(this.getUrl(), invocation);
                // Presumably driven by the "return" setting of <dubbo:method>, as the
                // article notes — confirm against RpcUtils.isOneway.
                boolean oneway = RpcUtils.isOneway(this.getUrl(), invocation);
                int timeout = this.getUrl().getMethodParameter(methodName, "timeout", 1000);

                // One-way is checked first, so it takes precedence over async.
                if (oneway) {
                    // "sent" is forwarded to send(); per the article, true waits until the
                    // data has left the network layer and false returns once it is queued
                    // in the IO write buffer — confirm against ExchangeClient.send.
                    boolean sent = this.getUrl().getMethodParameter(methodName, "sent", false);
                    client.send(rpcInvocation, sent);
                    RpcContext.getContext().setFuture((Future)null);
                    return new RpcResult();
                }

                if (async) {
                    // Expose the pending response through the context and return at once.
                    ResponseFuture pending = client.request(rpcInvocation, timeout);
                    RpcContext.getContext().setFuture(new FutureAdapter(pending));
                    return new RpcResult();
                }

                // Default: synchronous, blocks in get() until the response arrives.
                RpcContext.getContext().setFuture((Future)null);
                return (Result)client.request(rpcInvocation, timeout).get();
            } catch (TimeoutException timeoutFailure) {
                throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + timeoutFailure.getMessage(), timeoutFailure);
            } catch (RemotingException remotingFailure) {
                throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + remotingFailure.getMessage(), remotingFailure);
            }
        }

    Provider

    ContextFilter

    filter链条顶端 主要在当前上下文设置一些基础信息

    /**
     * Provider-side filter (order -10000) that fills the current thread's
     * {@code RpcContext} with the invoker, the invocation, the local address and the
     * attachments received with the call (minus a fixed set of keys), and removes the
     * context once the call has completed.
     */
    @Activate(group = {"provider"}, order = -10000)
    public class ContextFilter implements Filter {
        public ContextFilter() {
        }

        /**
         * @param invoker    the next invoker in the chain
         * @param invocation the incoming call
         * @return the result of the wrapped invoker
         * @throws RpcException propagated from the wrapped invoker
         */
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            Map<String, String> forwarded = invocation.getAttachments();
            if (forwarded != null) {
                // Work on a copy and strip the keys that are not passed on to the context.
                forwarded = new HashMap<String, String>(forwarded);
                String[] excludedKeys = {"path", "group", "version", "dubbo", "token", "timeout", "async"};
                for (String key : excludedKeys) {
                    forwarded.remove(key);
                }
            }

            // Record the request information on the thread-bound context.
            RpcContext.getContext()
                    .setInvoker(invoker)
                    .setInvocation(invocation)
                    .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

            if (forwarded != null) {
                // Merge into existing context attachments, or install the map if none exist.
                if (RpcContext.getContext().getAttachments() == null) {
                    RpcContext.getContext().setAttachments(forwarded);
                } else {
                    RpcContext.getContext().getAttachments().putAll(forwarded);
                }
            }

            if (invocation instanceof RpcInvocation) {
                RpcInvocation rpcInvocation = (RpcInvocation)invocation;
                rpcInvocation.setInvoker(invoker);
            }

            try {
                return invoker.invoke(invocation);
            } finally {
                // Drop the thread-bound context on every exit path.
                RpcContext.removeContext();
            }
        }
    }

    EchoFilter

    回响测试主要用来检测服务是否正常(网络状态),单纯的检测网络情况的话其实不需要执行真正的业务逻辑的,所以通过Filter验证一下即可。

    官方文档

    consumer生成代理 强制实现了EchoService 我们强转为EchoService就能调用测试服务是否可用

    /**
     * Provider-side echo test filter (order -110000). A call to {@code $echo} with
     * exactly one argument is answered with that argument and never reaches the
     * wrapped invoker; any other call is passed through unchanged.
     */
    @Activate(
            group = {"provider"},
            order = -110000
    )
    public class EchoFilter implements Filter {
        public EchoFilter() {
        }

        /**
         * @param invoker the next invoker in the chain
         * @param inv     the incoming call
         * @return the single argument wrapped in an {@code RpcResult} for an echo call,
         *         otherwise the result of the wrapped invoker
         * @throws RpcException propagated from the wrapped invoker
         */
        public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
            boolean echoCall = inv.getMethodName().equals("$echo")
                    && inv.getArguments() != null
                    && inv.getArguments().length == 1;
            if (echoCall) {
                // Echo test: hand the input straight back.
                return new RpcResult(inv.getArguments()[0]);
            }
            return invoker.invoke(inv);
        }
    }

    ExecuteLimitFilter

    例子

    1.解决注解配置报错问题

    参考:https://blog.csdn.net/xiao_jun_0820/article/details/81218440

     /**
         * Works around the problem that a {@code parameters} value given as a
         * {@code String[]} on the {@code @Service} annotation is not converted into a
         * {@code Map<String,String>}: for every {@code ServiceBean} the
         * {@code parameters} property value is converted to a {@code Map} up front.
         */
        @Component
        public static class ServiceParameterBeanPostProcessor extends InstantiationAwareBeanPostProcessorAdapter implements PriorityOrdered {

            @Override
            public int getOrder() {
                return PriorityOrdered.LOWEST_PRECEDENCE;
            }

            /**
             * Pre-converts the {@code parameters} property of {@code ServiceBean}
             * instances; every other bean is returned untouched.
             *
             * @return the same {@code pvs} instance that was passed in
             */
            @Override
            public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeansException {
                if (!(bean instanceof ServiceBean)) {
                    return pvs;
                }

                PropertyValue parametersValue = pvs.getPropertyValue("parameters");
                ConversionService converter = getConversionService();
                if (parametersValue == null || parametersValue.getValue() == null) {
                    return pvs;
                }

                if (converter.canConvert(parametersValue.getValue().getClass(), Map.class)) {
                    Map converted = converter.convert(parametersValue.getValue(), Map.class);
                    // Store the converted map on the property value.
                    parametersValue.setConvertedValue(converted);
                }
                return pvs;
            }

            /** Builds a conversion service with the two String[] converters registered. */
            private ConversionService getConversionService() {
                DefaultConversionService service = new DefaultConversionService();
                service.addConverter(new StringArrayToStringConverter());
                service.addConverter(new StringArrayToMapConverter());
                return service;
            }

        }

    2.限流配置

    /**
     * Business service exposed to the front end.
     * The Dubbo {@code parameters} pair sets {@code prizeDraw.executes} to {@code 1};
     * presumably this is the per-method "executes" value that ExecuteLimitFilter
     * reads — confirm against the generated provider URL.
     *
     * @author liqiang
     * @date 2019/10/15 19:33
     */
    @Service("frontPagePromotionService")
    @com.alibaba.dubbo.config.annotation.Service(
             parameters = {"prizeDraw.executes","1"}
    )
    public class PagePromotionServiceImpl extends AbstractBaseService implements PagePromotionService {
        /**
         * Draws a prize for the user within a promotion activity.
         *
         * @param prizeDrawReqDto the draw request
         * @return the draw result
         */
        @Transactional(rollbackFor = Exception.class)
        public PrizeDrawResDto prizeDraw(PrizeDrawReqDto prizeDrawReqDto) throws Exception {
            ...
        }
    }

    源码

    /**
     * 服务提供者限流
     */
    @Activate(
            group = {"provider"},
            value = {"executes"}
    )
    public class ExecuteLimitFilter implements Filter {
        public ExecuteLimitFilter() {
        }
    
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            Semaphore executesLimit = null;
            boolean acquireResult = false;
            //获取我们配置的executes
            int max = url.getMethodParameter(methodName, "executes", 0);
            if (max > 0) {
                //获取当前请求的RpcStatus
                RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
                //获得信号量
                executesLimit = count.getSemaphore(max);
                //如果达到限流条件直接报错
                if (executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                    throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes="" + max + "" /> limited.");
                }
            }
    
            long begin = System.currentTimeMillis();
            boolean isSuccess = true;
            RpcStatus.beginCount(url, methodName);
    
            Result var12;
            try {
                Result result = invoker.invoke(invocation);
                var12 = result;
            } catch (Throwable var16) {
                isSuccess = false;
                if (var16 instanceof RuntimeException) {
                    throw (RuntimeException)var16;
                }
    
                throw new RpcException("unexpected exception when ExecuteLimitFilter", var16);
            } finally {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
                if (acquireResult) {
                    //请求完成释放限流
                    executesLimit.release();
                }
    
            }
    
            return var12;
        }
    }

    ExceptionFilter

    异常抛出规则

    如果是 checked异常 则直接抛出;
    如果是unchecked异常 但是在接口上有声明,也会直接抛出;
    如果异常类和接口类在同一jar包里,直接抛出;
    如果是 JDK自带的异常 ,直接抛出;
    如果是 Dubbo的异常 ,直接抛出;
    其余的都包装成RuntimeException然后抛出(避免异常在Client不能反序列化问题);

    源码

    @Activate(
            group = {"provider"}
    )
    public class ExceptionFilter implements Filter {
        private final Logger logger;
    
        public ExceptionFilter() {
            this(LoggerFactory.getLogger(com.alibaba.dubbo.rpc.filter.ExceptionFilter.class));
        }
    
        public ExceptionFilter(Logger logger) {
            this.logger = logger;
        }
    
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            try {
                //执行invoke
                Result result = invoker.invoke(invocation);
                //判断是否有异常
                if (result.hasException() && GenericService.class != invoker.getInterface()) {
                    try {
                        Throwable exception = result.getException();
                        //如果不是runtime异常直接返回
                        if (!(exception instanceof RuntimeException) && exception instanceof Exception) {
                            return result;
                        } else {
                            try {
                                Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                                //获得方法上面声明的异常集合
                                Class<?>[] exceptionClassses = method.getExceptionTypes();
                                Class[] arr$ = exceptionClassses;
                                int len$ = exceptionClassses.length;
    
                                for(int i$ = 0; i$ < len$; ++i$) {
                                    Class<?> exceptionClass = arr$[i$];
                                    //如果异常等于声明的异常 直接返回 注意是equals
                                    if (exception.getClass().equals(exceptionClass)) {
                                        return result;
                                    }
                                }
                            } catch (NoSuchMethodException var11) {
                                return result;
                            }
                            //找到这里表示并不是声明的异常 比如声明thow Exception 抛出空指针异常  才会到这里
                            this.logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
                            String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                            String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                            //如果是其他jar包的异常 则包装成Runtime异常 抛出 避免客户端序列化失败问题
                            if (serviceFile != null && exceptionFile != null && !serviceFile.equals(exceptionFile)) {
                                String className = exception.getClass().getName();
                                if (!className.startsWith("java.") && !className.startsWith("javax.")) {
                                    return (Result)(exception instanceof RpcException ? result : new RpcResult(new RuntimeException(StringUtils.toString(exception))));
                                } else {
                                    return result;
                                }
                            } else {
                                return result;
                            }
                        }
                    } catch (Throwable var12) {
                        this.logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + var12.getClass().getName() + ": " + var12.getMessage(), var12);
                        return result;
                    }
                } else {
                    return result;
                }
            } catch (RuntimeException var13) {
                this.logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost() + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", exception: " + var13.getClass().getName() + ": " + var13.getMessage(), var13);
                throw var13;
            }
        }
    }
  • 相关阅读:
    mysql 查看表注解
    oracle 相关
    sql version control
    ccna
    msql 清库
    mybatisplus,application.properties 配置数据库密码加密
    驱动开发print无法输出问题
    bochs帮助
    以虚御虚用虚拟机调试vt程式
    ssm整合
  • 原文地址:https://www.cnblogs.com/LQBlog/p/12502733.html
Copyright © 2011-2022 走看看