zoukankan      html  css  js  c++  java
  • 建房子之前先挖地基

    最近一直在看《Think In Java》里关于并发部分的章节,读到第二十一章有一个有趣的比喻:必须先挖房子的地基,但是接下来可以并行的铺设钢结构和构建水泥部件,而这两项任务必须在混凝土浇筑之前完成。管道必须在水泥板浇注之前到位,而水泥板必须在开始构筑房屋骨架之前到位。

    在这些任务中,某些可以并行执行,但是某些步骤需要所有的任务都结束之后才能开动,这是线程之间协作的必要性。

    在此之前,我们学习过使用notify()、notifyAll()和wait()来控制线程间的协作,让我们先来回顾一下。notify()、notifyAll()和wait()这三个方法同属于Object对象,wait()会使得当前线程等待并交出对象的锁,直到别的线程调用notify()或notifyAll()后可能会被唤醒。

    对于一些简单的问题,这已经够用了,但是Java SE5中的concurrent包中提供了BlockingQueue、Condition等类来帮助我们完成更复杂的线程间协作的任务。

    下面看一个例子,一台机器具有三个任务:一个制作吐司、一个给吐司抹黄油,另一个在抹过黄油的吐司上涂果酱。通过各个处理过程之间的BlockingQueue来运行这个程序。来自Think In Java (p.s. 我觉得这本书难懂的原因,在于你在理解它教导并发概念的同时,还得十分小心地注意其余的语法细节,一定要有耐心!)。

    package concurrency;//: concurrency/ToastOMatic.java
    // A toaster that uses queues.
    import java.util.concurrent.*;
    import java.util.*;
    import static net.mindview.utill.Print.*;
    
    class Toast {
      public enum Status { DRY, BUTTERED, JAMMED }
      private Status status = Status.DRY;
      private final int id;
      public Toast(int idn) { id = idn; }
      public void butter() { status = Status.BUTTERED; }
      public void jam() { status = Status.JAMMED; }
      public Status getStatus() { return status; }
      public int getId() { return id; }
      public String toString() {
        return "Toast " + id + ": " + status;
      }
    }
    
    class ToastQueue extends LinkedBlockingQueue<Toast> {}
    
    class Toaster implements Runnable {
      private ToastQueue toastQueue;
      private int count = 0;
      private Random rand = new Random(47);
      public Toaster(ToastQueue tq) { toastQueue = tq; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            TimeUnit.MILLISECONDS.sleep(
              100 + rand.nextInt(500));
            // Make toast
            Toast t = new Toast(count++);
            print(t);
            // Insert into queue
            toastQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Toaster interrupted");
        }
        print("Toaster off");
      }
    }
    
    // Apply butter to toast:
    class Butterer implements Runnable {
      private ToastQueue dryQueue, butteredQueue;
      public Butterer(ToastQueue dry, ToastQueue buttered) {
        dryQueue = dry;
        butteredQueue = buttered;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = dryQueue.take();
            t.butter();
            print(t);
            butteredQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Butterer interrupted");
        }
        print("Butterer off");
      }
    }
    
    // Apply jam to buttered toast:
    class Jammer implements Runnable {
      private ToastQueue butteredQueue, finishedQueue;
      public Jammer(ToastQueue buttered, ToastQueue finished) {
        butteredQueue = buttered;
        finishedQueue = finished;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = butteredQueue.take();
            t.jam();
            print(t);
            finishedQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Jammer interrupted");
        }
        print("Jammer off");
      }
    }
    
    // Consume the toast:
    class Eater implements Runnable {
      private ToastQueue finishedQueue;
      private int counter = 0;
      public Eater(ToastQueue finished) {
        finishedQueue = finished;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = finishedQueue.take();
            // Verify that the toast is coming in order,
            // and that all pieces are getting jammed:
            if(t.getId() != counter++ ||
               t.getStatus() != Toast.Status.JAMMED) {
              print(">>>> Error: " + t);
              System.exit(1);
            } else
              print("Chomp! " + t);
          }
        } catch(InterruptedException e) {
          print("Eater interrupted");
        }
        print("Eater off");
      }
    }
    
    public class ToastOMatic {
      public static void main(String[] args) throws Exception {
        ToastQueue dryQueue = new ToastQueue(),
                   butteredQueue = new ToastQueue(),
                   finishedQueue = new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Toaster(dryQueue));
        exec.execute(new Butterer(dryQueue, butteredQueue));
        exec.execute(new Jammer(butteredQueue, finishedQueue));
        exec.execute(new Eater(finishedQueue));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
      }
    } /* (Execute to see output) *///:~
    View Code

    看完晕乎乎的?很正常,所以才需要我来给大家讲解啦 :)

    首先可以注意到的,程序中并没有出现任何Lock对象或是synchronized关键字来同步,这是因为在实现BlockingQueue的队列类内部已经使用Condition在维护。这降低了程序的耦合度,使得每个类只需要和自己的BlockingQueue通信。

    程序中定义了:

    一个实体类:Toast。使用enum来管理状态是一个优秀的示例。

    三个队列:dryQueue、butteredQueue、finishedQueue

    四个Runnable任务:Toaster、Butterer、Jammer、Eater

    根据字面意思理解,当线程不被中断的时候,Toaster负责制作吐司,所以只需要和dryQueue通信。Butterer在吐司上涂黄油,需要从dryQueue中取出原味土司,涂上黄油(t.butter())后放入butteredQueue。Jammer在抹过黄油的吐司上涂果酱,需要从butteredQueue中取出,涂上果酱后放入finishedQueue。Eater就只需要从finishedQueue中取出来吃啦。细心的读者还会发现Eater中做了检查,如果不是涂上果酱的吐司就不吃(傲娇的表情)。如果线程被中断,任务就打印信息并退出。

    TimeUnit.SECONDS.sleep(3)的作用是当前线程等待3秒,等待后台制作吐司。exec.shutdownNow()停止当前线程池。

    是不是觉得自己理解了?那么还有一道课后题留给大家:修改ToastOMatic.java,使用两个单独的组装线来创建涂有黄油和果酱的三明治(即不必先涂黄油再涂果酱,可以异步处理,明显提高工作效率)。

    答案在这里:

    //: concurrency/E29_ToastOMatic2.java
    /********************** Exercise 29 ***********************
     * Modify ToastOMatic.java to create peanut butter and jelly
     * on toast sandwiches using two separate assembly lines 
     * (one for peanut butter, the second for jelly, then
     * merging the two lines).
    *********************************************************/
    package concurrency;
    import java.util.concurrent.*;
    import java.util.*;
    import static net.mindview.utill.Print.*;
    
    class Toast {
      public enum Status { 
        DRY,
        BUTTERED,
        JAMMED,
        READY {
          public String toString() {
            return
              BUTTERED.toString() + " & " + JAMMED.toString();
          }
        }
      }
      private Status status = Status.DRY;
      private final int id;
      public Toast(int idn) { id = idn; }
      public void butter() {
        status =
          (status == Status.DRY) ? Status.BUTTERED :
                                   Status.READY;
      }
      public void jam() {
        status =
          (status == Status.DRY) ? Status.JAMMED :
                                   Status.READY;
      }
      public Status getStatus() { return status; }
      public int getId() { return id; }
      public String toString() {
        return "Toast " + id + ": " + status;
      }
    }
    
    class ToastQueue extends LinkedBlockingQueue<Toast> {}
    
    class Toaster implements Runnable {
      private ToastQueue toastQueue;
      private int count;
      private Random rand = new Random(47);
      public Toaster(ToastQueue tq) { toastQueue = tq; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            TimeUnit.MILLISECONDS.sleep(
              100 + rand.nextInt(500));
            // Make toast
            Toast t = new Toast(count++);
            print(t);
            // Insert into queue
            toastQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Toaster interrupted");
        }
        print("Toaster off");
      }
    }
    
    // Apply butter to toast:
    class Butterer implements Runnable {
      private ToastQueue inQueue, butteredQueue;
      public Butterer(ToastQueue in, ToastQueue buttered) {
        inQueue = in;
        butteredQueue = buttered;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = inQueue.take();
            t.butter();
            print(t);
            butteredQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Butterer interrupted");
        }
        print("Butterer off");
      }
    }
    
    // Apply jam to toast:
    class Jammer implements Runnable {
      private ToastQueue inQueue, jammedQueue;
      public Jammer(ToastQueue in, ToastQueue jammed) {
        inQueue = in;
        jammedQueue = jammed;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = inQueue.take();
            t.jam();
            print(t);
            jammedQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Jammer interrupted");
        }
        print("Jammer off");
      }
    }
    
    // Consume the toast:
    class Eater implements Runnable {
      private ToastQueue finishedQueue;
      public Eater(ToastQueue finished) {
        finishedQueue = finished;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = finishedQueue.take();
            // Verify that all pieces are ready for consumption:
            if(t.getStatus() != Toast.Status.READY) {
              print(">>>> Error: " + t);
              System.exit(1);
            } else
              print("Chomp! " + t);
          }
        } catch(InterruptedException e) {
          print("Eater interrupted");
        }
        print("Eater off");
      }
    }
    
    // Outputs alternate inputs on alternate channels:
    class Alternator implements Runnable {
      private ToastQueue inQueue, out1Queue, out2Queue;
      private boolean outTo2;  // control alternation
      public Alternator(ToastQueue in, ToastQueue out1,
              ToastQueue out2) {
        inQueue = in;
        out1Queue = out1;
        out2Queue = out2;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = inQueue.take();
            if(!outTo2)
              out1Queue.put(t);
            else
              out2Queue.put(t);
            outTo2 = !outTo2;  // change state for next time
          }
        } catch(InterruptedException e) {
          print("Alternator interrupted");
        }
        print("Alternator off");
      }
    }
    
    // Accepts toasts on either channel, and relays them on to
    // a "single" successor
    class Merger implements Runnable {
      private ToastQueue in1Queue, in2Queue, toBeButteredQueue,
        toBeJammedQueue, finishedQueue;
      public Merger(ToastQueue in1, ToastQueue in2,
              ToastQueue toBeButtered, ToastQueue toBeJammed,
              ToastQueue finished) {
        in1Queue = in1;
        in2Queue = in2;
        toBeButteredQueue = toBeButtered;
        toBeJammedQueue = toBeJammed;
        finishedQueue = finished;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = null;
            while(t == null) {
              t = in1Queue.poll(50, TimeUnit.MILLISECONDS);
              if(t != null)
                break;
              t = in2Queue.poll(50, TimeUnit.MILLISECONDS);
            }
            // Relay toast onto the proper queue
            switch(t.getStatus()) {
              case BUTTERED:
                toBeJammedQueue.put(t);
                break;
              case JAMMED:
                toBeButteredQueue.put(t);
                break;
              default:
                finishedQueue.put(t);
            }
          }
        } catch(InterruptedException e) {
          print("Merger interrupted");
        }
        print("Merger off");
      }
    }
    
    public class E29_ToastOMatic2 {
      public static void main(String[] args) throws Exception {
        ToastQueue 
          dryQueue = new ToastQueue(),
          butteredQueue = new ToastQueue(),
          toBeButteredQueue = new ToastQueue(),
          jammedQueue = new ToastQueue(),
          toBeJammedQueue = new ToastQueue(),
          finishedQueue = new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Toaster(dryQueue));
        exec.execute(new Alternator(dryQueue, toBeButteredQueue,
          toBeJammedQueue));
        exec.execute(
          new Butterer(toBeButteredQueue, butteredQueue));
        exec.execute(
          new Jammer(toBeJammedQueue, jammedQueue));
        exec.execute(new Merger(butteredQueue , jammedQueue,
          toBeButteredQueue, toBeJammedQueue, finishedQueue));
        exec.execute(new Eater(finishedQueue));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
      }
    } /* (Execute to see output) *///:~
    没想清楚前不许偷看!

    因为代码比较长,推荐把代码导入IDE查看。

  • 相关阅读:
    网页图表Highcharts实践教程之外层图表区
    网页图表Highcharts实践教程之图表代码构成
    网页图表Highcharts实践教程之认识Highcharts
    Playmaker全面实践教程之Playmaker常用工具
    Playmaker全面实践教程之简单的使用Playmaker示例
    Playmaker全面实践教程之playMaker编辑器
    Playmaker Input篇教程之引入的核心概念
    Playmaker Input篇教程之PlayMaker菜单概述
    Playmaker Input篇教程之Playmaker购买下载和导入
    关于中值滤波算法,以及C语言实现(转)
  • 原文地址:https://www.cnblogs.com/andrew-chen/p/4991961.html
Copyright © 2011-2022 走看看