zoukankan      html  css  js  c++  java
  • 线程间的通信

    一,概述。 

      1.什么叫做线程间通信: 在1个进程中,线程往往不是孤立存在的,线程之间需要一些协调通信,来共同完成一件任务。也就是通过一定的方法来实现线程间的“交流”。

      2.线程间通信的体现:
        - 1个线程传递数据给另1个线程
        - 在1个线程中执行完特定任务后,转到另1个线程继续执行任务
    二,线程通信的方法。
      Object类中相关的方法有两个notify方法和三个wait方法:notify() / notifyAll() / wait()

      因为wait和notify方法定义在Object类中,因此会被所有的类所继承。这些方法都是final的,即它们都是不能被重写的,不能通过子类覆写去改变它们的行为。

      1.wait()方法。

      语法:锁对象.wait()

      特点:wait()方法的调用使得当前线程必须要等待,直到另外一个线程调用notify()或者notifyAll()方法(注意必须调用notify/notifyAll方法才能唤醒)。

            wait()方法的调用必须在同步的前提下。(因为该方法是要用锁对象调用,而只有在同步的情况下才有锁)

            wait()方法的调用会导致锁的释放。

      线程调用wait()方法,释放它对锁的拥有权,然后等待另外的线程来通知它(通知的方式是notify()或者notifyAll()方法),这样它才能重新获得锁的拥有权和恢复执行。要确保调用wait()方法的时候拥有锁,即,wait()方法的调用必须放在synchronized方法或synchronized块中。

      一个小比较:

      当线程调用了wait()方法时,它会释放掉对象的锁。

      另一个会导致线程暂停的方法:Thread.sleep(),它会导致线程睡眠指定的毫秒数,但线程在睡眠的过程中是不会释放掉对象的锁的

      2.notify和notifyAll()
      语法:锁对象.notify()/notifyAll()
      特点:notify()方法的调用可以唤醒当前锁对象下等待的另一个单个线程。优先级较高的优先唤醒,如果优先级一样,则随机唤醒。notifyAll()方法的调用可以唤醒当前锁对象下等待的所有线程。
         notify()和notifyAll()方法必须在同步的前提下使用(也就是notify方法调用必须放在synchronized方法或synchronized块中)。
         被唤醒的线程是不能被执行的,需要等到当前线程放弃这个对象的锁,也就是想要唤醒其他线程,说光调用notify()不行,还必须要调用wait()方法让当前线程释放锁对象,当另外一个线程获得了锁对象即可以执行。
      注意:wait() / notify() / notifyAll()方法使用的锁对象必须和同步代码块中的锁对象是同一个,否则报异常!!!
     
     【代码演示】:线程间通信。
     1 class Bank1 {//公共资源
     2     int total=100;
     3 }
     4 class PresonA implements Runnable{
     5     Bank1  b;
     6     //因为要确保两个用户使用的是同一家银行,所以在主函数中new一个银行,然后通过构造函数将这一银行传递给这两个用户
     7     public PresonA(Bank1 b){
     8         this.b=b;//将银行的引用传给两个用户
     9     }
    10     @Override
    11     public void run() {
    12         while (true){
    13             synchronized (b){//同步代码块,因为要确保两个用户使用的是同一个锁,所以选择银行对象作为锁
    14                 if(b.total>=0){//打钱,当卡里钱大于0,就唤醒另一个线程来取钱
    15                     b.notify();//唤醒另一个线程来取钱,注意该方法是要用锁对象调用的
    16                     try {
    17                         b.wait();//接着将自己睡眠,然后把锁释放给两一个线程
    18                     } catch (InterruptedException e) {
    19                         e.printStackTrace();
    20                     }
    21                 }
    22                 b.total=b.total+100;//A先存了100
    23                 System.out.println("A存了100,目前存款为:"+b.total);
    24             }
    25         }
    26     }
    27 }
    28 class PresonB implements Runnable{
    29     Bank1  b;
    30     public PresonB(Bank1 b){
    31         this.b=b;
    32     }
    33     @Override
    34     public void run() {
    35         while (true){
    36             synchronized (b){
    37                 if(b.total<=0){//取钱,当卡里钱小于0,就唤醒另一个线程来存钱
    38                     b.notify();//唤醒另一个线程
    39                     try {
    40                         b.wait();//将自己睡眠
    41                     } catch (InterruptedException e) {
    42                         e.printStackTrace();
    43                     }
    44                 }
    45                 b.total=b.total-100;
    46                 System.out.println("****B取了100,目前存款为:"+b.total);
    47             }
    48         }
    49     }
    50 }
    51 public class CommunicationDemo {
    52     public static void main(String[] args) {
    53         Bank1 b=new Bank1();//作为公共资源传递给两个用户
    54         new Thread(new PresonA(b)).start();
    55         new Thread(new PresonB(b)).start();
    56     }
    57 }

     运行结果:

      【代码演示】:生产者与消费者 

     1 class Resource{
     2     int count=0;//商品库存数
     3 }
     4 class Producer implements Runnable{
     5     Resource r;
     6     public Producer(Resource r){
     7         this.r=r;
     8     }
     9     public void run(){
    10         while (true){
    11             synchronized (r){
    12                 if(r.count>0){
    13                     try {
    14                         r.wait();
    15                     } catch (InterruptedException e) {
    16                         e.printStackTrace();
    17                     }
    18                 }
    19                 System.out.println("生产了一件商品,此时商品数为:"+(++r.count));
    20                 r.notify();
    21             }
    22         }
    23     }
    24 }
    25 class Consumer implements Runnable{
    26     Resource r;
    27     public Consumer(Resource r){
    28         this.r=r;
    29     }
    30     public void run(){
    31         while (true){
    32             synchronized (r){
    33                 if(r.count<=0){
    34                     try {
    35                         r.wait();
    36                     } catch (InterruptedException e) {
    37                         e.printStackTrace();
    38                     }
    39                 }
    40                 System.out.println("消费了一件商品,此时商品数为:"+(--r.count));
    41                 r.notify();
    42             }
    43         }
    44     }
    45 }
    46 public class ProducerConsumerDemo {
    47     public static void main(String[] args) {
    48         Resource r=new Resource();
    49         new Thread(new Producer(r)).start();
    50         new Thread(new Consumer(r)).start();
    51     }
    52 }

    运行结果:

       这是在只有两个线程在运行时的结果。如果有四个线程在同时运行呢?(两个生产者两个个消费者)

       会发现运行结果是混乱的,为什么这样??我们先来分析一下这个运行的过程,假设是0号线程(生产者)先获得了CPU的执行权,于是它生产了一个商品,然后接着调用了notify()方法来唤醒其他线程,假设这次随机唤醒的是1号线程(生产者),虽然1号线程这时被唤醒了,但是由于没有CPU 的执行权,1号线程还在运行不了,在进行第二次while循环时,由于判断资源数是大于0的,所以调用了wait()方法,此时0号线程进行等待,并释放了CPU 的执行权,由于被唤醒的只有1号线程,所以1号线程获得了CPU的执行权开始运行,当进入while循环后,判断资源数发现是大于0的,所以1号线程也被挂起进行等代,此时只剩下2号线程和3号线程(消费者),假设2线程先获得CPU执行权,进行判断满足条件,于是就消费了一个商品,然后调用了notify()方法,因为0号线程先进入线程池,所以此时先唤醒的是0号线程,在2号线程在进行第二次while循环时,由于不满足条件,所以调用了wait()方法,此时2号线程进行等待,由于0号线程已被唤醒,所以0号线程接着被锁之前的代码接着运行,生产了一个商品,然后唤醒了1号线程,当0号线释放掉CPU后,1号线程也接着被锁之前的代码进行运算,没有进行判断就直接生产了一个商品。

      由于线程在被唤醒后是接着被冻结之前的代码接着运行的,所以唤醒之后不会进行判断,就直接生产或消费了。所以就会出现问题。

      解决的方法就是:就是让各个线程在被唤醒之后仍然进行条件判断,所以将if改为while,同时将notify()改为notifyAll(),否则会发生死锁情况。

      定义while判断标记:让被唤醒的线程再一次判断标记。

      定义notifyAll():因为需要唤醒对方线程。如果只要notify,容易出现只唤醒本方线程的情况,导致程序中的所有都等待。

    改过之后运行的结果:

    1、 总结notify()notifyAll()的区别与联系

    1) notify只是唤醒一个正在wait当前对象锁的线程,而notifyAll唤醒所有。值得注意的是:notify是本地方法,具体唤醒哪一个线程由虚拟机控制;如果有多个线程等待,则线程规划器任意挑选出其中一个wait()状态的线程来发出通知

    2) 当有线程调用了对象的notifyAll()方法(唤醒所有wait线程)或 notify()方法(只随机唤醒一个 wait 线程),被唤醒的的线程便会进入该对象的锁池中,锁池中的线程会去竞争该对象锁。也就是说,调用了notify后只有一个线程会由等待池进入锁池,而notifyAll会将该对象等待池内的所有线程移动到锁池中,等待锁竞争

    3) 调用notify和notifyAll方法后,当前线程并不会立即放弃锁的持有权,而必须要等待当前同步代码块执行完才会让出锁

    4) 优先级高的线程竞争到对象锁的概率大,假若某线程没有竞争到该对象锁,它还会留在锁池中,唯有线程再次调用wait()方法,它才会重新回到等待池中。而竞争到对象锁的线程则继续往下执行,直到执行完了synchronized代码块,它会释放掉该对象锁,这时锁池中的线程会继续竞争该对象锁。

     锁池:获取同一把锁的线程,在为抢夺锁时会进入到锁池
    等待池:调用wait操作的线程,会释放掉锁,进入到该对象的等待池中

    三,线程另一种通信的方法---Condition,Lock1的具体使用。

      使用notifyAll()方法是将所有被冻结的线程都唤醒,那如果只唤醒对方线程该怎么办?

      在jdk1.5之后添加了java.util.concurrent.locks包,在此包中有Condition,Lock,ReadWriteLock接口供使用。与synchronized相比,lock更加灵活,使用更加广泛。

      其中Lock代替了同步代码块synchronized;Condition的await(),signal(),signalAll()代替了Object中的wait(),notify(),notifyAll()方法。

    1 public interface Condition{
    2   void await() throws InterruptedException;//使线程进入休眠,类似于wait()
    3   void awaitUninterruptibly();
    4   long awaitNanos(long nanosTimeout) throws InterruptedException;
    5   boolean await(long time,TimeUnit unit) throws InterruptedException;
    6   boolean awaitUnit(Date deadline) throws InterruptedException;
    7   void  signal();//唤醒因await进入休眠的一个线程,类似于notify();
    8   void  signalAll();//唤醒因await进入休眠的所有线程,类似于notifyAll();
    9 }

      注意:1.Condition需要和lock结合使用。且lock和Condition必须是同一个ReentrantLock下的,否则会报iLLegalMonitorStateException异常。

         2.作用于同一个Condition实例下的线程之间才能进行通信。

        Lock lock=new ReentrantLock();
        Condition condition=lock.newCondition();

      【代码演示】:多个线程间的通信。 (使用signalAll方法,唤醒所有线程)

     1 import java.util.concurrent.locks.Condition;
     2 import java.util.concurrent.locks.Lock;
     3 import java.util.concurrent.locks.ReentrantLock;
     4 class Resource{
     5     int count=1;//商品库存数
     6     Lock lock=new ReentrantLock();
     7     Condition condition=lock.newCondition();
     8 }
     9 class Producer implements Runnable{
    10     Resource r;
    11     Lock lock;
    12     Condition condition;
    13     public Producer(Resource r){
    14         this.r=r;
    15         this.lock=r.lock;
    16         this.condition=r.condition;
    17     }
    18     public void run(){
    19             lock.lock();//加锁,如果获取失败,将进入休眠状态
    20             try {
    21                 while(true){
    22                     while (r.count>0){
    23                         condition.await();//将自己阻塞
    24                     }
    25                     System.out.println(Thread.currentThread().getName()+"生产了一件商品,此时商品数为:"+(++r.count));
    26                     condition.signalAll();//唤醒其他所有线程
    27                 }
    28             } catch (InterruptedException e) {
    29                 e.printStackTrace();
    30             }finally {
    31                 lock.unlock();//释放锁
    32             }
    33     }
    34 }
    35 class Consumer implements Runnable{
    36     Resource r;
    37     Lock lock;
    38     Condition condition;
    39     public Consumer(Resource r){
    40         this.r=r;
    41         this.lock=r.lock;
    42         this.condition=r.condition;
    43     }
    44     public void run(){
    45             lock.lock();//加锁
    46             try {
    47                 while(true){
    48                     while (r.count<=0){
    49                         condition.await();
    50                     }
    51                     System.out.println(Thread.currentThread().getName()+"消费了一件商品,此时商品数为:"+(--r.count));
    52                     condition.signalAll();
    53                 }
    54             } catch (InterruptedException e) {
    55                  e.printStackTrace();
    56             }finally {
    57                  lock.unlock();//释放锁()
    58             }
    59     }
    60 }
    61 public class ProducerConsumerDemo {
    62     public static void main(String[] args) {
    63         Resource r=new Resource();
    64         new Thread(new Producer(r)).start();
    65         new Thread(new Producer(r)).start();
    66         new Thread(new Consumer(r)).start();
    67         new Thread(new Consumer(r)).start();
    68     }
    69 }

      在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll(),传统线程的通信方式,使用Condition都可以实现,这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。

       这样看来,Condition和传统的线程通信可能没什么区别,但Condition的强大之处在于它可以为多个线程间建立不同的Condition,用下面这个代码演示一下。

      【代码演示】:多个线程间的通信。(使用signalAll()方法,只唤醒对方的线程)

     1 import java.util.concurrent.locks.Condition;
     2 import java.util.concurrent.locks.Lock;
     3 import java.util.concurrent.locks.ReentrantLock;
     4 class Resource{
     5     int count=0;//商品库存数
     6     Lock lock=new ReentrantLock();
     7     Condition condition_con=lock.newCondition();
     8     Condition condition_pro=lock.newCondition();
     9 }
    10 class Producer implements Runnable {
    11     Resource r;
    12     Lock lock;
    13     Condition condition_con;
    14     Condition condition_pro;
    15     public Producer(Resource r) {
    16         this.r = r;
    17         this.lock = r.lock;
    18         this.condition_con = r.condition_con;
    19         this.condition_pro = r.condition_pro;
    20     }
    21     public void run() {
    22         lock.lock();//加锁
    23         try {
    24             while(true){
    25 
    26                 while (r.count > 0) {
    27                     condition_pro.await();
    28                 }
    29                 System.out.println(Thread.currentThread().getName() + "生产了一件商品,此时商品数为:" + (++r.count));
    30                 condition_con.signal();
    31             }
    32         } catch (InterruptedException e) {
    33             e.printStackTrace();
    34         } finally {
    35             lock.unlock();
    36             //释放锁,因为await方法发生异常时,程序停止运行,则就会一直持有锁,而造成死锁。所以将unlock放在finally中
    37         }
    38     }
    39 }
    40 
    41 class Consumer implements Runnable{
    42     Resource r;
    43     Lock lock;
    44     Condition condition_con;
    45     Condition condition_pro;
    46     public Consumer(Resource r){
    47         this.r=r;
    48         this.lock=r.lock;
    49         this.condition_con=r.condition_con;
    50         this.condition_pro=r.condition_pro;
    51     }
    52     public void run(){
    53         lock.lock();//加锁
    54             try {
    55                 while (true){
    56                     while (r.count<=0) {
    57                         condition_con.await();
    58                     }
    59                     System.out.println(Thread.currentThread().getName()+"消费了一件商品,此时商品数为:"+(--r.count));
    60                     condition_pro.signal();
    61                 }
    62             } catch (InterruptedException e) {
    63                 e.printStackTrace();
    64             }finally {
    65                 lock.unlock();//释放锁
    66             }
    67     }
    68 }
    69 public class ProducerConsumerDemo {
    70     public static void main(String[] args) {
    71         Resource r=new Resource();
    72         new Thread(new Producer(r)).start();
    73         new Thread(new Producer(r)).start();
    74         new Thread(new Consumer(r)).start();
    75         new Thread(new Consumer(r)).start();
    76     }
    77 }

     线程通信的方法--CountDownLatch方式

    • CountDownLatch是在java1.5被引入的,存在java.util.concurrent包下。
    • CountDownLatch能够使一个线程等待其他线程完成各自的工作之后,在执行。
    • CountDownLatch是通过一个计数器来实现的,计数器的初始值设置为要等待的线程的数量。每当一个线程完成自己的任务后,计数器的值就会减一,当计数器的值到达0时,它表示所有线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

      【代码演示】:教练需要等待所有运动员到齐且准备好之后,教练才能开始训练。

     1 import java.util.concurrent.CountDownLatch;
     2 public class CoachRacerDemo {
     3     private CountDownLatch countDownLatch=new CountDownLatch(3);//设置要等待的运动员是3个
     4     /**
     5      * 运动员方法
     6      */
     7     public void racer(){
     8         //获取运动员的名称
     9         String name=Thread.currentThread().getName();
    10         //运动员开始准备:打印准备信息
    11         System.out.println(name+"正在准备。。。");
    12         //准备中
    13         try {
    14             Thread.sleep(1000);
    15         } catch (InterruptedException e) {
    16             e.printStackTrace();
    17         }
    18         //准备完毕:打印准备完毕的信息
    19         System.out.println(name+"准备完毕");
    20         countDownLatch.countDown();
    21     }
    22 
    23     /**
    24      *教练方法
    25      */
    26     public void coach(){
    27         //获取教练线程的名称
    28         String name=Thread.currentThread().getName();
    29         //教练等待运动员准备完毕
    30         System.out.println(name+"等待运动员准备");
    31         //等待中。。。。
    32         try {
    33             countDownLatch.await();
    34         } catch (InterruptedException e) {
    35             e.printStackTrace();
    36         }
    37         //运动员都准备好了,教练开始训练
    38         System.out.println("所有运动员都准备完毕"+name+"开始训练");
    39     }
    40     public static void main(String[] args) {
    41         //创建CoachRacerDemo实例
    42         CoachRacerDemo coachRacerDemo=new CoachRacerDemo();
    43         //创建三个运动员线程
    44         Thread thread1=new Thread(new Runnable() {
    45             @Override
    46             public void run() {
    47                 coachRacerDemo.racer();
    48             }
    49         },"运动员1");
    50         Thread thread2=new Thread(new Runnable() {
    51             @Override
    52             public void run() {
    53                 coachRacerDemo.racer();
    54             }
    55         },"运动员2");
    56         Thread thread3=new Thread(new Runnable() {
    57             @Override
    58             public void run() {
    59                 coachRacerDemo.racer();
    60             }
    61         },"运动员3");
    62         //创建一个教练线程
    63         Thread thread4=new Thread(new Runnable() {
    64             @Override
    65             public void run() {
    66                 coachRacerDemo.coach();
    67             }
    68         },"教练");
    69         thread4.start();
    70         thread1.start();
    71         thread2.start();
    72         thread3.start();
    73 
    74     }
    75 }

      运行结果:

    五,线程通信的方法--CyclicBarrier方法

    • CyclicBarrier是在Java1.5被引入的,存在于java.util.concurrent包下。
    • CyclicBarrier实现让一组线程等待至某种状态之后在全部同时执行。
    • CyclicBarrier底层是基于ReentrantLock和Condition实现的。

     【代码演示】:类似于运动员比赛,当所有运动员都准备好之后,在枪声一响,所有运动员立马同时起跑。

     1 import java.util.Date;
     2 import java.util.concurrent.BrokenBarrierException;
     3 import java.util.concurrent.CyclicBarrier;
     4 
     5 public class ThreadRunnerDemo {
     6     private CyclicBarrier cyclicBarrier=new CyclicBarrier(4);//设置线程数量
     7     public void StartRun(){
     8         //获取线程的信息
     9         String name=Thread.currentThread().getName();
    10         //调用CyclicBarrier的await方法开始准备
    11         try {
    12             cyclicBarrier.await();
    13         } catch (InterruptedException e) {
    14             e.printStackTrace();
    15         } catch (BrokenBarrierException e) {
    16             e.printStackTrace();
    17         }
    18         System.out.println(name+"已经准备好"+new Date().getTime());
    19     }
    20     public static void main(String[] args) {
    21         final ThreadRunnerDemo threadRunnerDemo=new ThreadRunnerDemo();
    22         Thread thread1=new Thread(new Runnable() {
    23             @Override
    24             public void run() {
    25                 threadRunnerDemo.StartRun();
    26             }
    27         },"运动员1号");
    28         Thread thread2=new Thread(new Runnable() {
    29             @Override
    30             public void run() {
    31                 threadRunnerDemo.StartRun();
    32             }
    33         },"运动员2号");
    34         Thread thread3=new Thread(new Runnable() {
    35             @Override
    36             public void run() {
    37                 threadRunnerDemo.StartRun();
    38             }
    39         },"运动员3号");
    40         Thread thread4=new Thread(new Runnable() {
    41             @Override
    42             public void run() {
    43                 threadRunnerDemo.StartRun();
    44             }
    45         },"运动员4号");
    46         thread1.start();
    47         thread2.start();
    48         thread3.start();
    49         thread4.start();
    50     }
    51 }

      运行结果: 

     

       根据后面的时间,我们可以确定四个线程是同时启动的。

     六,线程间通信的方法--Semaphore方法

    •  Semaphore是在Java1.5被引入的,存在于java.util.concurrent包下。
    •  Semaphore用于控制对某组资源的访问权限。

    【代码演示】:8个工人使用3台机器工作,机器为互斥资源(即每次只能一个人使用)。

     1 import java.util.concurrent.Semaphore;
     2 public class WorkerDemo {
     3     static class Work implements Runnable{
     4         private int workId=0;//工人的工号
     5         private Semaphore machineNumber;//机器数量
     6         public Work(int workId,Semaphore machineNumber){
     7             this.workId=workId;
     8             this.machineNumber=machineNumber;
     9         }
    10         @Override
    11         public void run() {
    12             //工人要去获取机器
    13             try {
    14                 machineNumber.acquire();
    15             } catch (InterruptedException e) {
    16                 e.printStackTrace();
    17             }
    18             //打印当前工作的工人的信息
    19             String name=Thread.currentThread().getName();
    20             System.out.println(name+"获取机器,开始工作了。。。");
    21             //工作过程
    22             try {
    23                 Thread.sleep(1000);
    24             } catch (InterruptedException e) {
    25                 e.printStackTrace();
    26             }
    27             //工作完释放机器
    28             machineNumber.release();
    29             System.out.println(name+"工作完了,释放机器");
    30         }
    31     }
    32     public static void main(String[] args) {
    33         int workers=8;//工人数
    34         Semaphore semaphore=new Semaphore(3);//机器数
    35         for (int i = 0; i < workers; i++) {
    36             new Thread(new Work(i,semaphore)).start();
    37         }
    38     }
    39 }

    运行结果:

    面试题:wait和sleep的区别?

    面试题:wait和notify的异同?

    相同点:

    • 都Object中的方法。
    • 调用必须在同步的前提下。(因为该方法是要用锁对象调用,而只有在同步的情况下才有锁,也就是notify方法调用必须放在synchronized方法或synchronized块中)。

    不同点:

    wait():

    • wait()方法的调用使得当前线程必须要等待,直到另外一个线程调用notify()或者notifyAll()方法(注意必须调用notify/notifyAll方法才能唤醒)。
    • wait()方法的调用会导致锁的释放。

    notify():

    • notify()方法的调用可以唤醒当前锁对象下等待的另一个单个线程。优先级较高的优先唤醒,如果优先级一样,则随机唤醒。notifyAll()方法的调用可以唤醒当前锁对象下等待的所有线程。
  • 相关阅读:
    11G-使用跨平台增量备份减少可移动表空间的停机时间 XTTS (Doc ID 1389592.1)
    如何在asm上定位数据块
    log file switch (checkpoint incomplete)
    踩坑记录(git)-误提交target文件夹删除办法
    踩坑记录(java)-双层增强for调用remove(obj)报错 java.util.ConcurrentModificationException(并发修改异常)
    Zookeeper服务端的创建及简单的客户端创建节点
    SpringAOP(注解方式实现面向切面编程)之常用Before、After、Around
    EasyExcel示例(阿里巴巴)基于Maven
    Redis简单命令(部分示例代码)
    sqlserver日志处理不当而造成的隐患
  • 原文地址:https://www.cnblogs.com/ljl150/p/12389548.html
Copyright © 2011-2022 走看看