zoukankan      html  css  js  c++  java
  • 线程通信之生产者消费者模型

      线程通信,是指线程之间的消息传递。

      多个线程在操作同一个资源时,它们对共享资源的操作动作可能不同;它们共享同一个资源,互为条件,相互依赖,相互通信,从而让任务向前推进。

      另外,在线程的同步策略中,虽然可以解决并发更新同一个资源,保障资源的安全,但不能用来实现线程间的消息传递。因此,线程通信与线程同步往往会融合使用。

      生产者消费者模型堪称是线程通信中的一个典型案例,我们接下来通过生产者消费者模型来进一步认识线程通信。在此,我们先对若干概念进行了解。  

      生产者:没有生产之前通知消费者等待,生产产品结束之后,马上通知消费者消费

      消费者:没有消费之前通知生产者等待,消费产品结束之后,通知生产者继续生产产品以供消费

      线程通信:使用java中超类Object中提供的一些方法:

    1 public final void wait();  //注:long timeout=0  表示线程一直等待,直到其它线程通知
    2 public final native void wait(long timeout);   //线程等待指定毫秒参数的时间,超过该时间则不再等待
    3 public final void wait(long timeout, int nanos);  /*线程等待指定毫秒、微妙的时间,timeout最大等待时间,
                                    以毫秒为单位,nanos额外的时间,在纳秒范围0-999999*/
    4 public final native void notify(); //唤醒一个处于等待状态的线程 5 public final native void notifyAll(); //唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先运行

      需要注意的是,上述方法只能在同步方法或者同步代码块中使用,否则会抛出异常。

      接下来,我们以生产A-D个产品,放入仓库,待消费者消费后,生产者再进行生产为例,看下生产者消费者模式的运行流程。  

      1 /**
      2  * 1.共享资源缓存和操作类
      3  */
      4 public class SharedCache {
      5     //产品,此处使用char字符,作为存储共享数据的数据类型
      6     private char cache;
      7     //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费)
      8     private boolean flag=false;
      9     /*
     10     生产操作(生产者):向仓库中添加共享数据
     11      */
     12     public synchronized void addSharedCacheData(char data){
     13         //产品未消费,则生产者的生产操作等待
     14         if(flag){
     15             System.out.println("产品未消费,生产者的生产操作等待");
     16             try {
     17                 //生产者等待
     18                 wait();
     19             } catch (InterruptedException e) {
     20                 System.out.println("Thread interrupted Exception:"+e.getMessage());
     21             }
     22         }
     23         //产品已消费,则生产者继续生产
     24         this.cache=data;
     25         //标记已生产
     26         flag=true;
     27         //通知消费者已生产
     28         notify();
     29         System.out.println("生产者--->产品:"+data+"已生产,等待消费者消费");
     30     }
     31     /*
     32     消费操作(消费者):向仓库中获取共享数据
     33      */
     34     public synchronized char getSharedCacheData(){
     35         //如果产品未生产,则消费者等待
     36         if(!flag){
     37             System.out.println("产品未生产,消费者的消费操作等待");
     38             try {
     39                 wait();
     40             } catch (InterruptedException e) {
     41                 System.out.println("Thread interrupted Exception:"+e.getMessage());
     42             }
     43         }
     44         //标记已消费
     45         flag=false;
     46         //通知生产者已消费
     47         notify();
     48         System.out.println("消费者--->产品:"+this.cache+"已消费,通知生产者生产");
     49         return this.cache;
     50     }
     51 }
     52 /**
     53  * 2.生产者线程类
     54  */
     55 public class Producer extends Thread{
     56     //共享缓存资源类的对象
     57     private SharedCache cache;
     58     //构造器,传入共享资源类的对象
     59     public Producer(SharedCache cache){
     60         this.cache=cache;
     61     }
     62     /*
     63     生产者生产产品,放入共享资源缓存类(相当于将生产的产品放入仓库里)
     64     生产A-D类型的产品
     65      */
     66     @Override
     67     public void run() {
     68         for(char product='A';product<='D';product++){
     69             try {
     70                 sleep((int)(Math.random()*3000));
     71             } catch (InterruptedException e) {
     72                 System.out.println("Thread interrupted Exception:"+e.getMessage());
     73             }
     74             //生产产品,放入共享缓存数据类的对象里(相当于把生产的产品放到仓库里)
     75             cache.addSharedCacheData(product);
     76         }
     77     }
     78 }
     79 /**
     80  * 3.消费者线程类
     81  */
     82 public class Consumer extends Thread{
     83     //共享缓存资源类的对象
     84     private SharedCache cache;
     85     //构造器,传入共享资源类的对象
     86     public Consumer(SharedCache cache){
     87         this.cache=cache;
     88     }
     89     /*
     90     消费者消费产品,获取共享缓存类的对象里的数据(相当于从仓库里提取产品)
     91     当消费到D类型的产品时即停止消费
     92      */
     93     @Override
     94     public void run() {
     95         char product='A';
     96         do{
     97             try {
     98                 Thread.sleep((int)(Math.random()*3000));
     99             } catch (InterruptedException e) {
    100                 System.out.println("Thread interrupted Exception:"+e.getMessage());
    101             }
    102             //消费,从仓库取走商品
    103             product=cache.getSharedCacheData();
    104         }while (product!='D');
    105     }
    106 }
    107 /**
    108  * 4.线程通信测试类
    109  */
    110 public class Test {
    111     public static void main(String[] args) {
    112         //生产者与消费者共享同一个资源
    113         SharedCache cache = new SharedCache();
    114         //启动消费者线程
    115         new Consumer(cache).start();
    116         //启动生产者线程
    117         new Producer(cache).start();
    118     }
    119 }

      运行上述的测试类后,执行结果如下:

    产品未生产,消费者的消费操作等待
    生产者--->产品:A已生产,等待消费者消费
    消费者--->产品:A已消费,通知生产者生产
    生产者--->产品:B已生产,等待消费者消费
    消费者--->产品:B已消费,通知生产者生产
    生产者--->产品:C已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者--->产品:C已消费,通知生产者生产
    生产者--->产品:D已生产,等待消费者消费
    消费者--->产品:D已消费,通知生产者生产

      我们在上面完成的生产者消费者模型,在处理线程同步问题时,主要是用了synchronized同步方法,JDK 1.5提供了多线程升级方案,将同步synchronized替换成了显式的Lock操作,可以实现唤醒、冻结指定的线程。

      接口Lock的实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。Lock 可以支持多个相关的 Condition 对象,从而在使用中更加灵活。

      接口Condition可以替代传统的线程间通信,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()。该对象可以通过Lock锁进行获取。可以说,传统线程的通信方式,Condition都可以实现。

      需要注意的是,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。  

      Java.util.concurrent.lock 中的Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,从而为Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。

      其中,ReentrantLock 类实现了Lock ,它拥有与synchronized 相同的并发性和内存语义,还添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。

      我们接下来通过ReentrantLock 类和Condition接口的实现类来完成一个生产者消费者模型。为此,我们需要创建一个ReentrantLock类的多态对象,即建立一把锁,然后将这把锁与两个Condition对象关联。我们接下来就用Lock与Condition实现一个生产者消费者模型,实现与上述例子相似的效果,代码具体如下:  

      1 import java.util.concurrent.locks.Condition;
      2 import java.util.concurrent.locks.Lock;
      3 import java.util.concurrent.locks.ReentrantLock;
      4 /**
      5  * 共享的资源
      6  */
      7 public class Resource {
      8     private char product;
      9     //产品消费标识,是线程间通信的信号,为true表示未消费(生产),false表示未生产(消费)
     10     private boolean flag = false;
     11     //定义一个实现Lock接口的ReentrantLock类对象
     12     private Lock lock = new ReentrantLock();
     13     /*
     14     Condition是被绑定到Lock上的,
     15     要创建一个Lock的Condition,
     16     必须用Lock对象的newCondition()方法
     17      */
     18     private Condition cond_pro = lock.newCondition();
     19     //一个lock可以有多个相关的condition
     20     private Condition cond_con = lock.newCondition();
     21     /*
     22         定义生产方法
     23      */
     24     public void produce(char product) throws InterruptedException {
     25         lock.lock();//手动加同步锁
     26         try {
     27             while (flag) {//此时若生产完一个以后唤醒了另一个生产者,则再次判断,避免两个生产者同时生产
     28                 System.out.println("产品未消费,生产者的生产操作等待");
     29                 cond_pro.await();
     30             }
     31             this.product = product;
     32             //标记已生产
     33             flag = true;
     34             //通知消费者已生产
     35             cond_con.signal();//唤醒消费方法,利用了condition的signal()指定唤醒对象
     36             System.out.println("生产者"+Thread.currentThread().getName()+"--->产品:"+product+"已生产,等待消费者消费");
     37         } finally {
     38             lock.unlock();//释放锁
     39         }
     40     }
     41     /*
     42         定义消费方法
     43      */
     44     public char consume() throws InterruptedException {
     45         lock.lock();
     46         try {
     47             while (!flag) {
     48                 System.out.println("产品未生产,消费者的消费操作等待");
     49                 cond_con.await();
     50             }
     51             //标记已消费
     52             flag = false;
     53             //通知生产者已消费
     54             cond_pro.signal();
     55             System.out.println("消费者"+Thread.currentThread().getName()+"--->产品:"+this.product+"已消费,通知生产者生产");
     56             return this.product;
     57         } finally {
     58             lock.unlock();
     59         }
     60     }
     61 }
     62 /**
     63  * 生产者
     64  */
     65 public class Producer implements Runnable{
     66     private Resource res;
     67     public Producer(Resource res){
     68         this.res=res;
     69     }
     70     @Override
     71     public void run() {
     72         for(char product='A';product<='D';product++){
     73             try {
     74                 res.produce(product);
     75             } catch (InterruptedException e) {
     76                 e.printStackTrace();
     77             }
     78         }
     79     }
     80 }
     81 /**
     82  * 消费者
     83  */
     84 public class Consumer implements Runnable{
     85     private Resource res;
     86     public Consumer(Resource res){
     87         this.res=res;
     88     }
     89     @Override
     90     public void run() {
     91         char product='A';
     92         do{
     93             try {
     94                 product=res.consume();
     95             } catch (InterruptedException e) {
     96                 e.printStackTrace();
     97             }
     98         }while(product!='D');
     99     }
    100 }
    101 /**
    102  * 用ReentrantLock和Condition实现生产者消费者模型
    103  */
    104 public class Test {
    105     //入口方法
    106     public static void main(String[] args) {
    107         Resource res = new Resource();//生产者与消费者共享的资源
    108         Producer producer = new Producer(res);//生产者
    109         Consumer consumer = new Consumer(res);//消费者
    110         //生产者线程与消费者线程各创建两个
    111         Thread p1 = new Thread(producer);
    112         Thread p2 = new Thread(producer);
    113         Thread c1 = new Thread(consumer);
    114         Thread c2 = new Thread(consumer);
    115         p1.start();
    116         p2.start();
    117         c1.start();
    118         c2.start();
    119     }
    120 }

      上述代码执行结果如下:

    生产者Thread-0--->产品:A已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-2--->产品:A已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-1--->产品:A已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-2--->产品:A已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-0--->产品:B已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-3--->产品:B已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-1--->产品:B已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-2--->产品:B已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-0--->产品:C已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-3--->产品:C已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-1--->产品:C已生产,等待消费者消费
    产品未消费,生产者的生产操作等待
    消费者Thread-2--->产品:C已消费,通知生产者生产
    生产者Thread-0--->产品:D已生产,等待消费者消费
    消费者Thread-3--->产品:D已消费,通知生产者生产
    产品未生产,消费者的消费操作等待
    生产者Thread-1--->产品:D已生产,等待消费者消费
    消费者Thread-3--->产品:D已消费,通知生产者生产  
  • 相关阅读:
    Dynamics AX 2012 R2 配置E-Mail模板
    Dynamics AX 2012 R2 设置E-Mail
    Dynamics AX 2012 R2 为运行失败的批处理任务设置预警
    Dynamics AX 2012 R2 耗尽用户
    Dynamics AX 2012 R2 创建一个专用的批处理服务器
    Dynamics AX 2012 R2 创建一个带有负载均衡的服务器集群
    Dynamics AX 2012 R2 安装额外的AOS
    Dynamics AX 2012 R2 将系统用户账号连接到工作人员记录
    Dynamics AX 2012 R2 从代码中调用SSRS Report
    Dynamics AX 2012 R2 IIS WebSite Unauthorized 401
  • 原文地址:https://www.cnblogs.com/lizhangyong/p/8378778.html
Copyright © 2011-2022 走看看