zoukankan      html  css  js  c++  java
  • spring控制并发数的工具类ConcurrencyThrottleSupport和ConcurrencyThrottleInterceptor

    在ConcurrencyThrottleSupport类中,简单的通过synchronized和wati and notify达到控制线程数量的效果,从而实现限流的策略。

    一、类图

    二、主要方法

    先看ConcurrencyThrottleInterceptor.java类的源码:

    看该拦截器中的invoke()方法中,在执行目标方法的前后分别执行beforeAccess()和 afterAccess()方法,

    • beforeAccess方法中通过内部计数器concurrencyCount来对比设置的阀值concurrencyLimit,如果超过设置值,则阻塞。若没有超过设置值,则concurrencyCount自加。
    • afterAccess方法中自减concurrencyCount
    public class ConcurrencyThrottleInterceptor extends ConcurrencyThrottleSupport
            implements MethodInterceptor, Serializable {
    
        public ConcurrencyThrottleInterceptor() {
            setConcurrencyLimit(1);
        }
    
        @Override
        public Object invoke(MethodInvocation methodInvocation) throws Throwable {
            beforeAccess();
            try {
                return methodInvocation.proceed();
            }
            finally {
                afterAccess();
            }
        }
    
    }

    beforeAccess()实现(在父类ConcurrencyThrottleSupport中实现)

        protected void beforeAccess() {
            if (this.concurrencyLimit == NO_CONCURRENCY) {
                throw new IllegalStateException(
                        "Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
            }
            if (this.concurrencyLimit > 0) {
                boolean debug = logger.isDebugEnabled();
                synchronized (this.monitor) {
                    boolean interrupted = false;
                    while (this.concurrencyCount >= this.concurrencyLimit) {
                        if (interrupted) {
                            throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
                                    "but concurrency limit still does not allow for entering");
                        }
                        if (debug) {
                            logger.debug("Concurrency count " + this.concurrencyCount +
                                    " has reached limit " + this.concurrencyLimit + " - blocking");
                        }
                        try {
                            this.monitor.wait();
                        }
                        catch (InterruptedException ex) {
                            // Re-interrupt current thread, to allow other threads to react.
                            Thread.currentThread().interrupt();
                            interrupted = true;
                        }
                    }
                    if (debug) {
                        logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
                    }
                    this.concurrencyCount++;
                }
            }
        }

     beforeAccess()实现(在父类ConcurrencyThrottleSupport中实现)

        protected void afterAccess() {
            if (this.concurrencyLimit >= 0) {
                synchronized (this.monitor) {
                    this.concurrencyCount--;
                    if (logger.isDebugEnabled()) {
                        logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
                    }
                    this.monitor.notify();
                }
            }
        }

     使用场景见《spring异步线程池-SimpleAsyncTaskExecutor

  • 相关阅读:
    C# 课堂总结2-数据类型及转换方式
    C# 课堂总结1-二进制转换
    C++
    C++ 程序设计语言
    VS编译器问题总结
    go 笔记
    SIP协议 会话发起协议(二)
    SIP协议 会话发起协议(一)
    201707 一些好的文章
    编程拾穗
  • 原文地址:https://www.cnblogs.com/duanxz/p/9435873.html
Copyright © 2011-2022 走看看