并发基础知识
一、线程的基本概念
线程表示一条单独的执行流,它有自己的程序计数器,有自己的栈。
1.创建线程
1)继承Thread
Java中java.lang.Thread这个类表示线程,一个类可以继承Thread并重写run方法来实现一个线程:
public class MyThread extends Thread{ @Override public void run() { System.out.println("thread name: " + Thread.currentThread().getName() + " thread id: " + Thread.currentThread().getId()); System.out.println("Running my thread!"); } public static void main(String[] args) { MyThread thread = new MyThread(); //启动线程 thread.start(); /*thread name: Thread-0 thread id: 11 Running my thread!*/ } }
2)实现Runnable接口
public class MyRunnable implements Runnable{ @Override public void run() { System.out.println("thread name:" + Thread.currentThread().getName()); } public static void main(String[] args) { System.out.println("Main thread name : " + Thread.currentThread().getName()); //Main thread name : main Thread thread = new Thread(new MyRunnable()); //thread name:Thread-0 thread.start(); } }
2.线程的基本属性和方法
1)id和name
id: 一个递增整数,每创建一个线程就加一
name:默认值是Thread-后跟一个编号,name可以在Thread的构造方法中指定,可以通过setName方法进行设置。
2)优先级
优先级从1到10,默认为5,相关方法:
public final void setPriority(int newPriority) public final int getPriority()
在编程中不要过分依赖优先级。
3)状态
线程有一个状态概念,Thread获取状态方法:
public State getState()
返回值类型为Thread.State,它是一个枚举值:
public enum State { NEW, //线程还没调用start //调用start后线程在执行run方法且没有阻塞时状态为RUNNABLE, //不过,这并不代表CPU一定在执行该线程的代码,可能正在执行也可能在 //等待操作系统分配时间片,只是它在等待其他条件。 RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; //线程已经结束运行 }
Thread还有一个方法,返回线程是否alive:
//线程被启动后,run方法运行结束前,返回值都是true public final native boolean isAlive()
4.是否是daemon线程
一般情况下整个程序只有在所有的线程都结束后,线程才会退出。
而daemon线程不一样,它是守护线程,当它辅助的主线程结束时,它也会结束。
public final void setDaemon(boolean on) public final boolean isDaemon()
5.sleep方法
Thread有一个静态方法,调用该方法会让当前线程睡眠指定时间,单位是毫秒。
public static native void sleep(long millis) throws InterruptedException;
在睡眠期间,该线程会让出CPU,但睡眠时间不是确切的给定毫秒数,可能有一定偏差。
睡眠期间,线程可以被中断,如果被中断,sleep会抛异常。
6.yield方法
public static native void yield();
调用该方法意思是:我现在不急着占用CPU,你可以先让其他线程运行。
不过这对系统调度器也仅仅是建议,调度器如何处理不一定,它可能忽略该调用。
7.join方法
在前面的MyThread例子中,MyThread可能没执行完,main线程就可能执行完了。
Thread有一个join方法,可以让调用join的线程等待该线程的结束。
public final void join() throws InterruptedException
//限定等待的最长时间 public final synchronized void join(long millis) throws InterruptedException
MyThread thread = new MyThread(); thread.start(); //让main线程在子线程调用结束后再退出,相当于阻塞了main线程 thread.join();
3.共享内存及可能出现的问题
虽然每个线程表示一条单独的执行流,有自己的程序计数器和栈,
但线程之间可以共享内存,它们可以访问和操作相同的对象。
public class ShareMemoryDemo { private static int shared = 0; private static void incrShared() { shared ++; } static class ChildThread extends Thread { List<String> list; public ChildThread(List<String> list) { this.list = list; } @Override public void run() { incrShared(); list.add(Thread.currentThread().getName()); } } public static void main(String[] args) throws InterruptedException { ArrayList<String> list = new ArrayList<>(); ChildThread t1 = new ChildThread(list); ChildThread t2 = new ChildThread(list); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(shared); System.out.println(list); /*2 [Thread-0, Thread-1]*/ } }
ChildThread的run方法访问了共享变量shared和list。
执行流、内存和程序代码之间的关系:
1)不同的执行流可以访问和操作相同的变量
2)不同的执行流可以执行相同的程序代码
所以,在分析代码执行过程时,理解代码在被哪个线程执行是很重要的
3)当多条执行流执行相同的程序代码时,每条执行流都有自己的栈,方法中
的参数和局部变量都有自己的一份。
当多条执行流可以操作相同的变量时,可能会出现意料之外的结果,包括竞态条件和内存可见性问题。
1.竞态条件
所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终结果和执行时序
有关,可能正确也可能不正确。
public class CounterThread extends Thread{ private static int counter = 0; @Override public void run() { for (int i = 0; i < 1000; i++) { counter++; } } public static void main(String[] args) throws InterruptedException { int num = 1000; Thread[] threads = new Thread[num]; for (int i = 0; i< num; i++) { threads[i] = new CounterThread(); threads[i].start(); } for (int i = 0; i < 1000; i++) { threads[i].join(); } System.out.println(counter); //998400 } }
期望结果应该是10000000,实际结果为998400,为什么呢?
因为counter++这个操作不是原子操作,它分为了3个步骤:
1)去counter的当前值
2)在当前值上加1
3)将新值重新赋值给counter
两个线程可能同时执行第一步,取到了相同的counter值。
2.内存的可见性
多个线程可以共享和访问和操作相同的变量,但一个线程对一个共享变量的修改,
另一个线程不一定能马上见到,甚至永远见不到。
public class VisibilityDemo { private static boolean shutdown = false; static class MyThread extends Thread { @Override public void run() { while (!shutdown) { //do nothing } System.out.println("exit myThread"); } } public static void main(String[] args) throws InterruptedException { new MyThread().start(); Thread.sleep(1000); shutdown = true; System.out.println("exit main!"); } }
期望的结果是两个线程都退出,但实际执行时,很可能会发现myThread永远不退出,
也就是说在myThread执行流看来,shutdown永远为false,即使main线程已经更改为了ture。
这就是内存可见性问题,在计算机系统中,除了内存,数据还会被缓存在CPU的寄存器以及
各级缓存中,当访问一个变量时,可能从CPU寄存器和各级缓存取,而不是从内存,当
修改一个变量时,也可能是先修改到缓存中,稍后再同步更新到内存中。在单线程程序中,这
一般不是问题,但在多线程程序中,尤其是有多个CPU的情况下,这就是严重问题。一个线程对
内存的修改,另一个线程看不到,一是修改没有及时同步到到内存,二是另一个线程根本就没有从内存中读取。
二、理解synchronized(同步)
1.用法和基本原理
synchronized可以用于:
1)实例方法:
public class Counter { private int count; public synchronized void incre() { count ++; } public synchronized int getCount() { return count; } }
public class CounterThread extends Thread { private Counter counter; public CounterThread(Counter counter) { this.counter = counter; } @Override public void run() { for (int i = 0; i < 1000; i++) { counter.incre(); } } public static void main(String[] args) throws InterruptedException { int num = 1000; Counter counter = new Counter(); Thread[] threads = new Thread[num]; for (int i = 0; i < num; i++) { threads[i] = new CounterThread(counter); threads[i].start(); threads[i].join(); } System.out.println(counter.getCount()); } }
看上去,synchronized使得同时只能有一个线程执行实例方法,
但这个理解是不确切的。多个线程是可以同时执行一个synchronized方法的,
只要它们访问的对象不同即可。所以,synchronized实例方法保护的是同一个
对象方法的调用,确保同时只能有一个线程执行。大致过程如下:
1)尝试获得锁,如果能够获得锁,继续下一步,否则加入等待队列,阻塞并等待唤醒;
2)执行实例方法代码;
3)释放锁,如果等待队列上有等待线程,从中抽取一个并唤醒,如果有多个等待线程,唤醒
哪一个是不一定的,不保证公平性。
当线程不能获得锁的时候,他会加入等待队列,状态会变为BLOCKED.
注意:一般在保护变量时,需要在所有访问该变量的方法上加上synchronized。
再次强调:synchronized保护的是对象而非代码,只要访问的是同一个对象的synchronized方法,
即使是不同的代码,也会被同步顺序访问。比如,Counter中的两个实例方法,incre和getCount,
对于同一个Counter对象,一个线程想去执行incre,另一个线程想执行getCount,它们是不能同时执行的,
会被synchronized同步顺序执行。
2)静态方法
synchronized保护的是类对象,而非实例对象(实际上,每个对象都有一个锁和等待队列,类对象也不例外)。
public class StaticCounter { private static int count = 0; public static void incr() { synchronized (StaticCounter.class) { count ++; } } public static int getCount() { synchronized (StaticCounter.class) { return count; } } }
3)代码块
public class Counter { private int count; public void incre() { //synchronized括号里面的就是保护对象{}里就是同步执行代码 synchronized (this) { count ++; } } public synchronized int getCount() { synchronized (this) { return count; } } }
public class Counter { private int count; private Object obj = new Object(); public void incre() { //synchronized同步的对象可以是任意对象,任意对象都有一个锁和等待队列 synchronized (obj) { count ++; } } public synchronized int getCount() { synchronized (obj) { return count; } } }
2.进一步理解synchronized
1)可重入性
对同一个执行线程,在它获得了锁之后,在调用其他需要同样锁的代码时,可以直接调用。
比如,在一个synchronized实例方法内,可以调用同一个实例中的synchronized实例方法。
2)内存可见性
synchronized除了保证原子操作外,它还有一个重要作用,就是保证内存可见性,
在释放锁时,所有写入都会写入内存,而获得锁后,都会从内存中读取最新数据。
不过,如果只是为了保证内存可见性,synchronized成本有点高,可以给变量加修饰符volatile代替。
加了volatile后,Java会在操作对应变量时加入特殊指令,保证读写到内存最新值,而非缓存值。
3)死锁
死锁举例:有a、b两个线程,a线程持有锁A,在等待锁B,b线程持有锁B,在等待锁A,
a、b陷入了互相等待,最后谁都执行不下去。
public class DeadLockDemo { private static Object lockA = new Object(); private static Object lockB = new Object(); public static void startThreadA() { Thread t = new Thread(() -> { synchronized (lockA) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lockB) {} } }); t.start(); } public static void startThreadB() { Thread t = new Thread(() -> { synchronized (lockB) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lockA) {} } }); t.start(); } public static void main(String[] args) { startThreadA(); startThreadA(); } }
如何解决死锁问题:首先,应该尽量避免在持有一个锁的同时去申请另一个锁,
如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。
3.同步容器
Collections类中有一些方法,可以返回线程安全的同步容器:
public static <T> Collection<T> synchronizedCollection(Collection<T> c) public static <T> List<T> synchronizedList(List<T> list) public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)
它们是给所有容器方法加上synchronized来实现安全的,例如:
static class SynchronizedCollection<E> implements Collection<E> { final Collection<E> c; //Backing Collection final Object mutex; //Object on which to synchronize SynchronizedCollection(Collection<E> c) { if(c==null) throw new NullPointerException(); this.c = c; mutex = this; }
public int size() { synchronized (mutex) {return c.size();} } public boolean add(E e) { synchronized (mutex) {return c.add(e);} } public boolean remove(Object o) { synchronized (mutex) {return c.remove(o);} } //… }
这里的线程安全针对的是容器对象,指的是当多个线程并发访问同一个容器对象时,
不需要额外的同步操作,也不会出现错误结果。这样是不是就绝对安全了呢?不是
的我们还需要注意以下情况:
1)复合操作
public class EnhancedMap <K, V> { Map<K, V> map; public EnhancedMap(Map<K,V> map){ this.map = Collections.synchronizedMap(map); } public V putIfAbsent(K key, V value){ V old = map.get(key); if(old!=null){ return old; } return map.put(key, value); } public V put(K key, V value){ return map.put(key, value); } //… }
其中的putAbsent方法语义是只有在原方法没有对应键的情况下才添加,
这是一个检查然后在更新的操作,在多线程的情况下,可能有多个线程
执行完了检查这一步,都发现Map中没有对应的键,然后就会都调用put,这就破坏了该方法的语义。
2.伪同步
那么给该方法加上synchronized就线程安全了吗?
public synchronized V putIfAbsent(K key, V value){ V old = map.get(key); if(old!=null){ return old; } return map.put(key, value); }
答案是否定的,原因是同步对象错了。putAbsent同步使用的是EnhancedMap对象,
而其他方法使用的是Collections当作的map对象。要解决这个问题,所有方法必须
使用相同的锁。所以可以改为:
public V putIfAbsent(K key, V value){ synchronized(map){ V old = map.get(key); if(old!=null){ return old; } return map.put(key, value); } }
3.迭代
对于同步容器对象,虽然单个操作是安全的,但迭代并不是:
public class DeadLockDemo { private static void startModifyThread(final List<String> list) { Thread modifyThread = new Thread(new Runnable() { @Override public void run() { for(int i = 0; i < 100; i++) { list.add("item " + i); try { Thread.sleep((int) (Math.random() * 10)); } catch (InterruptedException e) { } } } }); modifyThread.start(); } private static void startIteratorThread(final List<String> list) { Thread iteratorThread = new Thread(new Runnable() { @Override public void run() { while (true) { for(String str : list) { } } } }); iteratorThread.start(); } public static void main(String[] args) { final List<String> list = Collections .synchronizedList(new ArrayList<String>()); startIteratorThread(list); startModifyThread(list); } /*Exception in thread "Thread-0" java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) at java.util.ArrayList$Itr.next(ArrayList.java:851) at com.cdert.roadpms.test.utils.DeadLockDemo$2.run(DeadLockDemo.java:34) at java.lang.Thread.run(Thread.java:748)*/ }
如果在遍历的同时容器发生了结构性变化就会抛出该异常。
可以改为(在遍历的时候给整个容器对象加锁):
private static void startIteratorThread2(final List<String> list) { Thread iteratorThread = new Thread(new Runnable() { @Override public void run() { while(true) { synchronized(list){ for(String str : list) { } } } } }); iteratorThread.start(); }
二、线程的基本协作机制
一)协作的场景
1)生产者/消费者模式:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到
队列上,而消费者从队列取数据或者任务,如果队列长度有限,在队列满的时候,生产者需要等待,
在队列为空的时候,消费者需要等待。
2)同时开始:在一些程序,尤其是仿真程序中,要求多个线程同时开始。
3)等待结束:主线程将任务分解成若干子任务,为每个子任务创建一个线程,主线程在继续执行其他
任务之前需要等待每个子任务执行完毕。
4)异步结果:在主从协作模式中,主线程手工创建子线程往往笔记麻烦,一种常见的模式是把子线程封装
为异步调用,异步调用马上返回,但返回的不是最终结果,而是一个称为Future的对象,通过它可以在随后获得最终结果。
5)集合点:在一些程序中,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程
到齐后,交还数据和计算结果,再进行下一次迭代。
二、wait/notify
在Java根父类中定义了一些线程协作的基本方法,这些方法
分为两类:wait和notify。wait主要有两个方法:
public final void wait() throws InterruptedException //单位为毫秒表示最长等待时间,wait(0)表示无限期等待,无参wait等于调用wait(0) public final native void wait(long timeout) throws InterruptedException;
每个对象都有一把锁和等待队列,一个线程在进入synchronized代码块时,
会尝试获取锁,如果获取不到就会把当前线程加入等待队列,其实,除了
用于锁的等待队列,每个对象还有另一个等待队列,叫做条件队列,该队列
用于线程间的协作。调用wait就会把当前线程(调用该方法使用的线程)放到条件
队列上并阻塞,表示当前线程无法执行,它需要等待一个条件,这个条件不能由
它自己发起,需要其他线程发起。当其他线程改变条件后,应该调用notify方法。
//notify将会从条件队列中选取一个线程,将其从队列中移除并唤醒 public final native void notify(); //notifyAll与notify的区别是:它会移除条件队列中的所有线程,并全部唤醒 public final native void notifyAll();
简单的协作示例:
public class WaitThread extends Thread{ private volatile boolean fire = false; @Override public void run() { try { synchronized(this) { while (!fire) { wait(); } } System.out.println("fired!"); } catch (Exception e) { e.printStackTrace(); } } public synchronized void fire() { this.fire = true; notify(); } public static void main(String[] args) throws InterruptedException { WaitThread waitThread = new WaitThread(); waitThread.start(); Thread.sleep(1000); System.out.println("fire"); waitThread.fire(); } }
两个线程都要访问变量fire,容易出现竞态条件,所有相关代码都要被
synchronized保护。实际上,wait/notify方法只能在synchronized代码
块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常。
看看另一种情况:
public class WaitThread extends Thread{ private volatile boolean fire = false; private volatile int count = 0; @Override public void run() { try { synchronized(this) { while (!fire) { sleep(1000); System.out.println(++count); } } System.out.println("fired"); } catch (Exception e) { e.printStackTrace(); } } //实例对象的锁不会被释放,该方法永远不会被执行 public synchronized void fire() { System.out.println("fire it"); this.fire = true; } public static void main(String[] args) throws InterruptedException { WaitThread waitThread = new WaitThread(); waitThread.start(); Thread.sleep(1000); System.out.println("fire"); waitThread.fire(); } }
思考:如果wait必须被synchronized保护,那一个线程在wait时,另一个线程
怎么可能调用到同样被synchronized保护的notify方法(注意两个被保护的synchronized方法是一个实例的)?
它不需要等待锁么?我们需要进一步理解wait的内部过程才能解开谜团!!虽然是在synchronized内,但调用
wait时,线程会释放对象锁。wait的具体过程是:
1)把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING。
2)等待时间到或者被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁:
如果能够重新获得锁,线程状态变为RUNNABLE,并从wait调用中返回。
否则,该线程加入对象锁等待队列,线程变为BLOCKED,只有在获得锁后才会从wait调用中返回。
线程从wait调用中返回后,不代表其等待条件就一定成立了,它需要重新检查其等待条件,一般
调用模式是:
synchronized (obj) { while(条件不成立) obj.wait(); …//执行条件满足后的操作 }
比如上例中的代码:
synchronized (this) { while(!fire) { wait(); } }
调用notify会把条件队列中的线程唤醒并从队列中移除,但它并不会
释放调用它的代码块获得的对象锁,也就是说,只有在包含notify的
synchronized代码块执行完毕后,等待线程才会从wait调用中返回。
三、生产者消费者模式
public class MyQueue<E> { private Queue<E> queue = null; private int limit; public MyQueue(int limit) { this.limit = limit; queue = new ArrayDeque<E>(limit); } //给生产者用的方法,往队列上放数据,满了就wait public synchronized void put(E e) throws InterruptedException { while (queue.size() == limit) wait(); queue.add(e); //由于条件不同但又使用相同的的等待队列,所以 //要用notifyAll而不能调用notify notifyAll(); } //take给消费者用的方法空了就wait public synchronized E take() throws InterruptedException { while (queue.isEmpty()) wait(); E e = queue.poll(); notifyAll(); return e; } }
只能有一个条件等待队列,这是 wait/notify机制的缺陷,这使得对于条件的分析变得复杂。
public class Producer extends Thread{ MyQueue<String> queue; public Producer(MyQueue<String> queue) { this.queue = queue; } @Override public void run() { int num = 0; try { while (true) { String task = String.valueOf(num); queue.put(task); System.out.println("produce: " + task); num ++; Thread.sleep(3000); } } catch (InterruptedException e) { e.printStackTrace(); } } }
public class Consumer extends Thread{ MyQueue<String> queue; public Consumer(MyQueue<String> queue) { this.queue = queue; } @Override public void run() { try { while (true) { String task = queue.take(); System.out.println("consumer: " + task); Thread.sleep(3000); } } catch (InterruptedException e) { e.printStackTrace(); } } }
三、同时开始
public class FireFlag { private volatile boolean fired = false; public synchronized void waitForFire() throws InterruptedException { while (!fired) { wait(); } } public synchronized void fire() { this.fired = true; notifyAll(); } }
public class Racer extends Thread{ private FireFlag flag; public Racer(FireFlag fireFlag) { this.flag = fireFlag; } @Override public void run() { try { flag.waitForFire(); System.out.println("start run " + Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { int num = 10; FireFlag flag = new FireFlag(); Thread[] threads = new Thread[num]; for (int i = 0; i < num; i++) { threads[i] = new Racer(flag); threads[i].start(); } Thread.sleep(1000); flag.fire(); } }
四、等待结束
join方法实际上就是调用了wait方法:
while (isAlive()) { wait(0); }
只要线程是活的,isAlive()返回true,join就一直等待。谁来通知它呢?
当线程运行结束时,Java系统调用notifyAll来通知。使用join比较麻烦,需要
主线程逐一等待每个子线程。这里我们展示一种新写法:
public class MyLatch { //未完成的线程个数,初始值为子线程总个数 private int threadsCount; public MyLatch(int threadsCount) { this.threadsCount = threadsCount; } public synchronized void await() throws InterruptedException { while (threadsCount > 0) wait(); } public synchronized void countDown() { threadsCount --; if (threadsCount <= 0) this.notifyAll(); } }
public class Worker extends Thread{ MyLatch myLatch; public Worker(MyLatch myLatch) { this.myLatch = myLatch; } @Override public void run() { try { Thread.sleep(5000); myLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { int num = 10; MyLatch myLatch = new MyLatch(num); Thread[] threads = new Thread[num]; for (int i = 0; i < num; i++) { threads[i] = new Worker(myLatch); threads[i].start(); } myLatch.await(); System.out.println("ready to end !"); } }
五、异步结果
在Java中表示子任务的接口是Callable,声明为:
public interface Callable<V> { V call() throws Exception; }
为表示异步调用的结果,定义一个MyFuture接口:
public interface MyFuture <V> { //get方法返回真正的结果,如果结果没有计算完成, //get方法会阻塞直到计算完成 V get() throws Exception ; }
为方便主线程调用子任务,定义一个MyExcecutor类,其中定义一个execute
方法表示执行子任务并返回调用结果。
//利用该方法,对于主线程,就不需要创建并管理子线程了, //并且可以方便地获取异步调用的结果 public <V> MyFuture<V> execute(final Callable<V> task)
实例:
//表示子任务得接口 interface MyCallable<V> { V call() throws Exception; }
//表示异步调用的结果 interface MyFuture<V> { //返回真正的结果 V get() throws Exception; }
interface MyExecutor { <V> MyFuture<V> execute(final MyCallable<V> task); }
//执行任务的子线程 public class ExecuteThread<V> extends Thread{ private V result = null; private Exception exception = null; private volatile boolean done = false; private MyCallable<V> task; private Object lock; public ExecuteThread(MyCallable<V> task, Object lock) { this.task = task; this.lock = lock; } @Override public void run() { try { result = task.call(); } catch (Exception e) { exception = e; } finally { synchronized (lock) { done = true; lock.notifyAll(); } } } public V getResult() { return result; } public boolean isDone() { return done; } public Exception getException() { return exception; } }
public class MyExecutorImpl implements MyExecutor{ @Override public <V> MyFuture<V> execute(MyCallable<V> task) { final Object lock = new Object(); final ExecuteThread<V> thread = new ExecuteThread<>(task, lock); MyFuture<V> myFuture = new MyFuture<V>(){ @Override public V get() throws Exception{ synchronized (lock) { System.out.println("Try to get result"); while (!thread.isDone()) { try { //阻塞直到任务执行完毕 lock.wait(); } catch (Exception e) {} } if (thread.getException() != null) { throw thread.getException(); } } System.out.println("Got it!!!"); return thread.getResult(); } }; thread.start(); return myFuture; } }
public class MainClass { public static void main(String[] args) { MyExecutor executor = new MyExecutorImpl(); MyCallable<Integer> task = new MyCallable<Integer>() { @Override public Integer call() throws Exception { System.out.println("Start do the call."); int result = (int) (Math.random() * 1000); Thread.sleep(5000); System.out.println("Do the call done."); return result; } }; //异步调用,返回一个MyFuture对象 MyFuture<Integer> future = executor.execute(task); //执行其他操作 try { System.out.println("Start do other things."); Thread.sleep(3000); System.out.println("Other things done. "); System.out.println("Ready to get."); //获取异步调用结果 Integer result = future.get(); System.out.println("The result is " + result); } catch (Exception e) { e.printStackTrace(); } /*Start do other things. Do the call done. Other things done. The result is 802*/ } }
六、集合点
public class AssemblePoint { //未到达集合点的线程数量,初始为所有线程数 private int threadsCount; public AssemblePoint(int threadsCount) { this.threadsCount = threadsCount; } public synchronized void await() throws InterruptedException { System.out.println("Now the count is " + threadsCount); if (threadsCount > 0) { threadsCount --; if (threadsCount == 0 ) { notifyAll(); System.out.println("All the threads done."); } else { while (threadsCount !=0) { this.wait(); } } } } }
public class AssemblePointDemo { static class Tourist extends Thread { AssemblePoint point; public Tourist(AssemblePoint point) { this.point = point; } @Override public void run() { try { //模拟各自独自运行 Thread.sleep((long) (Math.random() * 1000)); //该线程运行完毕,集合 point.await(); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { int num = 10; Tourist[] threads = new Tourist[num]; AssemblePoint point = new AssemblePoint(num); for (int i = 0; i < num; i++) { threads[i] = new Tourist(point); threads[i].start(); } } }
三、线程的中断
一)取消/关闭机制
在Java中停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,
是给线程传递一个取消信号,但是由线程线程决定如何以及何时退出。每个线程都有一个标志位,表示该线程是否被中断了。
//返回线程的中断标志位 public boolean isInterrupted() //中断对应的线程 public void interrupt() //该方法是静态方法,实际会调用Thread.currentThread()操作当前线程,返回标志位,并清空 public static boolean interrupted()
二)线程中断的反应
interrupt()对线程的影响与线程的状态和正在进行的IO操作有关,
我们主要讨论线程状态:
1)RUNNABLE:线程正在运行或者具备运行的条件只是在等待操作系统调度;
2)WAITING/TIME_WAITING:线程在等待某个条件或者超时;
3)BLOCKED:线程在等待锁,试图进入同步块
4)NEW/TERMINATED:线程还未启动或者已经结束。
1.RUNNABLE
如果线程在运行中,且没有执行IO操作,interrupt()只是会设置线程的中断标致位,
没有任何其他作用。线程应该在运行过程中合适的位置检查中断标志位,例如:
public class InterruptRunnableDemo extends Thread { @Override public void run() { while(!Thread.currentThread().isInterrupted()) { } System.out.println("done "); } }
2.WAITING/TIME_WAITING
线程调用join/wait/sleep方法会进入WAITING/TIME_WAITING状态,
在这些状态时,对线程对象调用interrupt会使线程抛出InterruptedException。
抛出异常后,中断标志位会被清空,而不是设置。比如:
public static void main(String[] args) { Thread thread = new Thread(() -> { try { Thread.sleep(1000); } catch (Exception e) { System.out.println(Thread.currentThread().isInterrupted()); //false } }); thread.start(); try { Thread.sleep(100); } catch (Exception e) {} thread.interrupt(); }
捕获InterruptedException,通常表示希望线程结束,线程大致有两种处理方式:
1)向上传递该异常,这使得该方法也变成一个可中断方法,需要调用者进行处理;
2)有些情况不能向上传递异常,比如Thread的run方法,它的声明是固定的,不能
抛出受检异常,这时,应该捕获异常,进行合理的清理操作,清理后,一般应该调
用Thread的interrupt方法设置中断标志位,使得其他代码有方法知道它发生中断。
3.BLOCKED
如果一个线程在等待锁,对线程调用interrupt只是会设置中断标志位,线程
仍然会处于BLOCKED状态。interrupt并不能使一个正在等待锁的线程正真“中断”。
public class InterruptSynchronizedDemo { private static Object lock = new Object(); private static class MyThread extends Thread { @Override public void run() { synchronized (lock) { System.out.println("Coming in "); //never while (!Thread.currentThread().isInterrupted()) { } } System.out.println("exit");//never } } public static void test() throws InterruptedException { synchronized (lock) { MyThread thread = new MyThread(); thread.start(); Thread.sleep(1000); thread.interrupt(); thread.join(); } } public static void main(String[] args) { //test方法在持有锁的情况下启动线程thread,而线程thread //也尝试获得锁,所以会进入等待队列。 try { test(); } catch (InterruptedException e) { e.printStackTrace(); } } }
在使用synchronized关键字获取锁的过程中不响应中断请求,这是synchronized的局限性。
4.NEW/TERMINATE
调用interrupt无任何效果。
三)如何正确地取消/关闭线程
interrupt方法不一定会真正中断线程,它只是一种协作机制。
对于以线程提供服务的程序模块而言,它应该封装取消/关闭操作,提供
单独的取消/关闭方法给调用者。