zoukankan      html  css  js  c++  java
  • dubbo源码阅读-Filter默认实现(十一)之ActiveLimitFilter&ExecuteLimitFilter

    文档

    http://dubbo.apache.org/zh-cn/docs/user/demos/concurrency-control.html

    ActiveLimitFilter

    /**
     * 控制调用服务的并发量 限流
     * 同时支持多少请求 consumer有效 含有actives配置
     */
    @Activate(
            group = {"consumer"},
            value = {"actives"}
    )
    public class ActiveLimitFilter implements Filter {
        public ActiveLimitFilter() {
        }
    
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            //获得 <dubbo:reference actives="1"> actives的数量
            int max = invoker.getUrl().getMethodParameter(methodName, "actives", 0);
            //获取当前service当前方法的请求数量
            RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
            long timeout;
            //配置并发控制大于0才写
            if (max > 0) {
                //获得当前方法的等待时间
                timeout = (long)invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", 0);
                long start = System.currentTimeMillis();
                long remain = timeout;
                //判断是否大于并发数 如果大于则等待
                int active = count.getActive();
                if (active >= max) {
                    synchronized(count) {
                        /**
                         *1.while循环是有必要的
                         *  当收到其他线程notify 获得执行权
                         *  但是这个时候其他线程提前进入(active >= max)  判断为false获得执行权 count+1 
                         *  这个时候 还需要while判断是否还有空闲请求 否则继续wait
                         *
                         */
                        while((active = count.getActive()) >= max) {
                            try {
                                //超时时间为 配置的超时时间
                                count.wait(remain);
                            } catch (InterruptedException var32) {
                                ;
                            }
    
                            long elapsed = System.currentTimeMillis() - start;
                            remain = timeout - elapsed;
                             //当其他线程通知等待线程执行 判断是否超时 如果超时了则不执行了
                            if (remain <= 0L) {
                                throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit: " + max);
                            }
                        }
                    }
                }
            }
    
            boolean var28 = false;
    
            Result var10;
            try {
                var28 = true;
                timeout = System.currentTimeMillis();
                //获得执行权的 count+1
                RpcStatus.beginCount(url, methodName);
    
                try {
                    //执行
                    Result result = invoker.invoke(invocation);
                    //执行完毕关闭
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - timeout, true);
                    var10 = result;
                    var28 = false;
                } catch (RuntimeException var31) {
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - timeout, false);
                    throw var31;
                }
            } finally {
                if (var28) {
                    if (max > 0) {
                        //通知等待的线程执行
                        synchronized(count) {
                            count.notify();
                        }
                    }
    
                }
            }
    
            if (max > 0) {
                synchronized(count) {
                    //通知等待的线程执行
                    count.notify();
                }
            }
    
            return var10;
        }
    }

    ExecuteLimitFilter

    **
     * ThreadLimitInvokerFilter
     * 只针对provider有效 含有executes配置
     */
    @Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
    public class ExecuteLimitFilter implements Filter {
    
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            Semaphore executesLimit = null;
            boolean acquireResult = false;
            //获取服务提供者的最大访问并发数配置
            int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
            if (max > 0) {
                //<1> 获得 RpcStatus 对象,基于服务 URL + 方法维度
                RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
    //            if (count.getActive() >= max) {
                /**
                 * http://manzhizhen.iteye.com/blog/2386408
                 * use semaphore for concurrency control (to limit thread number)
                 */
                //获得信号量
                executesLimit = count.getSemaphore(max);
                //    //如果达到限流条件直接报错
                if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                    throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes="" + max + "" /> limited.");
                }
            }
            long begin = System.currentTimeMillis();
            boolean isSuccess = true;
            //<2>调用计数+1
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (Throwable t) {
                isSuccess = false;
                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                } else {
                    throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
                }
            } finally {
                //<3>调用结束的计数-1
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
                //<4>释放信号量
                if(acquireResult) {
                    executesLimit.release();
                }
            }
        }
    
    }
  • 相关阅读:
    JUC原子类 1
    线程优先级和守护线程
    多线程中断
    关于html5不支持frameset的解决方法
    shell中$0,$?,$!等的特殊用法
    Linux GCC常用命令
    C/C++中extern关键字详解
    js实现iframe自适应高度
    java线程安全总结
    Linux平台Java调用so库-JNI使用例子
  • 原文地址:https://www.cnblogs.com/LQBlog/p/12505088.html
Copyright © 2011-2022 走看看