zoukankan      html  css  js  c++  java
  • 生产者-消费者模式

    可以用wait()和notify()来实现,也可以用阻塞队列,下面介绍阻塞队列:

    阻塞队列:

    BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put方法将阻塞直到空间可用;如果队列为空,那么take方法将阻塞直到有元素可用。

      队列可以是有界的也可以是无界的。

      阻塞队列提供了一个offer方法,如果数据项不能被添加到队列中,那么将返回一个失败状态。
      在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抵制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。

     

    BlockingQueue的多种实现: 
      LinkedBlockingQueueArrayBlockingQueue:是FIFO,二者分别与LinkedList和ArrayList类似,但比同步List拥有更好的并发性能。 

      PriorityBlockingQueue:是一个按优先级排序的队列,当你希望按照某种顺序而不是FIFO来处理元素时,这个队列将非常有用。

      SynchronousQueue:事实上它并不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待这把元素加入或移出队列。 
    如果以洗盘子为比喻,就相当于没有盘架来暂时存放洗好的盘子,而是将洗好的盘子直接放入下一个空闲的烘干机中。

    生产者:

     1 import java.util.concurrent.BlockingQueue;
     2 
     3 
     4 public class Producer implements Runnable{
     5     
     6     BlockingQueue<String> queue;
     7 
     8     Producer(BlockingQueue<String> queue) {
     9         this.queue = queue;
    10     }
    11 
    12     @Override
    13     public void run() {
    14         try {
    15             String temp = Thread.currentThread().getName() ;
    16             queue.put(temp);
    17             System.out.println("一个产品由   " + temp + "  生产");
    18             //Thread.sleep(1000) ;
    19         } catch (InterruptedException e) {
    20             e.printStackTrace();
    21         }
    22         
    23     }
    24 
    25 }

    消费者:

     1 import java.util.concurrent.BlockingQueue;
     2 
     3 
     4 public class Consumer implements Runnable{
     5 
     6     BlockingQueue<String> queue;
     7 
     8     Consumer(BlockingQueue<String> queue) {
     9         this.queue = queue;
    10     }
    11     
    12     @Override
    13     public void run() {
    14         try {
    15             String temp = queue.take() ;
    16             System.out.println(temp + " 生产的一个产品被   " + Thread.currentThread().getName() + "  消费");
    17             Thread.sleep(1000) ;
    18         } catch (InterruptedException e) {
    19             e.printStackTrace();
    20         }
    21         
    22     }
    23 
    24 }

    测试:

     1 import java.util.concurrent.BlockingQueue;
     2 import java.util.concurrent.LinkedBlockingQueue;
     3 
     4 
     5 public class ProducerConsumerExample {
     6     
     7 
     8     public static void main(String[] args) {
     9         BlockingQueue<String> queue = new LinkedBlockingQueue<>(2) ;
    10         
    11         Producer p = new Producer(queue) ;
    12         Consumer c = new Consumer(queue) ;
    13         
    14         for(int i = 1 ; i <= 5 ; i++) {
    15             new Thread(p,"Producer"+i).start();
    16             
    17             new Thread(c,"Consumer"+i).start();
    18         }
    19     }
    20 
    21 }

    结果:

    阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

    实际应用中的例子:微信消息处理队列

     1 public class WechatMsgSendJob {
     2     private static Log log = LogFactory.getLog(WechatMsgSendJob.class);
     3     private static  BlockingQueue<WechatMsg> msgs = new LinkedBlockingQueue<WechatMsg>(10000);
     4    
     5     public static void addMsg(WechatMsg msg) {
     6         try {
     7             msgs.put(msg);
     8         } catch (InterruptedException e) {
     9             log.error("addMsg error", e);
    10         }
    11     }
    12     //init方法
    13     public void init(){
    14         try {
    15 //启动线程池,8个线程 不停的处理队列中的数据
    16 ExecutorService consumerService = Executors.newFixedThreadPool(8);
    17         for (int i = 0; i < 8; i++) {
    18             consumerService.submit(new Runnable() {
    19             public void run() {
    20                 while (true) {
    21                     try {
    22                         WechatMsg msg = msgs.take();
    23                         if (msg != null) {
    24                         //这里处理数据
    25                                         
    26                                         
    27                         }
    28                         } catch (Exception e) {
    29                             log.error("callWechatSendApi error", e);
    30                         }
    31 
    32                 }
    33             }
    34             });
    35             }
    36             }catch (Exception e) {
    37             log.error("sendMsg error", e);
    38         }
    39     }
    40 }
  • 相关阅读:
    C++ Boost 函数与回调应用
    C++ Boost库 操作字符串与正则
    C++ Boost库 实现命令行解析
    PHP 开发与代码审计(总结)
    c strncpy函数代码实现
    c strcat函数代码实现
    c strcpy函数代码实现
    c strlen函数代码实现
    Java-IO流-打印流
    Java-IO流-文件复制2
  • 原文地址:https://www.cnblogs.com/mengchunchen/p/9174219.html
Copyright © 2011-2022 走看看