zoukankan      html  css  js  c++  java
  • BoundedBuffer实现生产者、消费者模式

    BoundedBuffer 类取自 java.util.concurrent.locks.Condition 接口 Javadoc 注释中的示例代码。

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * A fixed-capacity FIFO buffer backed by a circular array and guarded by a single
     * {@link ReentrantLock} with two conditions: {@code notFull} (waited on by producers)
     * and {@code notEmpty} (waited on by consumers).
     *
     * <p>{@link #put(Object)} blocks while the buffer is full and {@link #take()} blocks
     * while it is empty.
     */
    class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

        /** Circular storage; a slot is {@code null} unless it holds a pending element. */
        final Object[] items = new Object[100];
        /** Next write index, next read index, and number of stored elements. */
        int putptr, takeptr, count;

        /**
         * Appends {@code x} to the buffer, waiting while the buffer is full.
         *
         * @param x the element to store
         * @throws InterruptedException if the thread is interrupted while waiting for space
         */
        public void put(Object x) throws InterruptedException {
            lock.lock();
            try {
                // Loop rather than "if": the condition must be re-checked after every wake-up.
                while (count == items.length) {
                    notFull.await();
                }
                items[putptr] = x;
                if (++putptr == items.length) {
                    putptr = 0; // wrap around to the start of the array
                }
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes and returns the oldest element, waiting while the buffer is empty.
         *
         * @return the element that has been in the buffer the longest
         * @throws InterruptedException if the thread is interrupted while waiting for an element
         */
        public Object take() throws InterruptedException {
            lock.lock();
            try {
                // Loop rather than "if": the condition must be re-checked after every wake-up.
                while (count == 0) {
                    notEmpty.await();
                }
                Object x = items[takeptr];
                // Clear the slot so the buffer does not keep the consumed element reachable
                // until the slot happens to be overwritten by a later put().
                items[takeptr] = null;
                if (++takeptr == items.length) {
                    takeptr = 0; // wrap around to the start of the array
                }
                --count;
                notFull.signal();
                return x;
            } finally {
                lock.unlock();
            }
        }
    }

    测试代码:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Producer/consumer demo on top of {@code BoundedBuffer}.
     *
     * <p>{@code PRODUCER_COUNT} producers each put {@code PRODUCT_THREAD} distinct integers
     * into the buffer while {@code CONSUMER_COUNT} consumers take them out and record them.
     * When every produced value has been recorded, a consumer calls
     * {@link ExecutorService#shutdownNow()}, which interrupts the consumers still blocked
     * in {@code take()} so that the pool can terminate.
     */
    public class Main {
        private static final int CONSUMER_COUNT = 20;
        private static final int PRODUCER_COUNT = 2;
        /** Number of items each producer puts into the buffer. */
        private static final int PRODUCT_THREAD = 100;
        /** Total number of items produced by all producers. */
        private static final int SUM_PRODUCT = PRODUCT_THREAD * PRODUCER_COUNT;

        /**
         * Source of the values to produce. An atomic counter is required because several
         * producers draw from it concurrently: a plain {@code int++} can hand out the same
         * value twice, in which case the set of consumed values never reaches
         * {@code SUM_PRODUCT} and the program never terminates.
         */
        private static final AtomicInteger nextValue = new AtomicInteger();

        /** Values seen by the consumers; concurrent because many consumers add to it. */
        private static final Set<Integer> consumed = ConcurrentHashMap.newKeySet();

        public static void main(String[] args) throws InterruptedException {
            BoundedBuffer boundedBuffer = new BoundedBuffer();
            ExecutorService exec = Executors.newCachedThreadPool();

            for (int i = 0; i < PRODUCER_COUNT; i++) {
                exec.submit(() -> produce(boundedBuffer));
            }
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                exec.submit(() -> consume(boundedBuffer, exec));
            }

            exec.shutdown();
            // Block until every task has finished; a coarse timeout avoids busy-spinning.
            while (!exec.awaitTermination(1, TimeUnit.SECONDS)) {
                // keep waiting
            }
            System.out.println("hashSet.size():" + consumed.size());
        }

        /** Puts {@code PRODUCT_THREAD} distinct integers into {@code buffer}. */
        private static void produce(BoundedBuffer buffer) {
            System.out.println("Produce Thread Run!");
            for (int i = 0; i < PRODUCT_THREAD; i++) {
                try {
                    System.out.println("putting..");
                    buffer.put(Integer.valueOf(nextValue.getAndIncrement()));
                } catch (InterruptedException e) {
                    // Restore the interrupt status and stop producing instead of
                    // swallowing the interruption and carrying on.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Takes values from {@code buffer} until the current thread is interrupted, and
         * shuts {@code exec} down once all {@code SUM_PRODUCT} values have been recorded.
         */
        private static void consume(BoundedBuffer buffer, ExecutorService exec) {
            System.out.println("Consumer Thread Run!");
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Integer val = (Integer) buffer.take();
                    consumed.add(val);
                    System.out.println(val);
                    if (consumed.size() == SUM_PRODUCT) {
                        // Everything has been consumed: interrupt the remaining workers.
                        exec.shutdownNow();
                        return;
                    }
                } catch (InterruptedException e) {
                    // The interrupt status is cleared when take() throws
                    // InterruptedException; set it again so the loop condition sees it
                    // and the consumer exits.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

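    注意这里中断消费者线程的方式:当全部产品都被消费后,由消费者调用 exec.shutdownNow() 中断线程池中的所有工作线程;阻塞在 take() 上的消费者会抛出 InterruptedException,此时中断标志位已被清除,必须在 catch 中调用 Thread.currentThread().interrupt() 重新设置中断标志,for 循环的判断条件才能检测到中断并退出。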

  • 相关阅读:
    Object有哪些方法?
    去除掉myeclipse2017页面右上角的图片
    报错 IllegalArgumentException occurred calling getter of cn.itcast.entity.Customer.cid
    如果在applicationContext.xml中没有配置bean的属性,那么也会导致空指针异常
    报错HTTP Status 500
    srm开发(基于ssh)(4)
    报错HTTP Status 500
    HTTP Status 500
    数字
    算法总结——树状数组
  • 原文地址:https://www.cnblogs.com/iuyy/p/13574325.html
Copyright © 2011-2022 走看看