一: ThreadPoolTaskExecutor是一个spring的线程池技术,查看代码可以看到这样一个字段:
private ThreadPoolExecutor threadPoolExecutor;
可以发现,spring的 ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor进行实现,
直接看代码:
@Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
这是ThreadPoolTaskExecutor用来初始化threadPoolExecutor的方法,BlockingQueue是一个阻塞队列,这个我们先不管。由于ThreadPoolTaskExecutor的实现方式完全是使用threadPoolExecutor进行实现,我们需要知道这个threadPoolExecutor的一些参数。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
这个是调用的构造函数:
int corePoolSize:线程池维护线程的最小数量.
int maximumPoolSize:线程池维护线程的最大数量.
long keepAliveTime:空闲线程的存活时间.
TimeUnit unit: 时间单位,现有纳秒,微秒,毫秒,秒枚举值.
BlockingQueue<Runnable> workQueue:持有等待执行的任务队列.
RejectedExecutionHandler handler:
用来拒绝一个任务的执行,有两种情况会发生这种情况。
一是在execute方法中若addIfUnderMaximumPoolSize(command)为false,即线程池已经饱和;
二是在execute方法中, 发现runState!=RUNNING || poolSize == 0,即已经shutdown,就调用ensureQueuedTaskHandled(Runnable command),在该方法中有可能调用reject。
ThreadPoolExecutor池子的处理流程如下:
1)当池子大小小于corePoolSize就新建线程,并处理请求
2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理
3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理
4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁
其会优先创建 CorePoolSiz 线程, 当继续增加线程时,先放入Queue中,当 CorePoolSiz 和 Queue 都满的时候,就增加创建新线程,当线程达到MaxPoolSize的时候,就会抛出错 误 org.springframework.core.task.TaskRejectedException
另外MaxPoolSize的设定如果比系统支持的线程数还要大时,会抛出java.lang.OutOfMemoryError: unable to create new native thread 异常。
这个是ThreadPoolExecutor的运算流程,既然ThreadPoolTaskExecutor是直接使用ThreadPoolExecutor进行处理,所以运算规则肯定一样。
在spring中使用ThreadPoolTaskExecutor的配置:
<!-- 异步线程池 --> <bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心线程数 --> <property name="corePoolSize" value="3" /> <!-- 最大线程数 --> <property name="maxPoolSize" value="10" /> <!-- 队列最大长度 >=mainExecutor.maxSize --> <property name="queueCapacity" value="25" /> <!-- 线程池维护线程所允许的空闲时间 --> <property name="keepAliveSeconds" value="300" /> <!-- 线程池对拒绝任务(无线程可用)的处理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃. --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean>
Reject策略预定义有四种:
(1)ThreadPoolExecutor.AbortPolicy策略,是默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.
(3)ThreadPoolExecutor.DiscardPolicy策略,不能执行的任务将被丢弃.
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程).
向spring容器中加入ThreadPoolTaskExecutor后,使用时只需要调用其的execute方法,其参数为一个Runnable。
threadPool.execute(new Runnable() { @Override public void run() { System.out.println("======="); } });
ThreadPoolTaskExecutor有两个execute的重载,但翻看代码可以知道调用的是同一个方法,所以只调用execute就可以了
@Override public void execute(Runnable task) { Executor executor = getThreadPoolExecutor(); try { executor.execute(task); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); } } @Override public void execute(Runnable task, long startTimeout) { execute(task); }
@Override public void execute(Runnable task) { Executor executor = getThreadPoolExecutor(); try { executor.execute(task); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); } } @Override public void execute(Runnable task, long startTimeout) { execute(task); }
在execute中调用的是ThreadPoolExecutor中的execute方法,执行了上面的处理流程后执行任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
二:阻塞队列BlockingQueue
在ThreadPoolTaskExecutor源码中我们看到了BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);这样一句话用来得到一个队列,这个队列是用来存放任务的。当线程池中有空闲线程时就回去任务队列中拿任务并处理。BlockingQueue是一个阻塞并线程安全的一个队列
多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干 生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数 据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并 且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。
BlockingQueue的核心方法:
放入数据:
offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
加入BlockingQueue,则返回失败。
put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
直到BlockingQueue里面有空间再继续.
获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
查看 ThreadPoolTaskExecutor的代码可以发现,其主要是使用 BlockingQueue的一种实现LinkedBlockingQueue进行实现。
/** * Create the BlockingQueue to use for the ThreadPoolExecutor. * <p>A LinkedBlockingQueue instance will be created for a positive * capacity value; a SynchronousQueue else. * @param queueCapacity the specified queue capacity * @return the BlockingQueue instance * @see java.util.concurrent.LinkedBlockingQueue * @see java.util.concurrent.SynchronousQueue */ protected BlockingQueue<Runnable> createQueue(int queueCapacity) { if (queueCapacity > 0) { return new LinkedBlockingQueue<Runnable>(queueCapacity); } else { return new SynchronousQueue<Runnable>(); } }
LinkedBlockingQueue是一个基于链表的阻塞队列,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立 即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从 队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理 并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以 此来提高整个队列的并发性能。
生成LinkedBlockingQueue时有一个大小限制,其默认为Integer.MAX_VALUE.
另外LinkedBlockingQueue不接受null值,当添加null的时候,会直接抛出NullPointerException:
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException();
3.另外最近的项目中使用的多线程和队列比较多,多线程自不用说,我百度了一下队列的优点,感觉说的很好,特此抄过来:
1. 解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2. 冗余
有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。
3. 扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
4. 灵活性 & 峰值处理能力
在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为 以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。
5. 可恢复性
当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。
6. 送达保证
消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个"只送达一次"保证。无论有多少进 程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的 表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。
7.排序保证
在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
8.缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
9. 理解数据流
在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
10. 异步通信
很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。
ThreadPoolTaskExecutor