zoukankan      html  css  js  c++  java
  • Java的设计模式(7)— 生产者-消费者模式

      生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。这个模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。生产者和消费者之间通过共享内存缓存区进行通信,这样就避免了生产者和消费者直接通信,从而将生产者和消费者解耦。不管是生产高于消费,还是消费高于生产,缓存区的存在可以确保系统的正常运行。这个模式有以下几种角色:

    • 生产者:用于提交用户的请求,提取用户任务,装入内存缓冲区。
    • 消费者:在内存缓冲区中提取并处理任务。
    • 内存缓冲区:缓存生产者提交的任务或数据,供消费者使用。
    • 任务:生产者向内存缓冲区提交的数据结构。
    • Main:使用消费者和生产者的客户端。

      其中BlockingQueue充当了共享内存缓冲区,用于维护任务或数据队列(PCData对象)。PCData表示一个生产任务,或者相关任务的数据,生产者对象和消费者对象均引用一个BlockingQueue实例。生产者负责创建PCData对象,并将它加入队列中,消费者从这个队列中获取PCData对象。下面举个例子:

      首先生产者线程实现如下,它构建PCData对象,并放入BlockingQueue队列中

    public class Producer implements Runnable {
    
        private volatile boolean isRunning = true;
        private BlockingQueue<PCData> queue;                        // 内存缓存区
        private static AtomicInteger count = new AtomicInteger();   // 总数,原子操作
        private static final int SLEEPTIME = 1000;
        
        public Producer(BlockingQueue<PCData> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            PCData data = null;
            Random r = new Random();
            System.out.println("start producer id = "+Thread.currentThread().getId());
            try {
                while(isRunning){
                    Thread.sleep(SLEEPTIME);
                    data = new PCData(count.incrementAndGet());    // 构造任务数据
                    System.out.println(data+" is put into queue");
                    if(!queue.offer(data,2,TimeUnit.SECONDS)){     // 提交到数据缓存区中
                        System.out.println("failed to put data:"+data);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
    
        }
    
        public void stop(){
            isRunning = false;
        }
    }

      对应的消费者实现如下,它从BlockingQueue队列中取出PCData对象,并进行相应的计算。

    public class Consumer implements Runnable {
    
        private BlockingQueue<PCData> queue; 
        private static final int SLEEPTIME = 1000;
        
        public Consumer(BlockingQueue<PCData> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            System.out.println("start Consumer id = "+Thread.currentThread().getId());
            Random r = new Random();
            try {
            while(true){
                PCData data  =this.queue.take();
                if(null!=data){
                    int re = data.getData() * data.getData();
                    System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re));
                    Thread.sleep(r.nextInt(SLEEPTIME));
                }
            }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }

      PCData对象作为生产者和消费者之间的共享数据模型,定义如下:

    public class PCData {
    
        private final int intData;
        public PCData(int d){
            intData = d;
        }
        public PCData(String d){
            intData = Integer.valueOf(d);
        }
        public int getData(){
            return intData;
        }
        @Override
        public String toString() {
            return "intData:" + intData;
        }
        
    }

      在主函数中,创建三个生产者和三个消费者,并让他们协作运行,在主函数实现中,定义LinkedBlockingQueue作为BlockingQueue队列的实现类。

    public class Main {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>();
            Producer p1 = new Producer(queue);
            Producer p2 = new Producer(queue);
            Producer p3 = new Producer(queue);
            Consumer c1 = new Consumer(queue);
            Consumer c2 = new Consumer(queue);
            Consumer c3 = new Consumer(queue);
            ExecutorService service = Executors.newCachedThreadPool();
            service.execute(p1);
            service.execute(p2);
            service.execute(p3);
            service.execute(c1);
            service.execute(c2);
            service.execute(c3);
            Thread.sleep(10000);
            p1.stop();
            p2.stop();
            p3.stop();
            Thread.sleep(3000);
            service.shutdown();
        }
    }

      生产者-消费者模式很好地对生产者线程和消费者线程进行解耦,优化了系统整体结构。同时,由于缓冲作用,允许生产者和消费者线程存在执行上的性能差异,从一定程度上解决了性能瓶颈对系统性能的影响。

  • 相关阅读:
    web在线智能四则运算挑战赛
    超简单的实现wordcount
    构建之法现代软件工程自我介绍
    通过WMI获取机器信息
    ManagementObjectSearcher Path
    开启FIPS协议
    Windows Server 2012开启多人远程
    开发企业应用系统需要掌握的知识技能
    Win7系统下彻底删除无用服务的方法
    C#基础(二)之数据类型
  • 原文地址:https://www.cnblogs.com/wangyongwen/p/11332436.html
Copyright © 2011-2022 走看看