zoukankan      html  css  js  c++  java
  • 7.并发编程--多线程通信-wait-notify

    并发编程--多线程通信-wait-notify

    多线程通信:线程通信的目的是为了能够让线程之间相互发送信号;

    1. 多线程通信:

    线程通信的目的是为了能够让线程之间相互发送信号。另外,线程通信还能够使得线程等待其它线程的信号,比如,线程B可以等待线程A的信号,这个信号可以是线程A已经处理完成的信号;
    Object提供了三个方法wait(), notify(), notifyAll()在线程之间进行通信,以此来解决线程间执行顺序等问题。

    • * wait():释放当前线程的同步监视控制器,并让当前线程进入阻塞状态,直到别的线程发出notify将该线程唤醒。
    • * notify():唤醒在等待控制监视器的其中一个线程(随机)。只有当前线程释放了同步监视器锁(调用wait)之后,被唤醒的线程才有机会执行。
    • * notifyAll():与上面notify的区别是同时唤醒多个等待线程。

    值得注意的是这三个方法是属于Object而不是属于Thread的,但是调用的时候必须用同步监视器来调用,wait(), notify(), notifyAll() 必须和synchronized关键字联合使用

    模拟线程通信:自定义实现的通信模式
    示例:ListAdd1.java

     1   public class ListAdd1 {
     2     private volatile static List list = new ArrayList();
     3 
     4       public void add(){
     5         list.add("bjsxt");
     6       }
     7       public int size(){
     8         return list.size();
     9       }
    10 
    11       public static void main(String[] args) {
    12 
    13         final ListAdd1 list1 = new ListAdd1();
    14 
    15         Thread t1 = new Thread(new Runnable() {
    16           @Override
    17           public void run() {
    18             try {
    19               for(int i = 0; i <10; i++){
    20                 list1.add();
    21                 System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
    22                 Thread.sleep(500);
    23               }
    24             } catch (InterruptedException e) {
    25               e.printStackTrace();
    26             }
    27           }
    28         }, "t1");
    29 
    30         Thread t2 = new Thread(new Runnable() {
    31           @Override
    32           public void run() {
    33             while(true){
    34               if(list1.size() == 5){
    35                 System.out.println("当前线程收到通知:" + Thread.currentThread().getName() + " list size = 5 线程停止..");
    36                 throw new RuntimeException();
    37               }
    38             }
    39           }
    40         }, "t2");        
    41 
    42         t1.start();
    43         t2.start();
    44       }
    45   }

    2. 使用JDK的 Object提供了三个方法wait(), notify(), notifyAll()在线程之间进行通信

    示例:

     1   import java.util.ArrayList;
     2   import java.util.List;
     3   import java.util.Queue;
     4   import java.util.concurrent.CountDownLatch;
     5   import java.util.concurrent.LinkedBlockingDeque;
     6   import java.util.concurrent.LinkedBlockingQueue;
     7   /**
     8   * wait notfiy 方法,wait释放锁,notfiy不释放锁
     9   * @@author Maozw
    10   *
    11   */
    12   public class ListAdd2 {
    13       private volatile static List list = new ArrayList();
    14 
    15       public void add(){
    16         list.add("bjsxt");
    17       }
    18       public int size(){
    19         return list.size();
    20       }
    21 
    22       public static void main(String[] args) {
    23 
    24         final ListAdd2 list2 = new ListAdd2();
    25 
    26         // 1 实例化出来一个 lock
    27         // 当使用wait 和 notify 的时候 , 一定要配合着synchronized关键字去使用
    28         //final Object lock = new Object();
    29 
    30         final CountDownLatch countDownLatch = new CountDownLatch(1);
    31 
    32         Thread t1 = new Thread(new Runnable() {
    33           @Override
    34           public void run() {
    35             try {
    36               //synchronized (lock) {
    37                 for(int i = 0; i <10; i++){
    38                   list2.add();
    39                   System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了一个元素..");
    40                   Thread.sleep(500);
    41                   if(list2.size() == 5){
    42                     System.out.println("已经发出通知..");
    43                     countDownLatch.countDown();
    44                     //lock.notify();
    45                   }
    46                 }                        
    47               //}
    48             } catch (InterruptedException e) {
    49               e.printStackTrace();
    50             }
    51 
    52           }
    53         }, "t1");
    54 
    55         Thread t2 = new Thread(new Runnable() {
    56           @Override
    57           public void run() {
    58             //synchronized (lock) {
    59               if(list2.size() != 5){
    60                 try {
    61                   //System.out.println("t2进入...");
    62                   //lock.wait();
    63                   countDownLatch.await();
    64                 } catch (InterruptedException e) {
    65                   e.printStackTrace();
    66                 }
    67               }
    68               System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
    69               throw new RuntimeException();
    70             //}
    71           }
    72         }, "t2");
    73 
    74         t2.start();
    75         t1.start();
    76 
    77       }
    78   }

    问题1:wait()方法外面为什么是while循环而不是if判断?

    问题2:notify()是唤醒一个线程,notifyAll()是唤醒全部线程,但是唤醒然后呢,不管是notify()还是notifyAll(),最终拿到锁的只会有一个线程,那它们到底有什么区别呢?

    OK! 要回答上述两个问题?我们首先需要明白java对象锁的模型:
    JVM 会为每一个使用内部锁(synchronized)的对象维护两个集合,Entry Set和Wait Set,也有人翻译为锁池和等待池,意思基本一致。
    **Entry Set**

    • 如果线程A已经持有了对象锁,此时如果有其他线程也想获得该对象锁的话,它只能进入Entry Set,并且处于线程的BLOCKED状态。

    **Wait Set**

    • 如果线程A调用了wait()方法,那么线程A会释放该对象的锁,进入到Wait Set,并且处于线程的WAITING状态。

    sequenceDiagram
    Entry Set(锁池)->>Wait Set(等待池): wait()
    Wait Set(等待池)->>Entry Set(锁池): noitify()

     注意:某个线程B想要获得对象锁,一般情况下有两个先决条件,

    • 一是对象锁已经被释放了(如曾经持有锁的前任线程A执行完了synchronized代码块或者调用了wait()方法等等)
    • 二是线程B已处于RUNNABLE状态。

    那么这两类集合中的线程都是在什么条件下可以转变为RUNNABLE呢?

    • 对于Entry Set中的线程,当对象锁被释放的时候,JVM会唤醒处于Entry Set中的某一个线程,这个线程的状态就从BLOCKED转变为RUNNABLE。
    • 对于Wait Set中的线程,当对象的notify()方法被调用时,JVM会唤醒处于Wait Set中的某一个线程,这个线程的状态就从WAITING转变为RUNNABLE;或者当notifyAll()方法被调用时,Wait Set中的全部线程会转变为RUNNABLE状态。所有Wait Set中被唤醒的线程会被转移到Entry Set中,然后 每当对象的锁被释放后,那些所有处于RUNNABLE状态的线程会共同去竞争获取对象的锁.

    解答
     第一个问题 :wait()方法外面为什么是while循环而不是if判断?

    •  因为wait()的线程永远不能确定其他线程会在什么状态下notify(),所以必须在被唤醒、抢占到锁并且从wait()方法退出的时候再次进行指定条件的判断,以决定是满足条件往下执行呢还是不满足条件再次wait()呢。

    第二个问题:既然notify()和notifyAll()最终的结果都是只有一个线程能拿到锁,那唤醒一个和唤醒多个有什么区别呢?

    • 通过下面这个例子可以非常好的说明;是这样一个场景:两个生产者两个消费者的场景,我们都使用notify()而非notifyAll(),假设消费者线程1拿到了锁,判断buffer为空,那么wait(),释放锁;然后消费者2拿到了锁,同样buffer为空,wait(),也就是说此时Wait Set中有两个线程;然后生产者1拿到锁,生产,buffer满,notify()了, 那么可能消费者1被唤醒了,但是此时还有另一个线程生产者2在Entry Set中盼望着锁,并且最终抢占到了锁, 但因为此时buffer是满的,因此它要wait();然后消费者1拿到了锁,消费,notify();这时就有问题了,此时生产者2和消费者2都在Wait Set中,buffer为空,如果唤醒生产者2,没毛病;但如果唤醒了消费者2,因为buffer为空,它会再次wait(),这就尴尬了,万一生产者1已经退出不再生产了,没有其他线程在竞争锁了,只有生产者2和消费者2在Wait Set中互相等待,那传说中的死锁就发生了。
    • notify()换成notifyAll(),这样的情况就不会再出现了,因为每次notifyAll()都会使其他等待的线程从Wait Set进入Entry Set,从而有机会获得锁。
     1 import java.util.ArrayList;
     2 import java.util.List;
     3 
     4 public class Something {
     5     private Buffer mBuf = new Buffer();
     6 
     7     public void produce() {
     8         synchronized (this) {
     9             while (mBuf.isFull()) {
    10                 try {
    11                     wait();
    12                 } catch (InterruptedException e) {
    13                     e.printStackTrace();
    14                 }
    15             }
    16             mBuf.add();
    17             notifyAll();
    18         }
    19     }
    20 
    21     public void consume() {
    22         synchronized (this) {
    23             while (mBuf.isEmpty()) {
    24                 try {
    25                     wait();
    26                 } catch (InterruptedException e) {
    27                     e.printStackTrace();
    28                 }
    29             }
    30             mBuf.remove();
    31             notifyAll();
    32         }
    33     }
    34 
    35     private class Buffer {
    36         private static final int MAX_CAPACITY = 1;
    37         private List innerList = new ArrayList<>(MAX_CAPACITY);
    38 
    39         void add() {
    40             if (isFull()) {
    41                 throw new IndexOutOfBoundsException();
    42             } else {
    43                 innerList.add(new Object());
    44             }
    45             System.out.println(Thread.currentThread().toString() + " add");
    46 
    47         }
    48 
    49         void remove() {
    50             if (isEmpty()) {
    51                 throw new IndexOutOfBoundsException();
    52             } else {
    53                 innerList.remove(MAX_CAPACITY - 1);
    54             }
    55             System.out.println(Thread.currentThread().toString() + " remove");
    56         }
    57 
    58         boolean isEmpty() {
    59             return innerList.isEmpty();
    60         }
    61 
    62         boolean isFull() {
    63             return innerList.size() == MAX_CAPACITY;
    64         }
    65     }
    66 
    67     public static void main(String[] args) {
    68         Something sth = new Something();
    69         Runnable runProduce = new Runnable() {
    70             int count = 4;
    71 
    72             @Override
    73             public void run() {
    74                 while (count-- > 0) {
    75                     sth.produce();
    76                 }
    77             }
    78         };
    79         Runnable runConsume = new Runnable() {
    80             int count = 4;
    81 
    82             @Override
    83             public void run() {
    84                 while (count-- > 0) {
    85                     sth.consume();
    86                 }
    87             }
    88         };
    89         for (int i = 0; i < 2; i++) {
    90             new Thread(runConsume).start();
    91         }
    92         for (int i = 0; i < 2; i++) {
    93             new Thread(runProduce).start();
    94         }
    95     }
    96 }

    join

    首先,join()是Thread类的一个方法,而不是object的方法;
    JDK中是这样描述的:

    //join()方法的作用,是等待这个线程结束
    public final void join()throws InterruptedException: Waits for this thread to die.
    在Java 7 Concurrency Cookbook"的定义为:
    join() method suspends the execution of the calling thread until the object called finishes its execution.
    也就是说,t.join()方法阻塞调用此方法的线程(calling thread),直到线程t完成,此线程再继续;
    举个例子:通常用于在main()主线程内,等待其它线程完成再结束main()主线程。例如:
     1 package com.maozw.springmvc.controller;
     2 
     3 import java.util.Date;
     4 import java.util.concurrent.TimeUnit;
     5 
     6 public class JoinTest implements Runnable {
     7 
     8     private String name;
     9 
    10     public JoinTest(String name) {
    11         this.name = name;
    12     }
    13 
    14     public void run() {
    15         System.out.printf("%s begins: %s
    ", name, new Date());
    16         try {
    17             TimeUnit.SECONDS.sleep(4);
    18         } catch (InterruptedException e) {
    19             e.printStackTrace();
    20         }
    21         System.out.printf("%s has finished: %s
    ", name, new Date());
    22     }
    23 
    24     public static void main(String[] args) {
    25         Thread thread1 = new Thread(new JoinTest("One"));
    26         Thread thread2 = new Thread(new JoinTest("Two"));
    27         thread1.start();
    28         thread2.start();
    29 
    30         try {
    31             thread1.join();
    32             thread2.join();
    33         } catch (InterruptedException e) {
    34             e.printStackTrace();
    35         }
    36         System.out.println("Main thread is finished");
    37     }
    38 }

    输出结果

    1 One begins: Mon Jul 23 22:41:21 CST 2018
    2 Two begins: Mon Jul 23 22:41:21 CST 2018
    3 Two has finished: Mon Jul 23 22:41:25 CST 2018
    4 One has finished: Mon Jul 23 22:41:25 CST 2018
    5 Main thread is finished

    解说join原理:

    我们尝试去打开的起源码:

    • 通过源码可以看出,Join方法实现是通过wait。 当main线程调用t.join时候,main线程会获得线程对象t的锁(wait 意味着拿到该对象的锁),调用该对象的wait(等待时间),直到该对象唤醒main线程 ,比如退出后。这就意味着main 线程调用t.join时,必须能够拿到线程t对象的锁。
    /**
     * Waits for this thread to die.
     *
     * <p> An invocation of this method behaves in exactly the same
     * way as the invocation
     *
     * <blockquote>
     * {@linkplain #join(long) join}{@code (0)}
     * </blockquote>
     *
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
     */
    public final void join() throws InterruptedException {
            join(0);
    }
    /**
     * Waits at most {@code millis} milliseconds for this thread to
     * die. A timeout of {@code 0} means to wait forever.
     *
     * <p> This implementation uses a loop of {@code this.wait} calls
     * conditioned on {@code this.isAlive}. As a thread terminates the
     * {@code this.notifyAll} method is invoked. It is recommended that
     * applications not use {@code wait}, {@code notify}, or
     * {@code notifyAll} on {@code Thread} instances.
     *
     * @param  millis
     *         the time to wait in milliseconds
     *
     * @throws  IllegalArgumentException
     *          if the value of {@code millis} is negative
     *
     * @throws  InterruptedException
     *          if any thread has interrupted the current thread. The
     *          <i>interrupted status</i> of the current thread is
     *          cleared when this exception is thrown.
     */
    public final synchronized void join(long millis) throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;
    
        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }
    
        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }
  • 相关阅读:
    [hdu4436 str2int]后缀自动机SAM(或后缀数组SA)
    bytedance专题
    LSTM+CRF维特比解码过程
    spark core类梳理
    spark源码阅读---Utils.getCallSite
    python2.7官方文档阅读笔记
    cs224d---词向量表示
    cs231n---强化学习
    cs231n---生成模型
    Spring 2017 Assignments3
  • 原文地址:https://www.cnblogs.com/Mao-admin/p/9988893.html
Copyright © 2011-2022 走看看