zoukankan      html  css  js  c++  java
  • 一个生产者消费者模式的问题

    问题是这样的:

    有1个生产者生产 产品, 60个消费者消费产品, 产品可以被同时消费,而且需要等到每个消费者都消费一遍后, 才能够把它删除。

    问题其实也不难, 只是稍微有点技巧。实现方式其实很多种, 下面的应该是最简单的吧

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CyclicBarrier;
    
    /**
     * Created by l.k on 2018/11/29.
     *
     * 1个线程生产 产品, 60个线程消费, 产品可以被同时消费
     *
     */
    public class TestCyclicBarrier {
        public static void main(String[] args) {
            //List list = new ArrayList();// 不能使用List, 必须要一个 同步阻塞的队列, 因为有多个线程操作它
            ArrayBlockingQueue list = new ArrayBlockingQueue(200);
    
            int parties = 60; // 总共的消费者数目
            CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
                @Override
                public void run() {
                    try {
                        //Object remove = list.remove(0);
                        Object remove = list.take(); //  所有的 消费者 都消费完了一遍之后, 这个产品才能够 删除 !!
                        System.out.println("所有的消费者 都消费完一遍了  " + remove);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
    
            // 先启动生产者
            Producer p = new Producer(list);
            Thread thread1 = new Thread(p);
            thread1.start(); // 开始生产
    
            // 再启动消费者
            for (int i = 0; i < parties; i++) {
                Consumer consumer = new Consumer(barrier);
                Thread thread2 = new Thread(consumer);
                thread2.start(); // 开始 消费
            }
    
        }
    }
    
    class Consumer implements Runnable{
        private final CyclicBarrier barriar;
        //private final ArrayBlockingQueue list;// 不需要这个参数了
    
        public Consumer(CyclicBarrier barriar) {
            this.barriar = barriar;
            //this.list = list;
        }
    
        @Override
        public void run() {
            boolean limited = false;
            int cnt = 1;
            while (cnt <= 100) {
                consume();
                cnt ++;
            }
        }
    
        public long consume() {
            long start = System.currentTimeMillis();
            int numberWaiting = 0;
            try {
                boolean broken = barriar.isBroken();
                numberWaiting = barriar.getNumberWaiting();
                System.out.println("准备消费 = " + "  at : " + numberWaiting + " " + broken);
                barriar.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            long l = System.currentTimeMillis() - start;
            System.out.println(String.format("等待了 %s 毫秒 ", l));
            return l;
        }
    }
    
    class Producer implements Runnable{
        private final ArrayBlockingQueue list;
    
        public Producer(ArrayBlockingQueue list) {
            this.list = list;
        }
    
        @Override
        public void run() {
            boolean limited = false;
            int cnt = 1;
            while (!limited) {
                if (cnt >= 100) {
                    limited = true;
                }
                produce("Product " +  ( cnt++));
            }
        }
    
        public String produce(String product) {
            try {
                Thread.sleep(10);
                list.add(product);
                System.out.println("生产了  = " + product +  "  at : "  +System.currentTimeMillis());
            } catch (Exception e) {
                e.printStackTrace();
            }
            return product;
        }
    }
    
    class Product {
    
    }
  • 相关阅读:
    OPC客户端的进程安全初始化
    [精华] Oracle安装(linux)总结一下[转]
    Linux防火墙iptables的设置与启动[转]
    Linux Server 5.5安装SVN+Apache服务[转]
    Red hat Linux Enterprise 5.4 Edtion 学习笔记[二]
    RedHat Linux 5企业版开启VNCSERVER远程桌面功能[转]
    Linux服务配置:Vsftp的基本配置[转]
    Linux查看和剔除当前登录用户
    Ubuntu10.04的中文问题汇集与解决[转]
    Linux下扩展swap分区的方法
  • 原文地址:https://www.cnblogs.com/FlyAway2013/p/10038429.html
Copyright © 2011-2022 走看看