Executors工厂方法创建
工厂方法有四种方式:newSingleThreadExecutor(单线程池)、newFixedThreadPool(固定线程池)、newCachedThreadPool(根据需要自动创建线程池)、newSingleThreadScheduledExecutor(单任务线程池)、newScheduledThreadPool(多任务线程池)。
前三种底层使用ThreadPoolExecutor,后两种底层使用ScheduledThreadPoolExecutor。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Executors工厂方法创建
**/
public class Executor {
/**
* 计数器
*/
static int count = 0;
public static void main(String[] args) {
/**
* 创建使用单个线程的线程池
* 只有一个线程执行任务
**/
ExecutorService es1 = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
es1.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行任务"+(count++));
}
});
}
/**
* 创建使用固定线程数的线程池
* 线程数固定
**/
ExecutorService es2 = Executors.newFixedThreadPool(3); //这里指定固定线程大小
for (int i = 0; i < 10; i++) {
es2.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行任务"+(count++));
}
});
}
/**
* 创建一个会根据需要创建新线程的线程池
* 如果没有空闲线程就新建,有空闲则用空闲线程
* 适合任务多而短的,新建的线程空闲60s后回收
* */
ExecutorService es3 = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
es3.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行任务"+(count++));
}
});
}
/**
* 创建只有一个定时线程任务的线程池
**/
ScheduledExecutorService es5 = Executors.newSingleThreadScheduledExecutor();
System.out.println("时间:" + System.currentTimeMillis());
for (int i = 0; i < 5; i++) {
/**
* command - the task to execute
* delay - the time from now to delay execution
* unit - the time unit of the delay parameter
**/
es5.schedule(new Runnable() {
@Override
public void run() {
System.out.println("时间:"+System.currentTimeMillis()+"--"+Thread.currentThread().getName() + "正在执行任务"+(count++));
}
},3, TimeUnit.SECONDS);
}
/**
* 创建拥有固定线程数量的定时线程任务的线程池
**/
ScheduledExecutorService es4 = Executors.newScheduledThreadPool(4); //固定线程数
System.out.println("时间:" + System.currentTimeMillis());
for (int i = 0; i < 5; i++) {
/**
* command - the task to execute
* delay - the time from now to delay execution
* unit - the time unit of the delay parameter
**/
es4.schedule(new Runnable() {
@Override
public void run() {
System.out.println("时间:"+System.currentTimeMillis()+"--"+Thread.currentThread().getName() + "正在执行任务"+(count++));
}
},3, TimeUnit.SECONDS);
}
//关掉线程池,线程执行完毕后会关掉线程池
es4.shutdown();
}
}
ThreadPoolExecutor(推荐)
自定义创建线程池,即Executors的底层使用的线程池,推荐使用这种。
在开始之前先了解两样东西分别是阻塞队列和拒绝策略。
阻塞队列
BlockingQueue阻塞队列,阻塞的情况主要有如下2种:
- 当队列满了,进行入队操作阻塞
- 当队列空了,进行出队操作阻塞
阻塞队列主要用在生产者/消费者模式中,如图:
阻塞队列常用方法
阻塞线程有如下方法:
抛出异常:如果操作不能马上进行,则抛出异常。
特殊值:如果操作不能马上进行,将会返回一个特殊的值,一般是true/false。
阻塞:如果操作不能马上进行,操作会阻塞。
超时:如果操作不能马上进行,操作会阻塞指定时间,如果指定时间没执行,则返回一个特殊值,一般为true/false。
不能向BlockingQueue中插入null.否则会报NullPointerException异常。
常见阻塞队列
ArrayBlockingQueue //一个由数组结构组成的有界阻塞队列,初始化时给出队列大小构造参数,以先进先出的方式存储数据,最新插入的对象是尾部,最新移除的对象是头部. LinkedBlockingQueue //一个由链表结构组成的阻塞队列,初始化时可以指定队列大小构造参数,不给大小时采用默认值Integer.MAX_VALUE的容量 PriorityBlockingQueue //一个支持优先级排序的无界阻塞队列,允许插入null对象,插入PriorityBlockingQueue队列的对象必须实现Comparable接口 DelayQueue //一个使用优先级队列实现的无界阻塞队列 SynchronousQueue //一个不存储元素的阻塞队列,内部仅仅容纳一个元素,当一个线程插入一个元素之后就被阻塞(放入元素的线程立刻被阻塞),除非这个元素被另一个线程消费。 LinkedTransferQueue //一个由链表结构组成的无界阻塞队列(实现了继承于 BlockingQueue的 TransferQueue) LinkedBlockingDeque //一个由链表结构组成的双向阻塞队列
拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略。
ThreadPoolExecutor.AbortPolicy //丢弃任务,并抛出RejectedExecutionException异常。
ThreadPoolExecutor.CallerRunsPolicy //该任务被线程池拒绝,由调用execute方法的线程执行该任务。
ThreadPoolExecutor.DiscardOldestPolicy //抛弃队列最前面的任务,然后重新尝试执行任务。
ThreadPoolExecutor.DiscardPolicy //丢弃任务,不过也不抛出异常。
线程池执行任务流程,提交 -》核心线程池 -》 等待队列 -》 最大线程池 -》 拒绝策略。
ThreadPoolExecutor
/** @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
*/
new ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
参数说明
corePoolSize: 核心池的大小;预创建线程,在没有任务到来之前就创建corePoolSize个线程,默认为0。
maximumPoolSize: 线程池最大线程数;程池中最多能创建多少个线程。
keepAliveTime:空闲线程最长保持时间;只有线程池中线程大于corePoolSize时有效,
如果设置了allowCoreThreadTimeOut(true)方法,则keepAliveTime对核心线程也起作用。
unit: 参数keepAliveTime的时间单位;
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
workQueue:工作队列,用来存储等待执行的任务。生产者消费者模式用前两种。
ArrayBlockingQueue //基于数组的有界阻塞队列,按FIFO排序,新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。 LinkedBlockingQueue //基于链表的阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。 SynchronousQueue //一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。 PriorityBlockingQueue //具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
threadFactory:用于设置创建线程的工厂;通过线程工厂给创建出来的线程设置线程名、daemon、优先级等等。
handler:线程池拒绝策略;当线程池线程全部在运行,且缓存也存满时后来的任务。
AbortPolicy //丢弃任务,直接抛出RejectedExecutionException异常;默认为这种.
CallerRunsPolicy //用调用者所在线程来运行任务;如果线程池已经shutdown,则直接抛弃任务.
DiscardOldestPolicy //丢弃队列里最早的一个任务,然后把这次拒绝的任务放入队列.
DiscardPolicy //不处理,丢弃任务.
创建
第一种方式创建,使用第一个构造函数。
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* ThreadPoolExecutor线程池
**/
public class ThreadPoolDemo {
//计数
static int count = 0;
public static void main(String[] args) {
//创建ThreadPoolExecutor,线程池核心线程大小,线程池最大线程数量,空闲线程存活时间,空闲线程存活时间单位,工作队列
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,5,550, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
for (int i=0;i<10;i++){
/*线程池执行Runnable实现类*/
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行任务"+(count++));
}
});
}
//关掉线程池
threadPoolExecutor.shutdown();
}
}
参考:
https://www.cnblogs.com/lujiango/p/7581031.html