线程之间的协作
前面我们可以使用线程来同时运行多个任务,通过锁(互斥)来同步两个任务的行为,从而使得一个任务不会干涉另一个的任务的资源。现在研究如何使任务之间通过协作,以使得多个任务可以一起工作去解决某个问题。我们可以在互斥的基础上使用挂起操作,直到某些外部条件反生变化时,让这个任务继续往前执行。下面具体讨论该方法的实现。
1.挂起操作
1.1 wait()操作
我们可以等待某个条件发生变化之间将线程挂起,当条件满足之后通过notify()或notifyAll()将挂起的线程唤醒,告知其可以继续运行了。注意调用wait()、notify()、notifyAll()时,需要首先获得某个对象的锁,否则会出现以下错误信息。
当线程wait()时会释放掉对象锁(sychronized),这是与sleep()和yield()不同的地方。在进行条件判断时,应当首先获取对象锁,简单的说就是将条件判断放置着sychronized代码块内。
1 package com.dy.xidian; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.TimeUnit; 6 7 class Car { 8 private boolean stop = false; 9 public synchronized void stop() { 10 stop = true; 11 notifyAll(); 12 } 13 14 public synchronized void drive() { 15 stop = false; 16 notifyAll(); 17 } 18 19 public synchronized void waitForStopFinish() throws InterruptedException { 20 while (stop == false) 21 wait(); 22 } 23 24 public synchronized void waitForDriveFinish() throws InterruptedException { 25 while (stop == true) 26 wait(); 27 } 28 } 29 30 class Stop implements Runnable { 31 private Car car; 32 public Stop(Car c) { 33 car = c; 34 } 35 36 @Override 37 public void run() { 38 try { 39 while (!Thread.interrupted()) { 40 System.out.println("car stop!"); 41 TimeUnit.MILLISECONDS.sleep(500); 42 car.stop(); 43 car.waitForDriveFinish(); 44 } 45 } catch (InterruptedException e) { 46 System.out.println("Exiting via interrupt"); 47 } 48 System.out.println("Ending stop task"); 49 50 } 51 } 52 53 class Drive implements Runnable { 54 private Car car; 55 public Drive(Car c) { 56 car = c; 57 } 58 59 @Override 60 public void run() { 61 try { 62 while (!Thread.interrupted()) { 63 car.waitForStopFinish(); 64 System.out.println("car drive!"); 65 TimeUnit.MILLISECONDS.sleep(200); 66 car.drive(); 67 } 68 } catch (InterruptedException e) { 69 System.out.println("Exiting via interrupt"); 70 } 71 System.out.println("End drive task"); 72 } 73 } 74 75 public class Test { 76 public static void main(String[] args) throws InterruptedException { 77 Car car = new Car(); 78 ExecutorService exec = Executors.newCachedThreadPool(); 79 exec.execute(new Stop(car)); 80 exec.execute(new Drive(car)); 81 TimeUnit.SECONDS.sleep(2); 82 exec.shutdownNow(); 83 } 84 }
该程序模拟的汽车行驶的过程,汽车会走一会,停一会,直到一个中断产生。while()循环的使用,如果一个线程正在处于等待状态,当被唤醒时需要重新检测当前的条件是否满足继续等待,因为有可能这个唤醒操作并不是要唤醒这个线程,或者说唤醒之后条件又发生了改变,需要继续wait()。
1.2 await()操作
JAVA中也提供了Condition类,在Condition上调用await()来挂起一个任务。当外部条件发生变化时,意味着某个任务应该继续执行,可以通过调用signal()来通知这个任务,从而唤醒一个任务,或者通过signalAll()来唤醒所有在Condition上挂起的任务。现在使用Condition来重写上面的程序。
1 package com.dy.xidian; 2 3 import java.util.concurrent.ExecutorService; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.TimeUnit; 6 import java.util.concurrent.locks.Condition; 7 import java.util.concurrent.locks.Lock; 8 import java.util.concurrent.locks.ReentrantLock; 9 10 class Car { 11 private Lock lock = new ReentrantLock(); 12 Condition condition = lock.newCondition(); 13 private boolean stop = false; 14 15 public void stop() { 16 lock.lock(); 17 stop = true; 18 condition.signalAll(); 19 lock.unlock(); 20 } 21 22 public void drive() { 23 lock.lock(); 24 stop = false; 25 condition.signalAll(); 26 lock.unlock(); 27 } 28 29 public void waitForStopFinish() throws InterruptedException { 30 lock.lock(); 31 while (stop == false) 32 condition.await(); 33 lock.unlock(); 34 } 35 36 public void waitForDriveFinish() throws InterruptedException { 37 lock.lock(); 38 while (stop == true) 39 condition.await(); 40 lock.unlock(); 41 } 42 } 43 44 class Stop implements Runnable { 45 private Car car; 46 public Stop(Car c) { 47 car = c; 48 } 49 50 @Override 51 public void run() { 52 try { 53 while (!Thread.interrupted()) { 54 System.out.println("car stop!"); 55 TimeUnit.MILLISECONDS.sleep(500); 56 car.stop(); 57 car.waitForDriveFinish(); 58 } 59 } catch (InterruptedException e) { 60 System.out.println("Exiting via interrupt"); 61 } 62 System.out.println("Ending stop task"); 63 64 } 65 } 66 67 class Drive implements Runnable { 68 private Car car; 69 public Drive(Car c) { 70 car = c; 71 } 72 73 @Override 74 public void run() { 75 try { 76 while (!Thread.interrupted()) { 77 car.waitForStopFinish(); 78 System.out.println("car drive!"); 79 TimeUnit.MILLISECONDS.sleep(200); 80 car.drive(); 81 } 82 } catch (InterruptedException e) { 83 System.out.println("Exiting via interrupt"); 84 } 85 System.out.println("End drive task"); 86 } 87 } 88 89 public class Test { 90 public static void main(String[] args) throws InterruptedException { 91 Car car = new Car(); 92 ExecutorService exec = Executors.newCachedThreadPool(); 93 exec.execute(new Stop(car)); 94 exec.execute(new Drive(car)); 95 TimeUnit.SECONDS.sleep(2); 96 exec.shutdownNow(); 97 } 98 }
2.notify()与notifyAll()
notifyAll()唤醒等待在某个对象上的所有线程,而notify()只是随机唤醒其中一个。注意前面我们已经说过,wait()、notify()、notifyAll()必须sychronized块中。其实还有一点就是,notify()和notifyAll()必须在线程等待的对象上调用,而不是在Thread中直接调用。(如上面的代码中,Car类中提供了notifyAll()方法调用的接口,而不是直接在A线程中调用B线程的notify()方法来唤醒B线程)。一旦线程得到了通知,它首先会重新获得所等待对象的锁。如果获取成功,则会从紧跟着wait()调用之后的语句继续执行。下面的代码是为了说明调用某个对象的wait()方法时,会使得线程处于挂起状态,当调用该对象的notify()/notifyAll()方法时,线程才会被唤醒。wait()与notify()/notifyAll()是绑定在对象上,所以可以说任务在某个对象上处于wait()状态。
package com.dy.xidian; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; class Blocker { synchronized void waitingCall() { try { while (!Thread.interrupted()) { wait(); System.out.println(Thread.currentThread()); } } catch (InterruptedException e) { } } synchronized void prod() { notify(); } synchronized void prodAll() { notifyAll(); } } class Task implements Runnable { static Blocker blocker = new Blocker(); public void run() { blocker.waitingCall(); } } class Task2 implements Runnable { static Blocker blocker = new Blocker(); @Override public void run() { blocker.waitingCall(); } } public class Test { public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < 3; i++) exec.execute(new Task()); exec.execute(new Task2()); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { boolean prod = true; @Override public void run() { if (prod) { System.out.println(" notify()"); Task.blocker.prod(); prod = false; } else { System.out.println(" notifyAll()"); Task.blocker.prodAll(); prod = true; } } }, 400, 400); TimeUnit.SECONDS.sleep(5); timer.cancel(); System.out.println(" Timer canceled"); TimeUnit.MILLISECONDS.sleep(400); System.out.println("Task2.blocker.prodAll()"); Task2.blocker.prodAll(); TimeUnit.MILLISECONDS.sleep(500); System.out.println(" Shutting down"); exec.shutdownNow(); } }
3.生产者与消费者
生产者对缓冲区进行写操作,消费者对缓冲区进行读操作。缓冲区的大小固定的,生产者不能让缓冲区溢出,消费者不能从空缓冲区读值。下面的代码缓冲区用数组实现,注意,等待是反生在缓冲区对象上,而不是线程调用自己的wait()方法。
1 package com.dy.xidian; 2 3 import java.util.Random; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.TimeUnit; 7 8 class Producer implements Runnable { 9 Repository repos; 10 Random random = new Random(); 11 12 public Producer(Repository repos) { 13 this.repos = repos; 14 } 15 16 @Override 17 public void run() { 18 try { 19 while (true) { 20 int numWantToPut = random.nextInt(10); 21 repos.put(); 22 int numCanPut = 10 - repos.index; 23 int temp = repos.index + Math.min(numWantToPut, numCanPut); 24 int i; 25 for (i = repos.index; i < temp; i++) { 26 TimeUnit.MILLISECONDS.sleep(100); 27 System.out.println("Producer i =" + i); 28 repos.repository[i] = new Product(); 29 } 30 repos.index = i; 31 repos.putFinished(); 32 } 33 } catch (InterruptedException e) { 34 } 35 36 } 37 38 } 39 40 class Customer implements Runnable { 41 Repository repos; 42 Random random = new Random(); 43 public Customer(Repository repos) { 44 this.repos = repos; 45 } 46 47 @Override 48 public void run() { 49 try { 50 while (true) { 51 int numWanToGet = random.nextInt(10); 52 repos.get(); 53 int numCanGet = repos.index; 54 int temp = repos.index - 1 - Math.min(numWanToGet, numCanGet); 55 int i; 56 for (i = repos.index - 1; i > temp; i--) { 57 TimeUnit.MILLISECONDS.sleep(100); 58 System.out.println("Customer i =" + i); 59 repos.repository[i] = null; 60 } 61 repos.index = i + 1; 62 repos.getFinished(); 63 } 64 } catch (InterruptedException e) { 65 } 66 } 67 } 68 69 class Product { 70 int num; 71 int weight; 72 } 73 74 class Repository { 75 int index = 0; 76 boolean state = true; 77 Product[] repository = new Product[10]; 78 79 public synchronized void put() throws InterruptedException { 80 while (state != true) 81 wait(); 82 } 83 84 public synchronized void get() throws InterruptedException { 85 while (state != false) 86 wait(); 87 } 88 89 public synchronized void putFinished() { 90 state = false; 91 notifyAll(); 92 } 93 94 public synchronized void getFinished() { 95 state = true; 96 notifyAll(); 97 } 98 } 99 100 public class Test { 101 static volatile int i = 0; 102 static Repository repos = new Repository(); 103 public static void main(String[] args) throws InterruptedException { 104 ExecutorService exec = Executors.newCachedThreadPool(); 105 exec.execute(new Customer(repos)); 106 exec.execute(new Producer(repos)); 107 TimeUnit.SECONDS.sleep(10); 108 exec.shutdownNow(); 109 } 110 }
4.同步队列
上面生产者与消费者之间的缓冲区使用数组实现的,可以发现对数组的操作比较麻烦,处理不好就会发生越界,并且编程人员要处理好挂起与唤醒操作之间的逻辑关系。java中提供了一个更有效的解决方案,来实现对缓冲区的访问,那就同步队列(BlockingQueue)。同步队列在任何时候只允许一个线程插入或者移除元素,它的实现类有ArrayBlockingQueue(固定大小),LinkedBlockingQueue(无大小限制)。下面使用三个线程对队列进程访问。一个线程制作面包,一个线程给面包摸黄油,最后一个线程再往上摸果酱。注意:对队列中比较适合放对象
1 package com.dy.xidian; 2 3 import java.util.Random; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.LinkedBlockingQueue; 7 import java.util.concurrent.TimeUnit; 8 9 enum Status { 10 DRY, BUTTERED, JAMMED 11 } 12 13 class Bread { 14 private Status status = Status.DRY; 15 private final int id; 16 public Bread(int idn) { 17 id = idn; 18 } 19 public void butter() { 20 status = Status.BUTTERED; 21 } 22 public void jam() { 23 status = Status.JAMMED; 24 } 25 26 public Status getStatus() { 27 return status; 28 } 29 public int getId() { 30 return id; 31 } 32 public String toString() { 33 return "Bread" + id + ":" + status; 34 } 35 } 36 37 @SuppressWarnings("serial") 38 class BreadQueue extends LinkedBlockingQueue<Bread> { 39 } 40 class Toast implements Runnable { 41 private BreadQueue breadQueue; 42 private int count = 0; 43 private Random rand = new Random(47); 44 45 public Toast(BreadQueue breadQueue) { 46 this.breadQueue = breadQueue; 47 } 48 49 @Override 50 public void run() { 51 try { 52 while (!Thread.interrupted()) { 53 TimeUnit.MILLISECONDS.sleep(100 + rand.nextInt(500)); 54 Bread b = new Bread(count++); 55 System.out.println(b); 56 breadQueue.put(b); 57 } 58 } catch (InterruptedException e) { 59 System.out.println("Toast interrupted"); 60 } 61 System.out.println("Toast off"); 62 } 63 } 64 65 class Butter implements Runnable { 66 private BreadQueue dryQueue, butteredQueue; 67 68 public Butter(BreadQueue dryQueue, BreadQueue butteredQueue) { 69 super(); 70 this.dryQueue = dryQueue; 71 this.butteredQueue = butteredQueue; 72 } 73 74 @Override 75 public void run() { 76 try { 77 while (!Thread.interrupted()) { 78 Bread b = dryQueue.take(); 79 b.butter(); 80 System.out.println(b); 81 butteredQueue.put(b); 82 } 83 } catch (InterruptedException e) { 84 System.out.println("interrupted exception!"); 85 } 86 System.out.println("Butter off"); 87 } 88 } 89 90 class Jammer implements Runnable { 91 private BreadQueue butterQueue, finishQueue; 92 93 public Jammer(BreadQueue butterQueue, BreadQueue finishQueue) { 94 super(); 95 this.butterQueue = butterQueue; 96 this.finishQueue = finishQueue; 97 } 98 99 @Override 100 public void run() { 101 try { 102 while (!Thread.interrupted()) { 103 Bread b = butterQueue.take(); 104 b.jam(); 105 System.out.println(b); 106 finishQueue.put(b); 107 } 108 } catch (InterruptedException e) { 109 System.out.println("Interrupted exception!"); 110 } 111 System.out.println("Jammer off"); 112 } 113 } 114 115 class Eater implements Runnable { 116 private BreadQueue finishQueue; 117 private int counter = 0; 118 public Eater(BreadQueue finishQueue) { 119 super(); 120 this.finishQueue = finishQueue; 121 } 122 123 @Override 124 public void run() { 125 try { 126 while (!Thread.interrupted()) { 127 Bread b = finishQueue.take(); 128 if (b.getId() != counter++ || b.getStatus() != Status.JAMMED) { 129 System.out.println(">>>>Error:" + b); 130 System.exit(1); 131 } else { 132 System.out.println("Chomp " + b); 133 } 134 } 135 } catch (InterruptedException e) { 136 System.out.println("Interrupted Exception!"); 137 } 138 System.out.println("Eater off!"); 139 } 140 } 141 142 public class BreadMatic { 143 static BreadQueue q1 = new BreadQueue(); 144 static BreadQueue q2 = new BreadQueue(); 145 static BreadQueue q3 = new BreadQueue(); 146 public static void main(String[] args) throws InterruptedException { 147 ExecutorService exec = Executors.newCachedThreadPool(); 148 exec.execute(new Toast(q1)); 149 exec.execute(new Butter(q1, q2)); 150 exec.execute(new Jammer(q2, q3)); 151 exec.execute(new Eater(q3)); 152 TimeUnit.SECONDS.sleep(5); 153 exec.shutdownNow(); 154 } 155 }
5.管道
线程间也可以使用管道进行数据传输,并且是以字符流的方式,这是与队列不同的地方。Java中提供了PipedWriter类可以作为管道的输入,PipedReader可以作为管道的输出。如果输出发现没有数据了,管道将自动阻塞。对与管道的创建尽可能早完成,比如说写到构造方法里面。值得注意的是,管道上的阻塞是可以被中断的,即PipedReader是可中断的,这是与普通IO不同的地方。
1 package com.dy.xidian; 2 3 import java.io.IOException; 4 import java.io.PipedReader; 5 import java.io.PipedWriter; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.TimeUnit; 9 10 class Sender implements Runnable { 11 PipedWriter pw = new PipedWriter(); 12 @Override 13 public void run() { 14 15 try { 16 while (true) { 17 for (char c = 'a'; c <= 'z'; c++) { 18 pw.write(c); 19 TimeUnit.MILLISECONDS.sleep(100); 20 } 21 } 22 } catch (IOException e) { 23 System.out.println(e + "Sender write Exception!"); 24 } catch (InterruptedException e) { 25 System.out.println("Interrupted Exception!"); 26 } 27 } 28 29 public PipedWriter getPw() { 30 return pw; 31 } 32 33 } 34 35 class Receiver implements Runnable { 36 PipedReader pr; 37 public Receiver(Sender sender) throws IOException { 38 super(); 39 pr = new PipedReader(sender.getPw()); 40 } 41 42 @Override 43 public void run() { 44 try { 45 while (true) { 46 System.out.print("Read:" + (char) pr.read() + " "); 47 } 48 } catch (IOException e) { 49 System.out.println(e + "Receiver read exception!"); 50 } 51 } 52 53 } 54 55 public class PipeIO { 56 public static void main(String[] args) throws IOException, 57 InterruptedException { 58 Sender sender = new Sender(); 59 Receiver receiver = new Receiver(sender); 60 ExecutorService exec = Executors.newCachedThreadPool(); 61 exec.execute(sender); 62 exec.execute(receiver); 63 TimeUnit.SECONDS.sleep(4); 64 exec.shutdownNow(); 65 } 66 }