有时,我们需要在一个程序中同时并行的处理多个任务,如播放器一边要播放音乐同时还要不断更新画面显示,或者是一边执行耗时任务,UI还能一边继续响应各种事件。还有的时候,一个任务需要很长时间才能完成,如果分成多份一起执行,可以极大的缩短需要的时间。多线程可以很好的解决这类问题。
一个程序(进程)如果可以同时执行多个任务,每个并行的任务都是通过一个线程来完成,这就是一个多线程程序。进程拥有自己的一整套数据(变量),各个线程共享进程的数据,线程间通信比进程间通信更简单,线程开销比进程小。
Java中为多线程任务提供了很多的类。包括最基础的Thread类、Runnable等接口,用于线程同步的锁、阻塞队列、同步器,使用线程池的执行器、执行框架,还有可以在多线程中使用的线程安全集合等。
一、多线程基础
Runnable接口和Thread类是实现多线程最基础的方式。
1.创建线程
方式一:把要在新线程中执行的代码封装在Runnable对象的run()方法中,将Runable对象做为参数构造Thread对象,调用Thread对象的start()方法,并行任务就会在一个新的线程中开始执行,而当前线性继续执行后面的代码直至结束。
方式二:从Thead派生子类,重写Thread类的run()方法,在其中放置在要新线程中执行的代码,然后创建Thread子类对象并调用start()方法。这种方式可以省去Runnable包装类,但是却要创建一个Thread的子类。
推荐使用方法一,因为方法二把线程对象和具体任务一一绑定了,要执行1000个任务就得创建等量的线程,代价太大。应该把具体任务和用于执行任务的线程对象分离开,任务就封装在Runnbale对象中,这样就可使用线程池,用固定数量的线程来执行数量巨大的任务。
必须调用Thread类的start()方法才能开启新线程。直接调用Thread或Runnable对象的run()方法都只是在当前线程中执行任务。
Thread
- Thread(Runnable target) 用Runnable对象构造Thread,Runnable对象会被保存在Thread的target域中
- void run() 默认如果target域不为空,则执行target.run(),否则什么也不做。 可以继承Thread类,重写此方法,在其中放入要在新线程中执行的代码,不再需要Runanble对象封装新线程的任务代码。
- synchronized void start() 创建一个新的线程并开始执行,JVM会在新线程中调用Thread对象的run()方法。一个Thread对象只能调用一次这个方法,并且会立即返回,调用完后两个线程就已经开始并行执行了。
- static Thread currentThread() 返回正在执行这条命令的线程(当前线程)的Thread对象
- void interrupt() 向线程发送中断请求,线程的中断状态被设置为true。特别说明见后文。
- boolean isInterrupted() 测试线程中断状态是否被设置。不会改变中断状态。
- static boolean isInterrupted() 也是测试正在执行这条命令的线程(当前线程)是否被设置中断。调用后会将中断状态设置为false。
- State getState() 获取线程当前的状态。
- State.NEW 新创建状态。刚用new构造了一个新的Thread对象,还未调用start()方法
- State.RUNNABLE 可运行状态。一旦调用start()方法,线程就进入了可运行状态。实际上线程有可能在运行,也有可能没有在运行。线程调度器会根据线程的状态,如线程阻塞或等待的条件是否满足和优先级,自动授予或剥夺某些线程的运行权。
- State.BLOCKED 阻塞状态(不活动,低消耗,等待调度器重新激活它)。获取内部对象锁失败的情况下进入(不是java.util.concurrent锁),其他线程释放了锁并且调度器运行本线程持有该锁时恢复到可运行状态。
- State.WAITING 等待状态(不活动,低消耗)。本线程需要等待其他线程通知调度器一个条件时,自己进入等待状态(如Object.wait方法,Thread.join方法,或等待java.util.concurrent库的Lock或Condition时)。等待着的通知时出现时,重新恢复到可运行状态。
- State.TIMED_WAITING 计时等待状态(不活动,低消耗)。进入条件跟等待状态一样,但是有一个等待超时时间,超时期到了或者等待的通知出现了都会恢复到可运行状态。带有超时参数的方法有Thread.sleep(long),Object.wait(long), Thread.join(long), Lock.tryLock(long), Condition.await(long),Thread.sleep(long,int),Object.wait(long,int), Thread.join(long,int), Lock.tryLock(long,int), Condition.await(long,int) 。
- State.TERMINATED 终止状态。线程已经正常退出或异常终止。
- static void yield() 使当前线程处于让步状态,如果另一个线程与当前线程优先级相同,调度器会选另一个线程运行。一般用于调试。
- void join()/void join(long ms)/void join(long ms, int ns) 等待/计时等待指定线程终止
- void stop() 抛出一个ThreadDeath错误对象,由此杀死线程。已过时(可能停在不安全的状态,破坏对象数据:例如转账操作,从一个账号减了钱,还没加到另一个账号,线程被强制结束了)。
- void suspend()/resume() 暂停/恢复线程的执行。已过时(容易形成死锁:例如当前线程挂起了另一个持有锁的线程,当前线程又要获取同一个锁,当前线程就会被这个锁阻塞,另一个线程会要等待从挂起恢复也一直阻塞)。
- void int getPriority() 获取线程的优先级
-
- void setPriority(int priority) 设置线程优先级。线程优先级从最低1-最高10,默认继承父线程的优先级。优先级的实现依赖于系统,Java线程的优先级会被映射到操作系统的优先级上。例如Windws支持7个优先级,Linux版Java虚拟机只有相同优先级。在程序中设置优先级是容易出现错误的(高优先级的线程没有进入非活动状态,低优先级线程永远不会执行),因此最好不要使用,不要让程序功能的正确性依赖优先级。
- Thread.MIN_PRIORITY 最低优先级,1
- Thread.NORM_PRIORITY 5
- Thread.MAX_PRIORITY 最高优先级,10
- void setDaemon(boolean isDaemon) 修改线程为守护线程/用户线程
- static UncaughtExceptionHandler getDefaultUncaughtExceptionHandler() 获取所有线程默认的未捕获异常处理器
- static void setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler eh) 设置所有线程默认的未捕获异常处理器,后文详述。
- UncaughtExceptionHandler getUncaughtExceptionHandler() 获取当前线程的未捕获异常处理器
- void setUncaughtExceptionHandler(UncaughtExceptionHandler eh) 设置当前线程的未捕获异常处理器,后文详述。
- void setPriority(int priority) 设置线程优先级。线程优先级从最低1-最高10,默认继承父线程的优先级。优先级的实现依赖于系统,Java线程的优先级会被映射到操作系统的优先级上。例如Windws支持7个优先级,Linux版Java虚拟机只有相同优先级。在程序中设置优先级是容易出现错误的(高优先级的线程没有进入非活动状态,低优先级线程永远不会执行),因此最好不要使用,不要让程序功能的正确性依赖优先级。
Runnable 接口 包装在新线程中执行的内容
-
- void run() 新的线程创建后会执行该方法里的代码
Java中线程可以分为两类:
- User线程(用户线程) 默认创建的线程都是用户线程。
- Daemon线程(守护线程) 守护线程唯一的用途是给其他的工作线程提供服务。当只剩下守护线程时,程序没必要再运行,虚拟机就退出了。使用守护线程也要特别注意,因为它可能在任何时刻甚至是一个操作的中间就终止了,不要在其中访问固定资源,如文件,数据库。
其他说明:
- 线程开始执行前调用setDaemon方法可以设置线程是用户线程还是守护线程,线程一旦开始执行就无法再更改了。
- JAVA虚拟机启动时创建主线程执行main方法(程序入口点)。
- 在main方法所在的主线程结束后,虚拟机会自动启动一个DestroyJavaVM线程,该线程会等待所有user thread结束后(即当只剩下daemon 线程和DestroyJavaVM线程自己时),终止所有daemon线程,整个虚拟机就退出。因此,只要有一个用户线程还没结束,虚拟机就不会退出。
最基本的多线程示例代码
class MyThread extends Thread { private int begin; public void setCountStart(int begin) { this.begin=begin; } @Override public void run() { try { int end=begin+100; for(int i=begin; i<end; i++){ System.out.println("sub: "+i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } } MyThread t=new MyThread(); t.setCountStart(200); t.start(); for(int i=0; i<100; i++){ System.out.println("main: "+i); Thread.sleep(1000); }
class MyTask implements Runnable { private int begin=0; public MyTask(int s){begin=s;} @Override public void run() { try { int end=begin+100; for(int i=begin; i<end; i++){ System.out.println("sub: "+i); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } }; Thread t=new Thread(new MyTask(123)); t.start(); for(int i=0; i<100; i++){ System.out.println("main: "+i); Thread.sleep(1000); if(i==10) t.stop(); }
2.终止线程
Thread或Runnable对象的run()方法包装了新线程中执行的代码,在run()方法中遇到下面的情况,线程会终止。
- 正常终止。执行完最后一条语句,也包括遇到return返回
- 异常终止。出现未捕获的异常
强制结束
强制结束线程,实际上也是通过抛出特殊的异常对象,导致线程异常终止。主要有两种方式:
- 调用Thread对象的stop()方法。抛出一个ThreadDeath异常,停止线程执行(这个异常如果被捕获一定要重新抛出)。这个方法已经不推荐使用,原因是线程可能停止在一个不安全的状态(例如转账操作,从一个账号减了钱,还没加到另一个账号,线程被强制结束了),应该使用请求中断的方式。
- 请求中断方式。要结束一个线程,就设置该线程的中断变量(调用Thread对象的interrupt()方法)(表明着有人想要中断这个线程),线程中的代码自己要负责查询中断变量(Thread类静态方法interrupted()或Thread对象的isInterrupted()方法),如果发现中断变量被设置了就自觉点不要再执行了,恢复到安全的状态后自行退出。请求中断不是强制的,如果线程中的代码不查询中断变量,或者发现中断变量已经被设置了但是不理会继续厚着脸皮执行,这个线程还是会一直运行不会被停止。
void interrupt()方法和InterruptedException特别说明
- 如果调用该方法时,该线程正被某些可中断的方法阻塞着(sleep,wait或可中断IO调用等),那么现在肯定是无法检测中断状态的,那么清理中断状态,抛出InterruptedException异常,阻塞的方法调用会立即被这个异常中断。
- 如果调用该方法将中断状态设置为了true,不久就调用了一个可中断的方法(sleep,wait,可中断IO调用等),这个调用不会成功,并且同样会清除中断状态,抛出InterruptedException异常。可见,如果会循环调用sleep()这类可中断的方法,就不需要再手动检测中断状态了。
(注意也存在不能被中断的阻塞IO调用,最好不要使用不可中断的方法)。
可见如果不设置中断,InterruptedException肯定不会出现,而只要抛出InterruptedException,设置的中断状态肯定已经被清理了,这种情况只有InterruptedException这个异常是我们知道有中断请求的唯一标识了,一定要向外层通知有中断发生,千万不要再把这个异常压制住,否则怎么调用interrupt()方法请求中断都不会有什么卵用,线程中外层的代码压根不知道有中断这回事,照常运行。将这个中断请求通知给外层有两种方式:
- catch到InterruptedException时,调用Thread.currentThread().interrupt(),重新把中断状态设置上,让外层可以检测到。
- 最好的方法是,不要再catch InterruptedException异常啦,只要有这个异常就往外层抛吧。一直抛到最外层,在Thread对象或Runnable对象的run()方法中处理这个异常(处理操作:恢复到安全的状态然后自觉退出)。
直接调用stop()方法见前一段实例代码。采用中断方式实例如下:
class MyInterruptableCheckTask implements Runnable { private int begin=0; public MyInterruptableCheckTask(int s){begin=s;} @Override public void run() { int end = begin + 3000000; for (int i = begin; i < end && !Thread.currentThread().isInterrupted(); i++) { System.out.println("sub: " + i); } if (Thread.currentThread().isInterrupted()) System.out.println("sub thread is interrupted"); else System.out.println("sub natural stop"); } }; Thread t=new Thread(new MyInterruptableCheckTask(111)); t.start(); for(int i=0; i<10; i++){ System.out.println("main: "+i); Thread.sleep(1000); if(i==5) t.interrupt(); }
class MyInterruptableExceptionTask implements Runnable { private int begin=0; public MyInterruptableExceptionTask(int s){begin=s;} @Override public void run() { try { int end=begin+10; for(int i=begin; i<end; i++){ System.out.println("sub: "+i); Thread.sleep(1000); //如果设置中断时正在sleep,或设置完中断后一个循环里遇到sleep,都会抛出InterruptedException异常,不需要再手动检测中断状态了 } } catch (InterruptedException e) { System.out.println("the call Thread.sleep(n) is interrupted by InterruptedExcetpion"); Thread.currentThread().interrupt(); //产生InterruptedException异常时中断状态被清除,所以要重新设置中断或将中断异常向外抛出供后续代码检测是否发生了中断 } if(Thread.currentThread().isInterrupted()) System.out.println("sub thread is interrupted"); else System.out.println("sub natural stop"); } }; Thread t=new Thread(new MyInterruptableExceptionTask(111)); t.start(); for(int i=0; i<10; i++){ System.out.println("main: "+i); Thread.sleep(1000); if(i==5) t.interrupt(); }
3.未捕获异常的处理
Thread对象和Runnable对象的run()中,任何已检测异常都无法抛出,必须要在run()方法内部捕获和处理。而那些没有被捕获到的未检测异常和错误都会直接导致线程终止。在线程终止前,这些未捕获的异常会被传递到未捕获异常处理器对象。线程因为发生未捕获的异常而意外终止时,未捕获异常处理器提供了一个进行善后处理和日志记录机会。
UncaughtExceptionHandler接口
-
- void uncaughtException(Thread t, Throwable e)
可以在两个地方设置未捕获异常处理器:
- 设置全局默认的未捕获异常处理器,调用Thread类static的setsetDefaultUncaughtExceptionHandler方法。如果不安装默认的处理器,默认处理器为空。
- 设置单个Thread线程对象的未捕获异常处理器,调用Thread对象的setsetDefaultUncaughtExceptionHandler方法。如果不为单个线程对象安装处理器,用到的处理器就是该线程的ThreadGroup(线程组)对象。
ThreadGroup(线程组)是一个可以统一管理的线程集合。默认情况下,创建的所有线程属于相同的线程组,当然也可以建立其他线程组。现在已经有了更好的方式来操作线程集合,建议不要再使用线程组了。
ThreadGroup类实现了UncaughtExceptionHandler接口,操作过程为:
- 如果ThreadGroup还有父线程组,就调用其父线程组的uncaughtException方法。
- 否则,如果默认处理器非空,就调用默认的处理器。
- 否则,如果Throwable参数是一个ThreadDeath对象( stop()方法抛出的 ),就什么也不做。
- 否则,输出线程名字和栈踪迹到System.err上。
用前面的这些内容就可以以底层的方式实现常用的多线程功能了,后面的内容都是以别的方式(更高层次的封装)实现多线程。多线程程序都要面对一个非常重要的问题,那就是线程的同步,现在最好先去看看与线程同步相关的内容(http://www.cnblogs.com/pixy/p/4796638.html)。
二、Callable与Future
前面已经说到Runnable接口是用来封装一个异步运行的任务的。但是Runnable 定义的run()方法返回类型为void,用它封装的任务无法直接返回结果(可以将结果存放在Runnable属性中,线程执行结束后读取属性得到结果),并且无法抛出已检查的异常。Java提供的另一个接口Callable,功能基本和Runnable一致但是任务执行完了可以返回一个结果并且可以抛出异常。
Callable<V> 接口
-
- V call() throw Exception
尽管用Callable定义了可以返回计算结果的任务,但是Thread类只能接受Runnable对象,要在新线程中执行这个Callable任务还需要用到FutureTask包装器类(FutureTask实现了Runnable接口)。FutureTask还实现了Future接口,Future接口定义任务执行过程控制和查询的方法。因此,FutureTask对象也可以查询任务是否已经执行结束,如果执行结束了可以提取执行结果,还可以取消任务执行(中断的方式)。可以认为FutureTask同时包装了异步任务及其执行过程。
Future<V> 接口
-
- V get() throws Exception 获取任务执行结果,如果还没执行完则一直阻塞等待
- V get(long timeout, TimeUnit unit) throws Exception 获取任务执行结果,如果还没执行完则一直阻塞等待指定时间,超时后抛出TimeoutException异常
- boolean cancel(boolean mayInterrupt) 取消任务执行,如果任务已经开始并且mayInterrupt为true,则任务被中断。取消成功返回true
- boolean isCancelled() 任务是否在完成前被取消了
- boolean isDone() 任务是否结束了,不论是正常结束、异常中断还是被取消。
FutureTask 即是Runnable对象也是Future对象
-
- FutureTask(Callable<V> task) 构造方法,包装Callable对象
- FutureTask(Runnable task, V result) 构造方法,包装Runnable对象,如果执行完成则把result作为执行结果。
可通过Executors类的static <T> Callable<T> callable(Runnable task, T result)方法将Runnable对象和一个预先准备的结果构造成Callable对象(RunnableAdapter子类)。
示例:
@Test public void test() throws InterruptedException, ExecutionException { class TestJob implements Callable<Integer> { @Override public Integer call() throws Exception { Random rand=new Random(); int time=1+rand.nextInt(5); Thread.sleep(time*1000); return time; } }; int count=100; @SuppressWarnings("unchecked") FutureTask<Integer>[] tasks=new FutureTask[count]; for(int i=0; i<count; i++) { tasks[i]=new FutureTask<Integer>(new TestJob()); new Thread(tasks[i]).start(); } int sum=0; for(Future<Integer> future:tasks) { //future.isDone(); //future.cancel(true); System.out.pritln(future.get());
sum+=future.get(); } System.out.println(sum); }
三、线程池和执行器
构建线程会与操作系统线进行交互,是有一定代价的,如果有大量的生命周期很短的异步任务,每个任务创建一个线程的话开销会很大,这种情况应该使用线程池。线程池管理一定数量可以反复利用的空闲线程,把一个任务提交到线程池后,线程池会自动使用一个空闲的线程去执行该任务,执行结束后线程不会死亡,而是继续等待为下一个任务提供执行服务。
提交一个任务到线程池后,如果所有线程都被占用了,则该任务会排队等待直到有线程空闲出来。线程池可以把并发线程控制在一定的规模内,避免并发线程数太大导致系统性能降低甚至是虚拟机崩溃。
Executors类是一个用于创建线程池执行服务的工厂类,还提供了很多辅助的方法,如Runnable转化为Callable。Executors类创建的一些线程池执行服务对象实际类型为ThreadPoolExecutor,可以计划的线程池服务实际类型为ScheduledThreadPoolExecutor。
Executors
-
- static ExecutorService newFixedThreadPool(int nThreads) Fixed线程池包含固定数量的线程,空闲线程一直被保留。提交任务时无空闲线程则排队等待。
- static ExecutorService newCachedThreadPool() 没有空闲线程时创建新的线程;空闲线程会被保留60秒
- static ExecutorService newSingleThreadExecutor() 只有一个线程的线程池,按顺序执行每个提交
- static ExecutorService newScheduledThreadPool(int corePoolSize) 可以计划/周期执行的固定数量线程池(可替代Timer)
- static ExecutorService newSingleThreadScheduledExecutor() 可以计划/周期执行的单线程池
- static <T> Callable<T> callable(Runnable task, T result) Runnable对象转化为Callable对象,task执行结束后可以返回result作为结果
ScheduledThreadPoolExecutor ---> ThreadPoolExecutor --> AbstractExecutorService --> ExecutorService --> Executor
|--> ScheduledExecutorService --------------------------------------↑
ThreadPoolExecutor
-
- int getLargestPoolSize()
- ......
Executor 接口
-
- void execute(Runnable)
ExecutorService 接口
-
- Future<?> submit(Runnable) 提交Runnbale任务到线程池执行,返回Future对象可以查询任务状态。因为无返回结果所有get()方法null
- <T> Future<T> submit(Runnable, T result) 提交Runnbale任务到线程池执行,返回Future对象可以查询任务状态。执行完成时get方法返回result。
- <T> Future<T> submit(Callable<T>) 提交Callabe任务到线程池执行。
- void shutdown() 关闭线程池,不再接收新任务,等待所有任务执行完成后终止线程。
- List<Runnable> shutdownNow() 关闭线程池,不再接收新任务,取消还未执行的任务,尝试中断正在运行的任务。返回等待执行的任务。
- boolean isShutdown() 线程池是否已关闭
- boolean isTerminated() 线程池关闭并且所有任务是否都结束
- boolean awaitTermination(long timeout, TimeUnit unit) 线程池关闭后的等待所有线程结束
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>) 批量提交任务
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>, long timeout, TimeUnit unit) 批量提交任务,等待超时时抛出TimeoutException
- <T> T invokeAny(Collection<? extends Callable<T>>) 批量提交任务,返回任意一个已经完成了的结果(适用于同时处理多个方案,有一个可行即可的情况)
- <T> T invokeAny(Collection<? extends Callable<T>>, long timeout, TimeUnit unit) 同上,等待超时抛出TimeoutException
ScheduledExecutorService 接口
-
- ScheduledFuture<?> schedule(Runnable, long delay, TimeUnit unit)
- <V> ScheduledFuture<V> schedule(Callable<V>, long delay, TimeUnit unit)
- ScheduledFuture<?> scheduleAtFixedRate(Runnable, long initDelay, long period, TimeUnit) 每initDelay+N*period时间执行一次(N=0,1,2.....),如果到了下一次执行开始的时间而前一次执行还没完成,则开始时间延后,等待前一次执行结束避免并行。
- ScheduledFuture<?> scheduleWithFixedDelay(Runnalbe, long initDelay, long delay, TimeUnit) 前一次执行结束后,等待delay的时间再次执行
ScheduledFuture接口 --> Future
--> Delayed extends Comparble<Delayed> 定义了long getDelayed(TimeUnit unit)方法
结果管理: ExecutorCompletionService类包装一个线程池执行服务后可以收集和管理线程池中任务的Future对象。可以按任务完成的先后顺序提取结果。
ExecutorCompletionService<V> --> CompletionService
-
- ExecutorCompletionService(Executor executor) 构建一个完成服务,收集执行器的结果
- Future<V> submit(Callable<V> task) 提交任务给底层的执行器
- Future<V> submit(Runnable task, V result) 同上
- Future<V> take() 移除一个已完成的任务,如果没有完成的则阻塞
- Future<V> poll() 移除一个已完成的任务,如果没有返回null
- Future<V> poll(long timeout, TimeUnit unit) 同上,但会等待指定时间
示例:
class TestJob implements Callable<Integer> { @Override public Integer call() throws Exception { Random rand=new Random(); int time=1+rand.nextInt(5); Thread.sleep(time*1000); return time; } }; @Test public void testThreadPoolExecutor() throws InterruptedException, ExecutionException { int count=100; List<Future<Integer>> futures=new ArrayList<Future<Integer>>(); ExecutorService es=Executors.newFixedThreadPool(100); for(int i=0; i<count; i++) futures.add(es.submit(new TestJob())); int sum=0; for(int i=0; i<count; i++) { Future<Integer> future=futures.get(i); System.out.println(future.get()); sum+=future.get(); } System.out.println(sum); } @Test public void testCompletionService() throws InterruptedException, ExecutionException { int count=100; ExecutorService es=Executors.newFixedThreadPool(100); ExecutorCompletionService<Integer> cs=new ExecutorCompletionService<Integer>(es); for(int i=0; i<count; i++) cs.submit(new TestJob()); int sum=0; for(int i=0; i<count; i++) { Future<Integer> future=cs.take(); System.out.println(future.get()); sum+=future.get(); } System.out.println(sum); }
四、Fork-Join框架
如果一个大的计算过程可以拆分(Fork)成很多个独立的子计算过程,大的计算过程的结果就是子计算过程结果的汇总(Join)。就可直接使用Java7中提供的Fork-Join计算框架,主要包括ForkJoinPool线程池执行服务类和包装任务的ForkJoinTask<V>接口,RecursiveTask类(可以生成结果)和RecursiveAction类(不生成结果)都实现了ForkJoinTask接口,可以作为包装任务的基类。ForkJoin可以通过工作密取的方式动态平衡各个线程的工作负载。每个工作线程都有一个双端队列来完成任务,一个工作将子任务压入其双端队列的队头(队尾的任务一定不会比队头的任务小)(队头只有该线程自己能访问,不需要加锁),另一个工作线程空闲时会从别的工作线程的双端队列队尾偷一个任务。
ForkJoin的实际线程数量不能直接确定,适用于计算密集性的任务。
ForkJoinPool --> AbstractExecutorService --> ExecutorService --> Executor
-
- <V> T invoke(ForkJoinTask<V>) 执行一个ForkJoin任务并等待任务结果
- <V> ForkJoinTask<V> submit(ForkJoinTask<V> task) 提交一个ForkJoin任务
RecursiveTask<V> --> ForkJoinTask<V> --> Future<V> , Serializable
RecursiveAction --> ForkJoinTask<Void>
ForkJoinTask<V>
-
- <V> abstract protected T compute()
- void invokeAll(ForkJoinTask<?> ...) 接收很多任务并阻塞,直到这些任务都完成
- V join() 如果任务结束(isDone()返回true),返回结果,与get()方法不同的是不会抛出已检查异常
示例(背包问题算法):
class Result { public int maxValue; public String selectedIdx=""; public Result(int result, String selectIdx) { this.maxValue=result; this.selectedIdx=selectIdx; } } @Test public void testForkJoinCached() { int count=100; int bagSize=20; System.out.println("item count: "+count); System.out.println("bag szie: "+bagSize); System.out.print("Index: "); for(int i=0; i<count; i++) System.out.print(i+" "); Random rand=new Random(); final int[] itemWeight=new int[count]; System.out.print(" Weight: "); for(int i=0; i<count; i++) { itemWeight[i]=2+rand.nextInt(18); System.out.print(itemWeight[i]+" "); } System.out.print(" Value: "); final int[] itemValue=new int[count]; for(int i=0; i<count; i++) { itemValue[i]=20+rand.nextInt(80); System.out.print(itemValue[i]+" "); } System.out.println(""); final ConcurrentHashMap<String,Result> cache=new ConcurrentHashMap<>(); class MyTask extends RecursiveTask<Result> { private static final long serialVersionUID = 6264295083196664401L; private int chooseIdx; private int bagSpace; public MyTask(int chooseIdx, int bagSpace) { this.chooseIdx=chooseIdx; this.bagSpace=bagSpace; } @Override protected Result compute() { if(bagSpace<=0 || chooseIdx<0) return new Result(0,""); String tag=chooseIdx+"_"+bagSpace; Result r=cache.get(tag); if(r!=null) return r; Result vsel=new Result(0,""); Result vunsel=new Result(0,""); if(itemWeight[chooseIdx]<=bagSpace) { MyTask choose=new MyTask(chooseIdx-1, bagSpace-itemWeight[chooseIdx]); invokeAll(choose); vsel=choose.join(); vsel=new Result(vsel.maxValue+itemValue[chooseIdx],vsel.selectedIdx+chooseIdx+" "); } MyTask notChoose=new MyTask(chooseIdx-1, bagSpace); invokeAll(notChoose); vunsel=notChoose.join(); Result ret=vsel.maxValue>vunsel.maxValue?vsel:vunsel; cache.putIfAbsent(tag, ret); return ret; } }; ForkJoinPool pool=new ForkJoinPool(); Result maxValue=pool.invoke(new MyTask(count-1,bagSize));
System.out.println("Maxvalue of Size "+bagSize+": "+ maxValue.maxValue); System.out.println("Selected Index of "+bagSize+": "+ maxValue.selectedIdx); }
五、线程和Swing
Swing类库不是线程安全的(同步会增加响应时间开销,加了同步照样容易死锁)。多个线程同时操纵UI元素,可能会崩溃。
1.Swing与多线程
Swing和多线程同时使用时,用两个原则:
- 单一线程规则:除了时间分配线程,不要再任何线程中接触Swing组件。
- 需要花费很长时间的任务放在独立的线程中执行,而不要放在事件分配线程中。
在Swing程序中,主线程把UI构造的代码提交到事件分配线程后就退出了。在任何线程中都可以调用EventQueue类的invokeLater方法和invokeAndWait方法把一些与UI相关的操作放到事件分配线程中执行,如更新进度,更新状态信息等。
EventQueue
-
- static void invokeLater(Runnable) 把任务提交到事件分配线程后立即返回
- static void invokeAndWait(Runnable) 等待直到任务确实执行完成了才返回
- static boolean isDispatchThread() 当前线程是否是事件分配线程
2.SwingWorker工作线程
SwingWorker是实现后台工作线程的便捷类。
SwingWorker<T,V>
-
- abstract T doInBackground() 后台线程中执行任务,并返回最终结果
- protected void process(LIst<V> data) 中间进度数据的UI处理。在事件分配线程中执行
- protected void done() 任务执行完成时的UI处理,在事件分配线程中执行
- protected void publis(V...data) 传递中间数据到事件分配线程,只能在doInbackground方法中调用
- void execute() 开始执行
- void cancel(boolean mayInterrupt) 取消执行
- boolean isCancelled()
- boolean isDone()
- SwingWorker.StateValue getState() 返回工作器线程状态。PENDING, STARTED, DONE
- protected setProgress(int)
- int getProgress()
示例:
public class StringWorkerDemo extends JFrame {private JLabel statusLine; private DemoWorker worker; public StringWorkerDemo() throws InterruptedException, ExecutionException { statusLine=new JLabel("starting...."); this.add(statusLine); this.pack(); worker=new DemoWorker(); worker.execute(); Thread t=new Thread(new Runnable(){ @Override public void run() { try { Thread.sleep(3000); worker.cancel(true); } catch (InterruptedException e) { } }}); //t.start(); } class DemoWorker extends SwingWorker<Integer,String> { @Override protected Integer doInBackground() throws Exception { Random random=new Random(); int itemCount=60+random.nextInt(40); for(int i=0; i<itemCount; i++) { Thread.sleep(100); //do something this.publish("processing "+(int)(100.0*(i+1)/itemCount)+"%"); } return itemCount; } @Override protected void process(List<String> chunks) { if(!this.isCancelled()) for(String str:chunks) statusLine.setText(str); } @Override protected void done() { if(this.isCancelled()) statusLine.setText("process canceled"); else if(this.isDone()) statusLine.setText("process done"); } } public static void main(String[] args) { EventQueue.invokeLater(new Runnable(){ @Override public void run() { JFrame frame; try { frame = new StringWorkerDemo(); frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); frame.setSize(300, 200); frame.setVisible(true); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }}); } }
3.关于单一线程规则
单一线程规则也有例外:任何线程里都可以添加删除UI元素的事件监听器,但监听方法都在事件分配线程中执行。有一些Swing对象的方法也是线程安全的,如
- JTextComponent.setText
- JTextArea.insert
- JTextArea.apped
- JTextArea.replaceRange
- JCompent.repaint
- JCompent.revalidate
最开始的时候,单一线程规则允许直接在main方法中执行创建UI的操作,在UI顶层框架中调用setVisible(true),但是这样并不安全。