zoukankan      html  css  js  c++  java
  • Motan在服务provider端用于处理request的线程池

    最近开始重新看motan的源码,打算花一年的时间来分析每个模块每个功能的代码实现,坚持写一个motan分析系列。

    因为没有思路,只能一个片段一个片段的看,等到有了一定的积累,再将看过的代码串起来一起分析,形成完整的思路。

    第一篇是要回答自己的一个疑问,motan在服务provider端的线程模型是什么?request到达服务provider端之后,被哪个线程处理?

    motan的tcp通信框架用的是netty,netty的线程模型是reactor模型。由一个Acceptor线程负责channel的接入,然后交给reactor线程来负责这个channel的读写事件。

    所以一个request到达provider端,首先是由reactor线程来处理,进行解码,解码成java的request对象。这个时候就有一个问题,一般我们的服务都是要访问数据库等资源的,会存在IO的阻塞,如果我们直接在reactor线程处理request请求,就会阻塞住reactor线程,使得reactor无法处理其他channel的读写事件,也就无法达到高的并发。因此我们使用一个线程池来处理request。

    motan的代码:

    public class NettyChannelHandler extends SimpleChannelHandler {
    	private ThreadPoolExecutor threadPoolExecutor;
    	......
    	private void processRequest(final ChannelHandlerContext ctx, MessageEvent e) {
    		final Request request = (Request) e.getMessage();
    		request.setAttachment(URLParamType.host.getName(), NetUtils.getHostName(ctx.getChannel().getRemoteAddress()));
    
    		final long processStartTime = System.currentTimeMillis();
    
    		// 使用线程池方式处理
    		try {
    			threadPoolExecutor.execute(new Runnable() {
    				@Override
                    public void run() {
                        processRequest(ctx, request, processStartTime);
                    }
                });
    		} catch (RejectedExecutionException rejectException) {
    			DefaultResponse response = new DefaultResponse();
    			response.setRequestId(request.getRequestId());
    			response.setException(new MotanServiceException("process thread pool is full, reject",
    					MotanErrorMsgConstant.SERVICE_REJECT));
    			response.setProcessTime(System.currentTimeMillis() - processStartTime);
    			e.getChannel().write(response);
    
    			LoggerUtil
    					.debug("process thread pool is full, reject, active={} poolSize={} corePoolSize={} maxPoolSize={} taskCount={} requestId={}",
    							threadPoolExecutor.getActiveCount(), threadPoolExecutor.getPoolSize(),
    							threadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getMaximumPoolSize(),
    							threadPoolExecutor.getTaskCount(), request.getRequestId());
    		}
    	}
    

      它的流程是这样:

     Motan在处理request的时候并没有直接使用JDK的线程池,而是继承ThreadPoolExecutor进行了自定义实现。

     StandardThreadExecutor 的实现代码:

    public class StandardThreadExecutor extends ThreadPoolExecutor {
    
    	public static final int DEFAULT_MIN_THREADS = 20;
    	public static final int DEFAULT_MAX_THREADS = 200;
    	public static final int DEFAULT_MAX_IDLE_TIME = 60 * 1000; // 1 minutes
    
    	protected AtomicInteger submittedTasksCount;	// 正在处理的任务数 
    	private int maxSubmittedTaskCount;				// 最大允许同时处理的任务数
    	.......
    
    	public StandardThreadExecutor(int coreThreads, int maxThreads, long keepAliveTime, TimeUnit unit,
    			int queueCapacity, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    		super(coreThreads, maxThreads, keepAliveTime, unit, new ExecutorQueue(), threadFactory, handler);
    		((ExecutorQueue) getQueue()).setStandardThreadExecutor(this);
    
    		submittedTasksCount = new AtomicInteger(0);
    		
    		// 最大并发任务限制: 队列buffer数 + 最大线程数 
    		maxSubmittedTaskCount = queueCapacity + maxThreads; 
    	}
    
    	public void execute(Runnable command) {
    		int count = submittedTasksCount.incrementAndGet();
    
    		// 超过最大的并发任务限制,进行 reject
    		// 依赖的LinkedTransferQueue没有长度限制,因此这里进行控制 
    		if (count > maxSubmittedTaskCount) {
    			submittedTasksCount.decrementAndGet();
    			getRejectedExecutionHandler().rejectedExecution(command, this);
    		}
    
    		try {
    			super.execute(command);
    		} catch (RejectedExecutionException rx) {
    			// there could have been contention around the queue
    			if (!((ExecutorQueue) getQueue()).force(command)) {
    				submittedTasksCount.decrementAndGet();
    
    				getRejectedExecutionHandler().rejectedExecution(command, this);
    			}
    		}
    	}
    
    	public int getSubmittedTasksCount() {
    		return this.submittedTasksCount.get();
    	}
    	
    	public int getMaxSubmittedTaskCount() {
    		return maxSubmittedTaskCount;
    	}
    
    	protected void afterExecute(Runnable r, Throwable t) {
    		submittedTasksCount.decrementAndGet();
    	}
    }
    

      这里是重写了ThreadPoolExecutor的execute和afterExecute方法。一个需要非常注意的地方是使用了一个ExecutorQueue作为BlockingQueue.

         我们来看ExecutorQueue的代码实现:

    /**
     * LinkedTransferQueue 能保证更高性能,相比与LinkedBlockingQueue有明显提升 
     * 
     * 		1) 不过LinkedTransferQueue的缺点是没有队列长度控制,需要在外层协助控制
    */
    class ExecutorQueue extends LinkedTransferQueue<Runnable> {
    	private static final long serialVersionUID = -265236426751004839L;
    	StandardThreadExecutor threadPoolExecutor;
    
    	public ExecutorQueue() {
    		super();
    	}
    
    	public void setStandardThreadExecutor(StandardThreadExecutor threadPoolExecutor) {
    		this.threadPoolExecutor = threadPoolExecutor;
    	}
    
    	// 注:代码来源于 tomcat 
    	public boolean force(Runnable o) {
    		if (threadPoolExecutor.isShutdown()) {
    			throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
    		}
    		// forces the item onto the queue, to be used if the task is rejected
    		return super.offer(o);
    	}
    
    	// 注:tomcat的代码进行一些小变更 
    	public boolean offer(Runnable o) {
    		int poolSize = threadPoolExecutor.getPoolSize();
    
    		// we are maxed out on threads, simply queue the object
    		if (poolSize == threadPoolExecutor.getMaximumPoolSize()) {
    			return super.offer(o);
    		}
    		// we have idle threads, just add it to the queue
    		// note that we don't use getActiveCount(), see BZ 49730
    		if (threadPoolExecutor.getSubmittedTasksCount() <= poolSize) {
    			return super.offer(o);
    		}
    		// if we have less threads than maximum force creation of a new
    		// thread
    		if (poolSize < threadPoolExecutor.getMaximumPoolSize()) {
    			return false;
    		}
    		// if we reached here, we need to add it to the queue
    		return super.offer(o);
    	}
    }
    

      ExecutorQueue实现了LinkedTransferQueue,主要是LinkedTransferQueue相比LinkedBlockingQueue等的队列有很大的性能提高。它的缺点是没有队列长度控制,容易发生内存溢出。所以motan的代码中,在execute(Runnable r)中对提交的任务数加一,在afterExecute中对提交的任务数减一,维护了一个正在运行的任务数,同时有一个最大任务数的限制。在提交任务的时候,如果任务数超过了最大任务数,对这个任务执行拒绝策略,以此实现了队列长度的控制。

          另外在ExecutorQueue的实现中重写了offer方法,因为没有长度限制的LinkedTransferQueue的offer方法总是返回true,所以线程池中的线程数不会超过minThread。Motan的改动是在提交的任务数超过poolSize,而poolSize小于最大线程数的时候返回false,让executor创建线程。

          最后用两张图来总结JDK的线程池与Motan线程池的执行流程:

  • 相关阅读:
    使用javap分析Java的字符串操作
    使用javap深入理解Java整型常量和整型变量的区别
    分享一个WebGL开发的网站-用JavaScript + WebGL开发3D模型
    Java动态代理之InvocationHandler最简单的入门教程
    Java实现 LeetCode 542 01 矩阵(暴力大法,正反便利)
    Java实现 LeetCode 542 01 矩阵(暴力大法,正反便利)
    Java实现 LeetCode 542 01 矩阵(暴力大法,正反便利)
    Java实现 LeetCode 541 反转字符串 II(暴力大法)
    Java实现 LeetCode 541 反转字符串 II(暴力大法)
    Java实现 LeetCode 541 反转字符串 II(暴力大法)
  • 原文地址:https://www.cnblogs.com/huangll99/p/6661235.html
Copyright © 2011-2022 走看看