zoukankan      html  css  js  c++  java
  • 线程通信之生产者消费者模型

      线程通信,是指线程之间的消息传递。

      多个线程在操作同一个资源时,它们对共享资源的操作动作可能不同;它们共享同一个资源,互为条件,相互依赖,相互通信,从而让任务向前推进。

      另外,在线程的同步策略中,虽然可以解决并发更新同一个资源,保障资源的安全,但不能用来实现线程间的消息传递。因此,线程通信与线程同步往往会融合使用。

      生产者消费者模型堪称是线程通信中的一个典型案例,我们接下来通过生产者消费者模型来进一步认识线程通信。在此,我们先对若干概念进行了解。  

      生产者:没有生产之前通知消费者等待,生产产品结束之后,马上通知消费者消费

      消费者:没有消费之前通知生产者等待,消费产品结束之后,通知生产者继续生产产品以供消费

      线程通信:使用java中超类Object中提供的一些方法:

    1 public final void wait();  //注:long timeout=0  表示线程一直等待,直到其它线程通知
    2 public final native void wait(long timeout);   //线程等待指定毫秒参数的时间,超过该时间则不再等待
    3 public final void wait(long timeout, int nanos);  /*线程等待指定毫秒、微妙的时间,timeout最大等待时间,
                                    以毫秒为单位,nanos额外的时间,在纳秒范围0-999999*/
    4 public final native void notify(); //唤醒一个处于等待状态的线程 5 public final native void notifyAll(); //唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先运行

      需要注意的是,上述方法只能在同步方法或者同步代码块中使用,否则会抛出异常。

      接下来,我们以生产A-D个产品,放入仓库,待消费者消费后,生产者再进行生产为例,看下生产者消费者模式的运行流程。  

      1 /**
      2  * 1.共享资源缓存和操作类
      3  */
      4 public class SharedCache {
      5     //产品,此处使用char字符,作为存储共享数据的数据类型
      6     private char cache;
      7     //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费)
      8     private boolean flag=false;
      9     /*
     10     生产操作(生产者):向仓库中添加共享数据
     11      */
     12     public synchronized void addSharedCacheData(char data){
     13         //产品未消费,则生产者的生产操作等待
     14         if(flag){
     15             System.out.println("产品未消费,生产者的生产操作等待");
     16             try {
     17                 //生产者等待
     18                 wait();
     19             } catch (InterruptedException e) {
     20                 System.out.println("Thread interrupted Exception:"+e.getMessage());
     21             }
     22         }
     23         //产品已消费,则生产者继续生产
     24         this.cache=data;
     25         //标记已生产
     26         flag=true;
     27         //通知消费者已生产
     28         notify();
     29         System.out.println("生产者--->产品:"+data+"已生产,等待消费者消费");
     30     }
     31     /*
     32     消费操作(消费者):向仓库中获取共享数据
     33      */
     34     public synchronized char getSharedCacheData(){
     35         //如果产品未生产,则消费者等待
     36         if(!flag){
     37             System.out.println("产品未生产,消费者的消费操作等待");
     38             try {
     39                 wait();
     40             } catch (InterruptedException e) {
     41                 System.out.println("Thread interrupted Exception:"+e.getMessage());
     42             }
     43         }
     44         //标记已消费
     45         flag=false;
     46         //通知生产者已消费
     47         notify();
     48         System.out.println("消费者--->产品:"+this.cache+"已消费,通知生产者生产");
     49         return this.cache;
     50     }
     51 }
     52 /**
     53  * 2.生产者线程类
     54  */
     55 public class Producer extends Thread{
     56     //共享缓存资源类的对象
     57     private SharedCache cache;
     58     //构造器,传入共享资源类的对象
     59     public Producer(SharedCache cache){
     60         this.cache=cache;
     61     }
     62     /*
     63     生产者生产产品,放入共享资源缓存类(相当于将生产的产品放入仓库里)
     64     生产A-D类型的产品
     65      */
     66     @Override
     67     public void run() {
     68         for(char product='A';product<='D';product++){
     69             try {
     70                 sleep((int)(Math.random()*3000));
     71             } catch (InterruptedException e) {
     72                 System.out.println("Thread interrupted Exception:"+e.getMessage());
     73             }
     74             //生产产品,放入共享缓存数据类的对象里(相当于把生产的产品放到仓库里)
     75             cache.addSharedCacheData(product);
     76         }
     77     }
     78 }
     79 /**
     80  * 3.消费者线程类
     81  */
     82 public class Consumer extends Thread{
     83     //共享缓存资源类的对象
     84     private SharedCache cache;
     85     //构造器,传入共享资源类的对象
     86     public Consumer(SharedCache cache){
     87         this.cache=cache;
     88     }
     89     /*
     90     消费者消费产品,获取共享缓存类的对象里的数据(相当于从仓库里提取产品)
     91     当消费到D类型的产品时即停止消费
     92      */
     93     @Override
     94     public void run() {
     95         char product='A';
     96         do{
     97             try {
     98                 Thread.sleep((int)(Math.random()*3000));
     99             } catch (InterruptedException e) {
    100                 System.out.println("Thread interrupted Exception:"+e.getMessage());
    101             }
    102             //消费,从仓库取走商品
    103             product=cache.getSharedCacheData();
    104         }while (product!='D');
    105     }
    106 }
    107 /**
    108  * 4.线程通信测试类
    109  */
    110 public class Test {
    111     public static void main(String[] args) {
    112         //生产者与消费者共享同一个资源
    113         SharedCache cache = new SharedCache();
    114         //启动消费者线程
    115         new Consumer(cache).start();
    116         //启动生产者线程
    117         new Producer(cache).start();
    118     }
    119 }

      运行上述的测试类后,执行结果如下:

    产品未生产,消费者的消费操作等待
    生产者--->产品:A已生产,等待消费者消费
    消费者--->产品:A已消费,通知生产者生产
    生产者--->产品:B已生产,等待消费者消费
    消费者--->产品:B已消费,通知生产者生产
    生产者--->产品:C已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者--->产品:C已消费,通知生产者生产
    生产者--->产品:D已生产,等待消费者消费
    消费者--->产品:D已消费,通知生产者生产

      我们在上面完成的生产者消费者模型,在处理线程同步问题时,主要是用了synchronized同步方法,JDK 1.5提供了多线程升级方案,将同步synchronized替换成了显式的Lock操作,可以实现唤醒、冻结指定的线程。

      接口Lock的实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。Lock 可以支持多个相关的 Condition 对象,从而在使用中更加灵活。

      接口Condition可以替代传统的线程间通信,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()。该对象可以通过Lock锁进行获取。可以说,传统线程的通信方式,Condition都可以实现。

      需要注意的是,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。  

      Java.util.concurrent.lock 中的Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,从而为Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。

      其中,ReentrantLock 类实现了Lock ,它拥有与synchronized 相同的并发性和内存语义,还添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。

      我们接下来通过ReentrantLock 类和Condition接口的实现类来完成一个生产者消费者模型。为此,我们需要创建一个ReentrantLock类的多态对象,即建立一把锁,然后将这把锁与两个Condition对象关联。我们接下来就用Lock与Condition实现一个生产者消费者模型,实现与上述例子相似的效果,代码具体如下:  

      1 import java.util.concurrent.locks.Condition;
      2 import java.util.concurrent.locks.Lock;
      3 import java.util.concurrent.locks.ReentrantLock;
      4 /**
      5  * 共享的资源
      6  */
      7 public class Resource {
      8     private char product;
      9     //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费)
     10     private boolean flag = false;
     11     //定义一个实现Lock接口的ReentrantLock类对象
     12     private Lock lock = new ReentrantLock();
     13     /*
     14     Condition是被绑定到Lock上的,
     15     要创建一个Lock的Condition,
     16     必须用Lock对象的newCondition()方法
     17      */
     18     private Condition cond_pro = lock.newCondition();
     19     //一个lock可以有多个相关的condition
     20     private Condition cond_con = lock.newCondition();
     21     /*
     22         定义生产方法
     23      */
     24     public void produce(char product) throws InterruptedException {
     25         lock.lock();//手动加同步锁
     26         try {
     27             while (flag) {//此时若生产完一个以后唤醒了另一个生产者,则再次判断,避免两个生产者同时生产
     28                 System.out.println("产品未消费,生产者的生产操作等待");
     29                 cond_pro.await();
     30             }
     31             this.product = product;
     32             //标记已生产
     33             flag = true;
     34             //通知消费者已生产
     35             cond_con.signal();//唤醒消费方法,利用了condition的signal()指定唤醒对象
     36             System.out.println("生产者"+Thread.currentThread().getName()+"--->产品:"+product+"已生产,等待消费者消费");
     37         } finally {
     38             lock.unlock();//释放锁
     39         }
     40     }
     41     /*
     42         定义消费方法
     43      */
     44     public char consume() throws InterruptedException {
     45         lock.lock();
     46         try {
     47             while (!flag) {
     48                 System.out.println("产品未生产,消费者的消费操作等待");
     49                 cond_con.await();
     50             }
     51             //标记已消费
     52             flag = false;
     53             //通知生产者已消费
     54             cond_pro.signal();
     55             System.out.println("消费者"+Thread.currentThread().getName()+"--->产品:"+this.product+"已消费,通知生产者生产");
     56             return this.product;
     57         } finally {
     58             lock.unlock();
     59         }
     60     }
     61 }
     62 /**
     63  * 生产者
     64  */
     65 public class Producer implements Runnable{
     66     private Resource res;
     67     public Producer(Resource res){
     68         this.res=res;
     69     }
     70     @Override
     71     public void run() {
     72         for(char product='A';product<='D';product++){
     73             try {
     74                 res.produce(product);
     75             } catch (InterruptedException e) {
     76                 e.printStackTrace();
     77             }
     78         }
     79     }
     80 }
     81 /**
     82  * 消费者
     83  */
     84 public class Consumer implements Runnable{
     85     private Resource res;
     86     public Consumer(Resource res){
     87         this.res=res;
     88     }
     89     @Override
     90     public void run() {
     91         char product='A';
     92         do{
     93             try {
     94                 product=res.consume();
     95             } catch (InterruptedException e) {
     96                 e.printStackTrace();
     97             }
     98         }while(product!='D');
     99     }
    100 }
    101 /**
    102  * 用ReentrantLock和Condition实现生产者消费者模型
    103  */
    104 public class Test {
    105     //入口方法
    106     public static void main(String[] args) {
    107         Resource res = new Resource();//生产者与消费者共享的资源
    108         Producer producer = new Producer(res);//生产者
    109         Consumer consumer = new Consumer(res);//消费者
    110         //生产者线程与消费者线程各创建两个
    111         Thread p1 = new Thread(producer);
    112         Thread p2 = new Thread(producer);
    113         Thread c1 = new Thread(consumer);
    114         Thread c2 = new Thread(consumer);
    115         p1.start();
    116         p2.start();
    117         c1.start();
    118         c2.start();
    119     }
    120 }

      上述代码执行结果如下:

    生产者Thread-0--->产品:A已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-2--->产品:A已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-1--->产品:A已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-2--->产品:A已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-0--->产品:B已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-3--->产品:B已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-1--->产品:B已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-2--->产品:B已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-0--->产品:C已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-3--->产品:C已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-1--->产品:C已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-2--->产品:C已消费,通知生产者生产
    生产者Thread-0--->产品:D已生产,等待消费者消费
    消费者Thread-3--->产品:D已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-1--->产品:D已生产,等待消费者消费
    消费者Thread-3--->产品:D已消费,通知生产者生产  
  • 相关阅读:
    TX2 刷机教程
    ROS2 树莓派SBC镜像安装
    OP3 默认ID图
    OP3 镜像恢复
    ROS2 BringUp
    学习笔记3:Linux面试题
    学习笔记2:Linux简单指令
    学习笔记1:Git简单指令
    编程小白入门分享五:Vue的自定义组件
    编程小白入门分享四:Vue的安装及使用快速入门
  • 原文地址:https://www.cnblogs.com/lizhangyong/p/8378778.html
Copyright © 2011-2022 走看看