zoukankan      html  css  js  c++  java
  • JAVA WEB项目中开启流量控制Filter

    Flow Control:控流的概念
    • 主要是用来限定server所能承载的最大(高并发)流量峰值,以免在峰值是Server过载而宕机,对于WEB系统而言
    • 通常是分布式部署,如果请求并发量很大,会导致整个集群崩溃,也就是通常所说的“雪崩效应”。
    • 所以,我们不仅在网络代理层面(比如nginx)设置流量控制以抵抗、拒止溢出流量,
    • 我们还应该在application server层面有一定的自我保护策略,确保当前JVM的负载应该在可控范围之内,对于JVM承载能力之外的请求,应该被合理管理。

    本文开发了一个分布式流量控制Filter,来限定application的并发量:

    
    

        1)对于过量的请求,首先将请求buffer在队列中。

    
    

        2)当buffer队列满时,多余的请求将会被直接拒绝。(过载请求量)

    
    

        3)那些buffer中被阻塞的请求,等待一定时间后任然无法被执行,则直接返回错误URL。(溢出请求量)

    
    

        4)我们设定一个允许的并发量,通过java中Semaphore控制。只有获取“锁”的请求,才能继续执行。




    web.xml配置

    <
    filter> <filter-name>flowControlFilter</filter-name> <filter-class>com.demo.security.FlowControlFilter</filter-class> <init-param> <param-name>permits</param-name> <param-value>128</param-value> </init-param> <init-param> <param-name>timeout</param-name> <param-value>15000</param-value> </init-param> <init-param> <param-name>bufferSize</param-name> <param-value>500</param-value> </init-param> <init-param> <param-name>errorUrl</param-name> <param-value>/error.html</param-value> </init-param> </filter>

    <filter-mapping>  
        <filter-name>flowControlFilter</filter-name>  
        <url-pattern>/*</url-pattern>  
    </filter-mapping>  

    Java代码:

    package com.src.java.filter;
    
    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
    import javax.servlet.Filter;
    import javax.servlet.FilterChain;
    import javax.servlet.FilterConfig;
    import javax.servlet.ServletException;
    import javax.servlet.ServletRequest;
    import javax.servlet.ServletResponse;
    import javax.servlet.http.HttpServletResponse;
    
    /**
     * 
     * @ClassName: FlowControlFilter
     * @Description: 分布式系统流量控制
     * @author chinasoft_liuhanlin
     * @date 2017年6月1日 下午3:57:08
     */
    public class FlowControlFilter implements Filter {
    
        /**
         * 最大并发量 默认为500
         */
        private int permits = Runtime.getRuntime().availableProcessors() + 1;
    
        /**
         * 当并发量达到permits后,新的请求将会被buffer,buffer最大尺寸 如果buffer已满,则直接拒绝
         */
        private int bufferSize = 500;
        /**
         * buffer中的请求被阻塞,此值用于控制最大阻塞时间 默认阻塞时间
         */
        private long timeout = 30000;
        /**
         * 跳转的错误页面
         */
        private String errorUrl;
    
        private BlockingQueue<Node> waitingQueue;
    
        private Thread selectorThread;
        private Semaphore semaphore;
    
        private Object lock = new Object();
    
        @Override
        public void destroy() {
    
        }
    
        /**
         * <p>
         * Title: doFilter
         * </p>
         * <p>
         * Description:
         * </p>
         * 
         * @param request
         * @param response
         * @param chain
         * @throws IOException
         * @throws ServletException
         * @see javax.servlet.Filter#doFilter(javax.servlet.ServletRequest,
         *      javax.servlet.ServletResponse, javax.servlet.FilterChain)
         */
        @Override
        public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
            checkSelector();
            Thread t = Thread.currentThread();
            HttpServletResponse httpServletResponse = (HttpServletResponse) response;
    
            Node node = new Node(t, false);
            boolean buffered = waitingQueue.offer(node);
            // 如果buffer已满
            if (!buffered) {
                if (errorUrl != null) {
                    httpServletResponse.sendRedirect(errorUrl);
                }
                return;
            }
            long deadline = System.currentTimeMillis() + timeout;
            // 进入等待队列后,当前线程阻塞
            LockSupport.parkNanos(this, TimeUnit.MICROSECONDS.toNanos(timeout));
            if (t.isInterrupted()) {
                // 如果线程是中断返回
                t.interrupted();// clear status
    
            }
            // 如果等待过期,则直接返回
            if (deadline >= System.currentTimeMillis()) {
                if (errorUrl != null) {
                    httpServletResponse.sendRedirect(errorUrl);
                }
                // 对信号量进行补充
                synchronized (lock) {
                    if (node.dequeued) {
                        semaphore.release();
                    } else {
                        node.dequeued = true;
                    }
                }
                return;
            }
            // 继续执行
            try {
                chain.doFilter(request, response);
            } finally {
                semaphore.release();
                checkSelector();
            }
        }
    
        /**
         * <p>
         * Title: init
         * </p>
         * <p>
         * Description:
         * </p>
         * 
         * @param filterConfig
         * @throws ServletException
         * @see javax.servlet.Filter#init(javax.servlet.FilterConfig)
         */
        @Override
        public void init(FilterConfig filterConfig) throws ServletException {
            String p = filterConfig.getInitParameter("permits");
            if (p != null) {
                permits = Integer.parseInt(p);
                if (permits < 0) {
                    throw new IllegalArgumentException("FlowControlFilter,permits parameter should be greater than 0 !");
                }
            }
    
            String t = filterConfig.getInitParameter("timeout");
            if (t != null) {
                timeout = Long.parseLong(t);
                if (timeout < 1) {
                    throw new IllegalArgumentException("FlowControlFilter,timeout parameter should be greater than 0 !");
                }
            }
    
            String b = filterConfig.getInitParameter("bufferSize");
            if (b != null) {
                bufferSize = Integer.parseInt(b);
                if (bufferSize < 0) {
                    throw new IllegalArgumentException("FlowControlFilter,bufferSize parameter should be greater than 0 !");
                }
            }
    
            errorUrl = filterConfig.getInitParameter("errorUrl");
    
            waitingQueue = new LinkedBlockingQueue<>(bufferSize);
            semaphore = new Semaphore(permits);
    
            selectorThread = new Thread(new SelectorRunner());
            selectorThread.setDaemon(true);
            selectorThread.start();
    
        }
    
        /**
         * @Title: checkSelector
         * @Description: TODO
         * @param:
         * @return: void
         * @throws
         */
        private void checkSelector() {
            if (selectorThread != null && selectorThread.isAlive()) {
                return;
            }
            synchronized (lock) {
                if (selectorThread != null && selectorThread.isAlive()) {
                    return;
                }
                selectorThread = new Thread(new SelectorRunner());
                selectorThread.setDaemon(true);
                selectorThread.start();
            }
        }
    
        /**
         * 
         * @ClassName: SelectorRunner
         * @Description: TODO
         * @author chinasoft_liuhanlin
         * @date 2017年6月1日 下午3:59:11
         */
        private class SelectorRunner implements Runnable {
    
            @Override
            public void run() {
                try {
                    while (true) {
                        Node node = waitingQueue.take();
                        // 如果t,阻塞逃逸,只能在pack超时后退出
                        synchronized (lock) {
                            if (node.dequeued) {
                                // 如果此线程已经park过期而退出了,则直接忽略
                                continue;
                            } else {
                                node.dequeued = true;
                            }
    
                        }
                        semaphore.acquire();
                        LockSupport.unpark(node.currentThread);
                    }
                } catch (Exception e) {
                    //
                } finally {
                    // 全部释放阻塞
                    Queue<Node> queue = new LinkedList<>();
                    waitingQueue.drainTo(queue);
                    for (Node n : queue) {
                        if (!n.dequeued) {
                            LockSupport.unpark(n.currentThread);
                        }
                    }
                }
            }
        }
    
        private class Node {
            Thread currentThread;
            boolean dequeued;// 是否已经出队
    
            public Node(Thread t, boolean dequeued) {
                this.currentThread = t;
                this.dequeued = dequeued;
            }
        }
    }
  • 相关阅读:
    HTML标签(2)
    HTML简介(1)
    JqueryUI input 自动提示 autocomplete
    Linux基础--单ubuntu 系统 u盘启动 install
    Spark Parquet file split
    HashMap与ConcurrentHashMap
    线程池阻塞队列之ArrayBlockingQueue
    线程池阻塞队列之LinkedBlockingQueue
    线程池的拒绝策略
    关闭线程池shutdown 和 shutdownNow 的区别
  • 原文地址:https://www.cnblogs.com/lhl-shubiao/p/6929342.html
Copyright © 2011-2022 走看看