1、实现下面的一个需求,控制一个执行函数只能被五个线程访问
package www.weiyuan.test; public class Test { public static void main(String[] args) { for(int i = 0 ;i < 100 ;i++){ new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub method(); } }).start(); } } public static void method(){ System.out.println(""+Thread.currentThread().getName()+"进来了"); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(""+Thread.currentThread().getName()+"出去了"); } }
上面我们创建了100个线程,随机的执行method方法
现在我们要控制每次只要5个线程可以method方法,如何实现了,可以采用信号量的方法
操作系统的信号量是个很重要的概念,在进程控制方面都有应用。Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,acquire()获取一个许可,如果没有就等待,而release()释放一个许可。比如在Windows下可以设置共享文件的最大客户端访问个数。
Semaphore维护了当前访问的个数,提供同步机制,控制同时访问的个数。在数据结构中链表可以保存“无限”的节点,用Semaphore可以实现有限大小的链表。另外重入锁ReentrantLock也可以实现该功能,但实现上要负责些,代码也要复杂些。
下面的Demo中申明了一个只有5个许可的Semaphore,而有100个线程要访问这个资源,通过acquire()和release()获取和释放访问许可。
第一种方式使用信号量的方式:
package www.weiyuan.test; import java.util.concurrent.Semaphore; public class Test { private static Semaphore semaphore = new Semaphore(5); public static void main(String[] args) { for(int i = 0 ;i < 100 ;i++){ new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub method(); } }).start(); } } public static void method(){ try { semaphore.acquire(); System.out.println(""+Thread.currentThread().getName()+"进来了"); Thread.sleep(1000); System.out.println(""+Thread.currentThread().getName()+"出去了"); semaphore.release(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
第二种方式使用线程池的方式:
package www.weiyuan.test; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class Test { private static Executor executor = Executors.newFixedThreadPool(5); public static void main(String[] args) { for(int i = 0 ; i< 100 ;i++){ executor.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub method(); } }); } } public static void method(){ try { System.out.println(""+Thread.currentThread().getName()+"进来了"); Thread.sleep(1000); System.out.println(""+Thread.currentThread().getName()+"出去了"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
接下来我们自己编写代码自己封装一个线程池:
深入分析java线程池的实现原理
ThreadPoolExecutor
Executors是java线程池的工厂类,通过它可以快速初始化一个符合业务需求的线程池,如Executors.newFixedThreadPool
方法可以生成一个拥有固定线程数的线程池。
我们来分析下上面的几个参数的意思
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory)
我们来分析上面参数的意思:
线程池存在下面的两个数据结构
1、第一个数据结构是任务队列,用来存储任务的
2、第二个数据队列来存储线程的集合
线程池会创建线程执行任务,因为执行的任何很多,创建的线程如果达到了corePoolSize的数目,列如现在的corePoolSize的数目是5,但是需要执行的任务数量是100,
当创建了5个线程执行前五个任务之后,如果来了第6个任务,这个时候不会在创建新的线程了,会把多余的任务添加到任务队列中,任务队列也是有上限值的
上面的分析可以用下图来表示
如果工作队列已经达到了最大值,此时如果继续向任务队列中添加任务,即下来会做啥操作了
这个时候线程池会创建新的线程,用新的线程来执行新添加的任务,直到线程数目maximumPoolSize,如果线程的数目已经是maximumPoolSize,如果此时在添加任务,这个时候线程池就会抛出异常了
线程池中的线程数目不是越大越好,推荐的数目是电脑cpu的值加上1
如果线程池的线程数量少于corePoolSize的时候,线程池会使用threadFactory这个线程工厂创建新的线程执行Runnable任务。
如果线程池的线程数量大于corePoolSize的时候,线程池会把Runnable任务存放在队列workQueue中。
线程池的线程数量大于corePoolSize,队列workQueue已满,而且小于maximumPoolSize的时候,线程池会创建新的线程执行Runnable任务。否则,任务被拒。
keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用;
unit
keepAliveTime的单位;
workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有优先级的无界阻塞队列;
threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。
我们来看一下我们自定定义的一个一个线程池对象实现上面的代码:
package www.weiyuan.test; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) { LinkedBlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<Runnable>(100);//100是该任务队列的最大数目,达到100之后不会自动扩容了 ThreadFactory threadFactory = new ThreadFactory() { AtomicInteger atomicInteger = new AtomicInteger(); //线程安全的变量自增 @Override public Thread newThread(Runnable task) { //收到创建线程,将需要执行的任务交给线程去处理,这个地方一定注意线程安全 thread.setName("MyTHread"+i++);i++是线程不安全的 Thread thread = new Thread(task); thread.setName("MyTHread"+atomicInteger.getAndIncrement()); return thread; } }; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, blockingDeque, threadFactory ); for(int i = 0 ;i < 100;i++){ threadPoolExecutor.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub method(); } }); } } public static void method(){ try { System.out.println(""+Thread.currentThread().getName()+"进来了"); Thread.sleep(1000); System.out.println(""+Thread.currentThread().getName()+"出去了"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
1.Android中的资源池其实来源于Java。线程池的实现是ThreadPoolExecutor。Java中有一个Executors工厂,这个工厂类提供了许多静态方法用于创建不同类型的线程池。创建自定义的线程池一般需要指定以下的参数。
a.corePoolSize:代表的是线程池的核心线程数量,一般来说,核心线程会在线程池中一直存活,即便处于空闲状态(没有执行任务的状态)。但是也可以通过将ThreadPoolExecutor的allowCoreThreadTimeOut()设置为true,那么核心线程在空闲的时候会有超时的策略。
b.maximumPoolSize:代表线程池能够容纳的最多的线程数量。当线程数量达到这个数值后,后续的任务将会被阻塞。
c.keepAliveTime:设置非核心线程空闲时的超时时长,一旦达到这个限制,线程就会被回收。当设置ThreadPoolExecutor的allowCoreThreadTimeOut()为true时,这个超时限制也可以作用于核心线程。
d.unit:指定keepAliveTime参数的时间单位。
e.workQueue:代表线程池中的任务队列,通过execute()提交的任务都会加到这个队列中。
f.threadFactory:线程工厂,为线程池提供创建线程的功能。
LinkedBlockingDeque<Runnable>(100)
所以该线程池能够最大存储的任务数目是110,最大10个线程执行10个任务,其余100个线程存储在任务队列中
如果你一开始执行的任务大于110,线程池就会报异常
package www.weiyuan.test; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) { LinkedBlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<Runnable>(100);//100是该任务队列的最大数目,达到100之后不会自动扩容了 ThreadFactory threadFactory = new ThreadFactory() { AtomicInteger atomicInteger = new AtomicInteger(); //线程安全的变量自增 @Override public Thread newThread(Runnable task) { //收到创建线程,将需要执行的任务交给线程去处理,这个地方一定注意线程安全 thread.setName("MyTHread"+i++);i++是线程不安全的 Thread thread = new Thread(task); thread.setName("MyTHread"+atomicInteger.getAndIncrement()); return thread; } }; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, blockingDeque, threadFactory ); for(int i = 0 ;i < 115;i++){ threadPoolExecutor.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub method(); } }); } } public static void method(){ try { System.out.println(""+Thread.currentThread().getName()+"进来了"); Thread.sleep(1000); System.out.println(""+Thread.currentThread().getName()+"出去了"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
我们执行115个任务
就会包下面的异常:
查看这篇博客相当的经典:http://www.jianshu.com/p/87bff5cc8d8c