线程池
一 , java.util.concurrent
1,首先,为什么要用线程池包?
1,用线程池包和数据库连接池一样,为了节省线程的创建和关闭时间
2,扩充了返回类型,实现runable只能通过共享数据和主线程通讯,通过callable 可以接受返回类型,并可以抛出异常在主线程捕获
3,扩充了些工具类
4,atomic支持计数
线程池最常用代码应用方式,
1,实现Callable
2. 创建线程池
3. 执行并接收future参数
4. 关闭线程池,停止接收新的线程task
代码如下
package org.benson.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * * @author qq277803242 * */ public class Test4ConcurrentPool implements Callable<String> { public static int result = 0; public final static int LOOP_COUNT = 500000; @Override public String call() throws Exception { System.out.println("run.."); for (int i = 0; i < LOOP_COUNT; i++) result++; if(result>4500000) throw new Exception("my exception"); return "result is " + result; } public static void main(String[] args) throws ExecutionException { ExecutorService execu = Executors.newFixedThreadPool(10); Future<String> future = null; for (int i = 0; i < 10; i++) { future = execu.submit(new Test4ConcurrentPool()); try { System.out.println(future.get()); } catch (Exception e) { System.out.println("here can catch Exception,the exception is "+e.getMessage()); execu.shutdownNow(); } } execu.shutdown();//reduce accept new task // execu.shutdownNow();//reduce accept new task and try to stop exit task } }
看代码这段可看到是调用了线程池创建了线程
// ExecutorService execu = Executors.newFixedThreadPool(10); ExecutorService execu = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
线程池的原理,工作流程如下
1,检查基本线程是否已满,如果未满创建新的线程
2,检查线程队列是否已满,如果未满创建新的等待线程填入队列
3,检查最大线程数是否已满,如果满了执行饱和线程策略
如此再返回看线程池的参数,很明确
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
按参数顺序解释如下:
corePoolSize 在线程池中的基本大小数
maximumPoolSize 线程池所允许的最大线程数目(包括队列中的线程数和基本线程中的)
keepAliveTime 线程空闲后,线程在线程池的保存时间。 线程池的作用就是为了减少创建和销毁线程池的时间,所以当运行线程很短,但很多,间隔时间长时,此个值可以设置大点
workQueue 线程池队列,runnableTaskQueue,阻塞线程,可选如下
runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
- SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
ThreadFactory,用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
RejectedExecutionHandler,饱和处理策略,用于处理线程饱和后的机制,有如下类型
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
关于shutdownNow会循环线程池所有的线程,然后调用中断方法,中断方法抛出异常中断当前线程。
2,并发包提供的一些工具类,使用方法及其原理
java.util.concurrent.Semaphore信号灯,固定执行线程数量,每次执行5个线程,应用例子如下
package org.benson.concurrent; import java.util.concurrent.Semaphore; /** * * @author qq277803242 * */ public class Test4Semaphore implements Runnable { Semaphore semaphore=new Semaphore(5); @Override public void run() { try { semaphore.acquire(); } catch (InterruptedException e) { System.out.println(e.getMessage()); } System.out.println(Thread.currentThread().getName() + " start runing 。。。"); try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println(e.getMessage()); } semaphore.release(); System.out.println(Thread.currentThread().getName() + " finished"); } public static void main(String[] args) { Runnable runable=new Test4Semaphore(); for(int i=0;i<100;i++){ new Thread(runable).start(); } } }
最近把工作给辞了, 有时间了,想想还是分开篇幅写,总结下, 呵呵,到时也整理份目录出来,当作纪念
下篇打算讲讲基于Future 构建缓存
二 , java.util.concurrent.atomic
三 , java.util.concurrent.locks