想聊一聊流量控制,谈谈的重要性,解决了哪些业务问题,那我们问题来进入正题。
1、WEB容器如何流量控制?
一个Tomcat的容器,这个容器呢,部署在一台服务器上面,同时这台服务器的资源非常非常有限,这台服务器只能同时让500个请求访问,若是多余500个的话,这样服务器的资源就会打满,那么我们肯定需要想办法这些问题的。Tomcat本身就有这样的机制,因为每一个请求过来后,tomcat会为这个请求分配一个处理线程,所以tomcat就是来控制处理线程的数量。
server.xml
<Connector executor="tomcatThreadPool" port="8080" protocol="HTTP/1.1" connectionTimeout="8000" enableLookups="false" acceptorThreadCount="1" URIEncoding="utf-8" redirectPort="443" compression="on" compressionMinSize="1024" compressableMimeType="text/html,text/xml,text/javascript,text/css,text/plain,application/json,application/xml" /> <Executor className="StandardThreadExecutor" name="tomcatThreadPool" namePrefix="catalina-exec-" maxThreads="500" minSpareThreads="100"/>
maxThreads="500" 表示最多能同时并存500个处理线程。
acceptCount="500" 表示在500个处理线程在占用的情况中,还允许500个请求的排队。
这两个参数基本就是Tomcat在线程保护当中的策略。
2、一个WEB容器里面如何进行具体的业务模块的线程保护呢?
一个业务系统部署在一个Tomcat中,例如这个业务系统有两个重要模块(A和B模块),这个两个模块的请求都需要有资源处理,而不是那一个模块把系统的资源都占用去,例如:A模块限制最多300请求,B模块最多300个请求。这样场景的出现时,我们就需要考虑说A模块最多只能有300个处理线程,B也是这样,那么Tomcat是可以保证资源层面的,A+B共有500个,而无法确保A/B各300个,所以有如下想法:
1、每个请求进来确定是属于A还是属于B。
2、当前正在运行的A/B模块的数量。
基于上面想法的具体实现:
流量控制的业务实现(TrafficControl.java):
/** * 简单的实现基于URL的流控 */ public class TrafficControl { //一个url请求的最大访问数量为300 private final static int ONE_URI_MAX_CONCURRENT = 300; //所有url请求的最大访问数量为500 private final static int ALL_URI_MAX_CONCURRENT = 500; private final static AtomicInteger all_url_concurrent = new AtomicInteger(0); private final static ConcurrentMap<String, AtomicInteger> url_concurrent_map = new ConcurrentHashMap<String, AtomicInteger>(); private final static SwitcherManager switcherManager = SwitcherManagerFactoryLoader.getSwitcherManagerFactory().getSwitcherManager(); private final static Switcher tcEnabled = switcherManager.registerSwitcher("feature.trackurl.traffic_control.enable", true); public static boolean isDisabled() { return tcEnabled.isClose(); } public static boolean isOverflow(String uri) { if (all_url_concurrent.get() > ALL_URI_MAX_CONCURRENT) { return true; } AtomicInteger one_url_concurrent = url_concurrent_map.get(uri); if (one_url_concurrent != null && one_url_concurrent.get() > ONE_URI_MAX_CONCURRENT) { return true; } return false; } public static void startAccess(String uri) { all_url_concurrent.incrementAndGet(); AtomicInteger one_url_concurrent = url_concurrent_map.get(uri); if (one_url_concurrent != null) { one_url_concurrent.incrementAndGet(); } else { url_concurrent_map.putIfAbsent(uri, new AtomicInteger(1)); } } public static void endAccess(String uri) { all_url_concurrent.decrementAndGet(); AtomicInteger one_url_concurrent = url_concurrent_map.get(uri); if (one_url_concurrent != null) { one_url_concurrent.decrementAndGet(); } } public static void dumpWarnLog() { String lineSeparator = System.getProperty("line.separator"); // 估算每一个URL和其计数占用32个字符 StringBuilder sb = new StringBuilder((1 + url_concurrent_map.size()) * 32); sb.append("all_url_concurrent : ").append(all_url_concurrent); for (Map.Entry<String, AtomicInteger> entry : url_concurrent_map.entrySet()) { sb.append(lineSeparator).append(' ').append(entry.getKey()).append(" : ").append(entry.getValue()); } ApiLogger.warn(sb); } }
Servlet的实现(TrafficControlServlet.java):
/** * 带有流量控制的Servlet */ public class TrafficControlServlet extends HttpServlet { @Override protected final void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { if (TrafficControl.isDisabled()) { super.service(req, resp); return; } String uri = req.getRequestURI(); if (TrafficControl.isOverflow(uri)) { TrafficControl.dumpWarnLog(); resp.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE); return; } try { TrafficControl.startAccess(uri); super.service(req, resp); } finally { TrafficControl.endAccess(uri); } } }
最后执行Servlet如下,继承于TrafficControlServlet。
public class TestServlet extends TrafficControlServlet { private static final long serialVersionUID = 2895590140869067830L; @Override protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException { //................. } @Override protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException { doGet(request, response); }