线程中断我们已经直到可以使用 interrupt() 方法,但是你必须要持有 Thread 对象,但是新的并发库中似乎在避免直接对 Thread 对象的直接操作,尽量使用 Executor 来执行所有的请求。如果你在 ExecutorService 上调用 shutdownNow() .那么它将发送一个 interrupt() 调用给它启动的所有线程。如果只是调用 shutdown() ,则不会发送中断。如果只是向中断某一个任务呢。如果使用ExcecutorService那么我们通过 submit() 来启动任务而不是通过 execut() 来启动任务,这样我们可持有该任务的上下文。 submit() 将返回一个泛型的 Future<?> 对象。持有这个关键字的对象的主要目的即可以接收 call() 方法的返回值,通过 get() 方法,也可以调用其 cancel() 方法,来中断某个特定的任务。。
A Future represents the result of an asynchronous computation. Future 表示异步计算的结果
Methods are provided to check if the computation is complete,to wait for its completion, and to retrieve the result of the computation.
它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果
The result can only be retrieved using method get when the computation has completed,
blocking if necessary until it is ready. Cancellation is performed by the cancel method.
计算完成后只能使用get方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由cancel方法来执行
Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled.
还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算
If you would like to use a Future for the sake of cancellability but not provide a usable result,
you can declare types of the form Future<?> and return null as a result of the underlying task. Sample Usage (Note that the following classes are all made-up.)
如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
cancle()api
boolean cancel(boolean mayInterruptIfRunning) Attempts to cancel execution of this task. This attempt will fail if the task has already completed, has already been cancelled, or could not be cancelled for some other reason. If successful, and this task has not started when cancel is called, this task should never run. If the task has already started, then the mayInterruptIfRunning parameter determines whether the thread executing this task should be interrupted in an attempt to stop the task. After this method returns, subsequent calls to isDone will always return true. Subsequent calls to isCancelled will always return true if this method returned true. Parameters: mayInterruptIfRunning - true if the thread executing this task should be interrupted; otherwise, in-progress tasks are allowed to complete Returns: false if the task could not be cancelled, typically because it has already completed normally; true otherwise
试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。 此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。 参数: mayInterruptIfRunning - 如果应该中断执行此任务的线程,则为 true;否则允许正在运行的任务运行完成 返回: 如果无法取消任务,则返回 false,这通常是由于它已经正常完成;否则返回 true
阻塞的种类:
1、等待阻塞:运行的线程执行wait()方法,JVM会把该线程放入等待池中。
2、同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池中。
3、其他阻塞:运行的线程执行sleep()或join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。
这个例子展示了几种不同的阻塞
import java.io.IOException; import java.io.InputStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * Created by huaox on 2017/4/20. * */ class SleepBlock implements Runnable{ @Override public void run() { try { TimeUnit.SECONDS.sleep(100); } catch (InterruptedException e) { System.out.println("InterruptedException by sleep()"); } System.out.println("Exiting SleepBlocked.run()"); } } class IOBlock implements Runnable{ private InputStream inputStream; public IOBlock(InputStream inputStream) { this.inputStream = inputStream; } @Override public void run() { try { System.out.println("Waiting for read(): "); inputStream.read(); } catch (IOException e) { if(Thread.currentThread().isInterrupted()){ System.out.println(""); }else{ System.out.println("DON'T Interrupted from block IO"); throw new RuntimeException(e); } } System.out.println("Exiting IOBlocked.run()"); } } //同步阻塞 class SynchronizedBlock implements Runnable{ synchronized void f(){ //永远不释放锁 while (true) Thread.yield(); } public SynchronizedBlock() { new Thread(){ @Override public void run() { f(); } }.start(); } @Override public void run() { System.out.println("outer thread trying to call f()"); f(); System.out.println("Exiting SynchronizedBlock.run()"); } } public class Interrupting { private static ExecutorService executorService = Executors.newCachedThreadPool(); static void test(Runnable runnable) throws InterruptedException{ Future<?> future = executorService.submit(runnable); TimeUnit.MILLISECONDS.sleep(100); System.out.println("interrupting "+runnable.getClass().getName()); future.cancel(true); System.out.println("interrupt sent to "+runnable.getClass().getName()); } public static void main(String[] args) throws InterruptedException { test(new SleepBlock()); test(new IOBlock(System.in)); test(new SynchronizedBlock()); TimeUnit.SECONDS.sleep(3); /* System.out.println("Aborting with System.exit(0)"); System.exit(0);*/ } }
下面分别测试SleepBlock,IOBlock,SynchronizedBlock。运行结果
SleepBlock:可中断阻塞
interrupting SleepBlock InterruptedException by sleep() Exiting SleepBlocked.run() interrupt sent to SleepBlock Aborting with System.exit(0) Process finished with exit code 0
IOBlock:不可中断阻塞
Waiting for read(): interrupting IOBlock interrupt sent to IOBlock Aborting with System.exit(0) Process finished with exit code 0
SynchronizedBlock:不可中断阻塞
outer thread trying to call f() interrupting SynchronizedBlock interrupt sent to SynchronizedBlock Aborting with System.exit(0) Process finished with exit code 0
从输出中可以看到,你能够中断对sleep()的调用,(或者任何要求抛出InterrptedException的调用)。但是,你不能中断正在视图获取synvhronized锁或者视图执行I/O操作的线程。但是对于IO。可以使用一个比较笨拙的方法。即关闭任务在其发生阻塞的底层资源。
import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * Created by huaox on 2017/4/20. * */ public class CloseResource { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); ServerSocket serverSocket = new ServerSocket(8888); InputStream socketInput = new Socket("localhost",8888).getInputStream(); executorService.execute(new IOBlock(socketInput)); //executorService.execute(new IOBlock(System.in)); TimeUnit.MILLISECONDS.sleep(100); System.out.println("shutting down all thread"); executorService.shutdownNow(); TimeUnit.SECONDS.sleep(1); System.out.println("Closing "+ serverSocket.getClass().getName()); serverSocket.close(); /* TimeUnit.SECONDS.sleep(1); System.out.println("Closing "+ System.in.getClass().getName()); System.in.close();*/ } }
输出结果:对于serverSocket.可以关闭底层资源关闭
Waiting for read(): shutting down all thread Closing java.net.ServerSocket InterruptedException by block IO Exiting IOBlocked.run() Closing java.io.BufferedInputStream Process finished with exit code 0
程序正常终止,小红点变灰。如果对于文件IO。
import java.io.File; import java.io.FileInputStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * Created by huaox on 2017/4/20. * */ public class CloseResource { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); FileInputStream fileInputStream = new FileInputStream(new File("D:\soft\CCleaner.exe")); /*ServerSocket serverSocket = new ServerSocket(8888); InputStream socketInput = new Socket("localhost",8888).getInputStream();*/ //executorService.execute(new IOBlock(socketInput)); //executorService.execute(new IOBlock(System.in)); executorService.execute(new IOBlock(fileInputStream)); TimeUnit.MILLISECONDS.sleep(100); System.out.println("shutting down all thread"); executorService.shutdownNow(); /* TimeUnit.SECONDS.sleep(1); System.out.println("Closing "+ serverSocket.getClass().getName()); serverSocket.close();*/ TimeUnit.SECONDS.sleep(1); System.out.println("Closing "+ fileInputStream.getClass().getName()); System.in.close(); } }
输出结果:
Waiting for read(): Exiting IOBlocked.run() shutting down all thread Closing java.io.FileInputStream Process finished with exit code 0
也是可以关闭的。但我们应该用NIO更好的进行控制。
/** * Created by huaox on 2017/4/20. * */ class NIOBlock implements Runnable{ private final SocketChannel socketChannel; public NIOBlock( SocketChannel socketChannel) { this.socketChannel = socketChannel; } @Override public void run() { try { System.out.println("Waiting for read(): "+this); socketChannel.read(ByteBuffer.allocate(1)); }catch (ClosedByInterruptException e) { System.out.println("ClosedByInterruptException"); }catch (AsynchronousCloseException e){ System.out.println("AsynchronousCloseException"); }catch (IOException e){ System.out.println("IOException"); } System.out.println("Exiting NIOBlocked.run() "+this); } } public class CloseResource { public static void main(String[] args) throws Exception { testNIO(); } static void testNIO() throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); ServerSocket serverSocket = new ServerSocket(8888); InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8888); SocketChannel socketChannel1 = SocketChannel.open(inetSocketAddress); SocketChannel socketChannel2 = SocketChannel.open(inetSocketAddress); Future<?> future = executorService.submit(new NIOBlock(socketChannel1)); executorService.execute(new NIOBlock(socketChannel2)); executorService.shutdown(); TimeUnit.SECONDS.sleep(1); future.cancel(true); TimeUnit.SECONDS.sleep(1); socketChannel2.close(); } }
输出结果:
Waiting for read(): NIOBlock@5cb03462 Waiting for read(): NIOBlock@6b033b12 ClosedByInterruptException //对应submit()提交后调用Future的cancle(true)方法。 Exiting NIOBlocked.run() NIOBlock@5cb03462 AsynchronousCloseException 对用execute()提交后调用关闭系统底层资源的close()方法 Exiting NIOBlocked.run() NIOBlock@6b033b12 Process finished with exit code 0
对于 Channel ,如果通过 submit() 提交,那么可以使用 Future 的 cancel(true) 方法关闭,得到的异常是
ClosedByInterruptException 。如果是通过 execute() 提交,那么只能使用资源的 close() 方法进行关闭,得到的异常是 AsynchronousCloseException .
所以综上:我们最好使用 ExecutorService 的 Future<?> submit(Runnable task); 并且使用NIO.
就像前面你在不可中断I/O中或者等待同步锁中所观察到的那样,无论任何时刻,只要任务以不可中断的方式被阻塞,那么都有潜在的会锁住程序的可能,但是我们直到,在JDK5中,添加以了个特性。
即在 ReentrantLock 上阻塞的任务具备可以被中断的能力。这与 synchronized 方法或者临界区上阻塞的任务完全不同。
例子如下:
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by huaox on 2017/4/20. * */ class BlockedMutex{ private Lock lock = new ReentrantLock(); BlockedMutex() { lock.lock(); } void f(){ try { lock.lockInterruptibly();//这个方法具有可被中断的能力 System.out.println("lock acquired in f()"); } catch (InterruptedException e) { System.out.println("interrupted from lock acquisition in f()"); } } } class Blocked2 implements Runnable{ BlockedMutex mutex = new BlockedMutex(); @Override public void run() { System.out.println("waiting for f() in BlockedMutex"); mutex.f(); System.out.println("broken out of mutex call"); } } public class Interrupting2 { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(new Blocked2()); thread.start(); TimeUnit.SECONDS.sleep(1); System.out.println("call thread.interrupt"); thread.interrupt(); } }
输出结果:
waiting for f() in BlockedMutex call thread.interrupt interrupted from lock acquisition in f() broken out of mutex call Process finished with exit code 0
可以看到使用 Lock 的 public void lockInterruptibly() throws InterruptedException 时其具有可被中断的能力
所以我们以后最好不要使用 synchronized 而使用Lock