zoukankan      html  css  js  c++  java
  • JAVA并发体系-1.3-线程之间的协作

    当任务协作时,关键问题是这些任务之间的握手/通信

    为了实现这种握手/通信,我们使用了相同的基础特性:互斥。互斥能够确保只有一个任务可以响应某个信号,这样就可以根除任何可能的竞争条件。在互斥之上,我们为任务添加了一种途径,可以将其自身挂起,直至某些外部条件发生变化(例如,管道现在已经到位),表示是时候让这个任务向前开动了为止。

    这种握手/通信可以通过Object的方法wait()和 notify()来安全地实现。 并发类库还提供了具有await和 signal()方法的Condition对象。我们将看到产生的各类问题,以及相应的解决方案。

    wait和notify

    wait会在等待外部世界产生变化的时候将任务挂起,并且只有在notify或notifyAll发生时,这个任务才会被唤醒并去检查所产生的变化。因此wait不是忙等待

    对 wait的调用的时候,线程的执行被挂起,对象上的锁被释放,这就意味着另一个任务可以获得这个锁。

    • wait不是忙等待
    • 调用wait会释放锁
      • 而调用sleep、yield的时候锁并没有被释放

    只能在同步控制方法或同步控制块里调用wait、 notify和 notifyAll。如果在非同步控制方法里调用这些方法,程序能通过编译,但运行的时候,将得到IllegalMonitorStateException异常,并伴随着一些含糊的消息,比如“当前线程不是拥有者”。(消息的意思是,调用 wait、notify和notifyAll的任务在调用这些方法前必须“拥有”(获取)对象的锁)

    有两种形式的wait.

    第一种版本接受亳秒数作为参数,含义与 sleep方法里参数的意思相同,都是指“在此期间暂停”。但是wait与sleep是不同的,对于wait而言,wait与sleep的区别如下:

    1. 在wait期间对象锁是释放的。
    2. 可以通过notify、 notifyAll,或者令时间到期,从wait中恢复执行。

    第二种的wait不接受任何参数(也是更常用形式),这种wait将无限等待下去,直到线程接收到 notify或者 notifyAll消息。

    wait、 notify以及 notifyAll有一个比较特殊的方面,那就是这些方法是基类 Object的部分,而不是属于 Thread的一部分。这是有道理的,因为这些方法操作的锁也是所有对象的一部分。

    调用 wait、notify和notifyAll的任务在调用这些方法前必须“拥有”(获取)对象的锁(在下面的一个例子可以看到)

    可以让另一个对象执行某种操作以维护其自己的锁。要这么做的话,必须首先得到对象的锁。比如,如果要向对象x发送 notifyAll,那么就必须在能够取得x的锁的同步控制块中这么做:

    synchronized{
        x.notifyAll();
    }
    

    程序实例

    todo: code: remove here

    class Car {
      private boolean waxOn = false;
      public synchronized void waxed() {
        waxOn = true; // Ready to buff
        notifyAll();
      }
      public synchronized void buffed() {
        waxOn = false; // Ready for another coat of wax
        notifyAll();
      }
      public synchronized void waitForWaxing()
      throws InterruptedException {
        while(waxOn == false)
          wait();
      }
      public synchronized void waitForBuffing()
      throws InterruptedException {
        while(waxOn == true)
          wait();
      }
    }
    
    class WaxOn implements Runnable {
      private Car car;
      public WaxOn(Car c) { car = c; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            printnb("Wax On! ");
            TimeUnit.MILLISECONDS.sleep(200);
            car.waxed();
            car.waitForBuffing();
          }
        } catch(InterruptedException e) {
          print("Exiting via interrupt");
        }
        print("Ending Wax On task");
      }
    }
    
    class WaxOff implements Runnable {
      private Car car;
      public WaxOff(Car c) { car = c; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            car.waitForWaxing();
            printnb("Wax Off! ");
            TimeUnit.MILLISECONDS.sleep(200);
            car.buffed();
          }
        } catch(InterruptedException e) {
          print("Exiting via interrupt");
        }
        print("Ending Wax Off task");
      }
    }
    
    public class WaxOMatic {
      public static void main(String[] args) throws Exception {
        Car car = new Car();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new WaxOff(car));
        exec.execute(new WaxOn(car));
        TimeUnit.SECONDS.sleep(5); // Run for a while...
        exec.shutdownNow(); // Interrupt all tasks
      }
    } /* Output: (95% match)
    Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
    Ending Wax On task
    Exiting via interrupt
    Ending Wax Off task
    *///:~
    

    这里,Car有一个单一的布尔属性 waxOn,表示涂蜡抛光处理的状态在 waitForWaxing中将检查 waxOn标志,如果它为 false,那么这个调用任务将通过调用wait而被挂起。这个行为发生在 synchronized方法中这一点很重要,因为在这样的方法中,任务已经获得了锁当你调用 wait时,线程被挂起,而锁被释放。锁被释放这一点是本质所在,因为为了安全地改变对象的状态(例如,将 waxOn改变为true,如果被挂起的任务要继续执行,就必须执行该动作),其他某个任务就必须能够获得这个锁。在本例中,如果另一个任务调用waxed来表示“是时候该干点什么了”,那么就必须获得这个锁,从而将 waxOn改变为true。之后, waxed()调用 notifyAll,这将唤醒在对wait的调用中被挂起的任务。为了使该任务从wait中唤醒,它必须首先重新获得当它进入wait时释放的锁。在这个锁变得可用之前,这个任务是不会被唤醒的°。

    策略:前面的示例强调你必须用一个检查感兴趣的条件的 while循环包围wait.这很重要,因为:

    • 你可能有多个任务出于相同的原因在等待同一个锁,而第一个唤醒任务可能会改变这种状况(即使你没有这么做,有人也会通过继承你的类去这么做)。如果属于这种情况,那么这个任务应该被再次挂起,直至其感兴趣的条件发生变化。
    • 在这个任务从其 wait中被唤醒的时刻,有可能会有某个其他的任务已经做出了改变,从而使得这个任务在此时不能执行,或者执行其操作已显得无关紧要。此时,应该通过再次调用wait来将其重新挂起。
    • 也有可能某些任务出于不同的原因在等待你的对象上的锁(在这种情况下必须使用notifyAll)。在这种情况下,你需要检查是否已经由正确的原因唤醒,如果不是,就再次调用 wait因此,

    其本质就是要检査所感兴趣的特定条件,并在条件不满足的情况下返回到 wait中。惯用的方法就是使用while来编写这种代码

    notify和notifyAll

    因为在技术上,可能会有多个任务在单个Car对象上处于wait状态,因此调用 notifyAll比只调用 notify要更安全。但是,上面程序的结构(WaxOMatic代码)只会有一个任务实际处于wait状态,因此你可以使用 notify来代替 notifyAll

    使用 notify而不是 notifyAll是一种优化。使用 notify时,在众多等待同一个锁的任务中只有一个会被唤醒,因此如果你希望使用 notify,就必须保证被唤醒的是恰当的任务。另外,为了使用notify,所有任务必须等待相同的条件(todo:尚未搞懂这里???),因为如果你有多个任务在等待不同的条件,那么你就不会知道是否唤醒了恰当的任务。如果使用 notify,当条件发生变化时,必须只有个任务能够从中受益。最后,这些限制对所有可能存在的子类都必须总是起作用的。如果这些规则中有任何一条不满足,那么你就必须使用notifyAll而不是 notify

    notifyAll将唤醒“所有正在等待的任务”。这是否意味着在程序中任何地方,任何处于wait状态中的任务都将被任何对notifyAll的调用唤醒呢?事实上,当 notifyAll因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒:可以见下面的代码示例。

    在更复杂的情况下,可能会有多个任务在某个特定对象锁上等待,因此你不知道哪个任务应该被唤醒。因此,调用 notifyAll要更安全一些,这样可以唤醒等待这个锁的所有任务,而每个任务都必须决定这个通知是否与自己相关。

    todo: code: remove here

    class Blocker {
      synchronized void waitingCall() {
        try {
          while(!Thread.interrupted()) {
            wait();
            System.out.print(Thread.currentThread() + " ");
          }
        } catch(InterruptedException e) {
          // OK to exit this way
        }
      }
      synchronized void prod() { notify(); }
      synchronized void prodAll() { notifyAll(); }
    }
    
    class Task implements Runnable {
      static Blocker blocker = new Blocker();
      public void run() { blocker.waitingCall(); }
    }
    
    class Task2 implements Runnable {
      // A separate Blocker object:
      static Blocker blocker = new Blocker();
      public void run() { blocker.waitingCall(); }
    }
    
    public class NotifyVsNotifyAll {
      public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        for(int i = 0; i < 5; i++)
          exec.execute(new Task());
        exec.execute(new Task2());
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
          boolean prod = true;
          public void run() {
            if(prod) {
              System.out.print("
    notify() ");
              Task.blocker.prod();
              prod = false;
            } else {
              System.out.print("
    notifyAll() ");
              Task.blocker.prodAll();
              prod = true;
            }
          }
        }, 400, 400); // Run every .4 second
        TimeUnit.SECONDS.sleep(5); // Run for a while...
        timer.cancel();
        System.out.println("
    Timer canceled");
        TimeUnit.MILLISECONDS.sleep(500);
        System.out.print("Task2.blocker.prodAll() ");
        Task2.blocker.prodAll();
        TimeUnit.MILLISECONDS.sleep(500);
        System.out.println("
    Shutting down");
        exec.shutdownNow(); // Interrupt all tasks
      }
    } /* Output: (Sample)
    notify() Thread[pool-1-thread-1,5,main]
    notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
    notify() Thread[pool-1-thread-1,5,main]
    notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
    notify() Thread[pool-1-thread-1,5,main]
    notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
    notify() Thread[pool-1-thread-1,5,main]
    notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
    notify() Thread[pool-1-thread-1,5,main]
    notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
    notify() Thread[pool-1-thread-1,5,main]
    notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
    Timer canceled
    Task2.blocker.prodAll() Thread[pool-1-thread-6,5,main]
    Shutting down
    *///:~
    

    从输出中你可以看到,即使存在Task2.blocker上阻塞的Task2对象,也没有任何在Task.blocker上的notify或 notifyAll调用会导致Task对象被唤醒。与此类似,在main的结尾,调用了 timer的 cancel,即使计时器被撤销了,前5个任务也依然在运行;并仍旧在它们对Task.blocker.waitingCall的调用中被阻塞。对Task2.blocker.prodAll的调用所产生的输出不包括任何在 Task.blocker中的锁上等待的任务。

    如果你浏览 Blocker中的 prodprodAll,就会发现这是有意义的。这些方法是synchronized的,这意味着它们将获取自身的锁,因此当它们调用 notify或 notifyAll时,只在这个锁上调用是符合逻辑的——因此,将只唤醒在等待这个特定锁的任务。

    注意下面这几段话,健壮性的保证

    Blocker.waitingCall非常简单,以至于在本例中,你只需声明for(;;)而不是 while(!Thread interrupted())就可以达到相同的效果,因为在本例中,由于异常而离开循环和通过检查interrupted标志离开循环是没有任何区别的——在两种情况下都要执行相同的代码。但是,事实上,这个示例选择了检查 interrupted,因为存在着两种离开循环的方式。如果在以后的某个时刻,你决定要在循环中添加更多的代码,那么如果没有覆盖从这个循环中退出的这两条路径,就会产生引入错误的风险。

    在并发应用中,某个其他的任务可能会在 WaitPerson被唤醒时,会突然插足并拿走订单,唯一安全的方式是使用下面这种wait的惯用法(当然要在恰当的同步内部,并采用防止错失信号可能性的程序设计)

    while(conditionIsNotMet)
    	wait();
    

    这可以保证在你退出等待循环之前,条件将得到满足,并且如果你收到了关于某事物的通知,而它与这个条件并无关系,或者在你完全退出等待循环之前,这个条件发生了变化,都可以确保你可以重返等待状态。

    lock和condition对象

    可以通过在condition上调用await来挂起一个任务,通过调用signal来通知这个任务,从而唤醒一个任务,或者调用signalAll来唤醒所有在这个condition上被自身挂起的任务,于notifyAll相比,signalAll是一个更为安全的方式

    同步队列

    同步队列:BlockingQueueLinkedBlockingQueueArrayBlockingQueue,如果消费者任务试图从队列获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时恢复消费者任务。使用BlockingQueue会产生简化,使用显式的wait和notify时存在的类和类之间的耦合被削除了,因为每个类都只和他的BlockingQueue通信,即注意下面这个生产者消费者的例子:

    todo: code: remove here

    class Toast {
      public enum Status { DRY, BUTTERED, JAMMED }
      private Status status = Status.DRY;
      private final int id;
      public Toast(int idn) { id = idn; }
      public void butter() { status = Status.BUTTERED; }
      public void jam() { status = Status.JAMMED; }
      public Status getStatus() { return status; }
      public int getId() { return id; }
      public String toString() {
        return "Toast " + id + ": " + status;
      }
    }
    
    class ToastQueue extends LinkedBlockingQueue<Toast> {}
    
    class Toaster implements Runnable {
      private ToastQueue toastQueue;
      private int count = 0;
      private Random rand = new Random(47);
      public Toaster(ToastQueue tq) { toastQueue = tq; }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            TimeUnit.MILLISECONDS.sleep(
              100 + rand.nextInt(500));
            // Make toast
            Toast t = new Toast(count++);
            print(t);
            // Insert into queue
            toastQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Toaster interrupted");
        }
        print("Toaster off");
      }
    }
    
    // Apply butter to toast:
    class Butterer implements Runnable {
      private ToastQueue dryQueue, butteredQueue;
      public Butterer(ToastQueue dry, ToastQueue buttered) {
        dryQueue = dry;
        butteredQueue = buttered;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = dryQueue.take();
            t.butter();
            print(t);
            butteredQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Butterer interrupted");
        }
        print("Butterer off");
      }
    }
    
    // Apply jam to buttered toast:
    class Jammer implements Runnable {
      private ToastQueue butteredQueue, finishedQueue;
      public Jammer(ToastQueue buttered, ToastQueue finished) {
        butteredQueue = buttered;
        finishedQueue = finished;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = butteredQueue.take();
            t.jam();
            print(t);
            finishedQueue.put(t);
          }
        } catch(InterruptedException e) {
          print("Jammer interrupted");
        }
        print("Jammer off");
      }
    }
    
    // Consume the toast:
    class Eater implements Runnable {
      private ToastQueue finishedQueue;
      private int counter = 0;
      public Eater(ToastQueue finished) {
        finishedQueue = finished;
      }
      public void run() {
        try {
          while(!Thread.interrupted()) {
            // Blocks until next piece of toast is available:
            Toast t = finishedQueue.take();
            // Verify that the toast is coming in order,
            // and that all pieces are getting jammed:
            if(t.getId() != counter++ ||
               t.getStatus() != Toast.Status.JAMMED) {
              print(">>>> Error: " + t);
              System.exit(1);
            } else
              print("Chomp! " + t);
          }
        } catch(InterruptedException e) {
          print("Eater interrupted");
        }
        print("Eater off");
      }
    }
    
    public class ToastOMatic {
      public static void main(String[] args) throws Exception {
        ToastQueue dryQueue = new ToastQueue(),
                   butteredQueue = new ToastQueue(),
                   finishedQueue = new ToastQueue();
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Toaster(dryQueue));
        exec.execute(new Butterer(dryQueue, butteredQueue));
        exec.execute(new Jammer(butteredQueue, finishedQueue));
        exec.execute(new Eater(finishedQueue));
        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
      }
    } /* (Execute to see output) *///:~
    

    错失的信号(潜在死锁)

    T1:
    synchronized(sharedMonitor){
        <setup condition for T2>
        sharedMonitor.notify();
    }
    T2:
    while(someCondition){
        //point 1
        synchronized(sharedMonitor){
            sharedMonitor.wait();
        }
    }
    

    错失信号(死锁情况):假设T2对someCondition求值发现为true,在point 1线程调度器切换到T1,T1执行设置(即<setup condition for T2>),然后嗲用notify,然后T2继续执行。此时对于T2来说,时机已经太晚了,以至于不能意识到这个条件已经发生了变化,因此会盲目进入 wait,此时 notify将错失,而T2也将无限地等待这个已经发送过的信号,从而产生死锁。

    解决方案:(防止在someCondition变量上产生竞争条件)

    synchronized(sharedMonitor){
        while(someCondition){
            sharedMonitor.wait();            
        }
    }
    

    现在:如果T1首先执行,当控制返回T2时,它将发现条件发生了变化,从而不会进入 wait。反过来,如果T2首先执行,那它将进入wait,并且稍后会由T1唤醒。因此,信号不会错失。

  • 相关阅读:
    Java集合
    插入排序
    修改button的可点击区域
    这就是工作
    Cocos2dx使用TextField实现输入框
    SVN解决本地版本控制与服务器版本冲突问题
    ParallaxNode视差节点实现远景近景的不同层次移动
    人生最重要的三个领域——健康、财富和爱
    什么是开发框架-- (转载)
    C++函数模版的简单使用
  • 原文地址:https://www.cnblogs.com/cheaptalk/p/12549677.html
Copyright © 2011-2022 走看看