zoukankan      html  css  js  c++  java
  • Condition对象以及ArrayBlockingQueue阻塞队列的实现(使用Condition在队满时让生产者线程等待, 在队空时让消费者线程等待)

    Condition对象

    一)、Condition的定义

    Condition对象:与锁关联,协调多线程间的复杂协作。

    获取与锁绑定的Condition对象:

    Lock lock = new ReentrantLock();

    Conndition condition = lock.newConndition();

    Condition的方法:

    await(): 使当前的线程等待并释放锁。

    singalAll(): 唤醒所有等待的线程,只有一个线程重新获得锁,并执行。

    awaitterruptibly(): 使当前线程等待并释放锁,但在等待过程中不响应中断。

    singal(): 唤醒一个正在等待的线程。

    注:当线程处于中断状态也能跳出等待。

    Condition对象与锁的关系

    相当于object.wait(),Object.notify()与Synchronized一样,配合使用,以完成多线程协作的控制。

    Lock lock = new ReentrantLock();
    Conndition condition = lock.newCondition();
    //线程进入等待状态
    condition.await();
    //唤醒等待的线程
    condition.notify();
    

    二)、ArrayBlockingQueue: 使用Condition实现队列的阻塞

    (2-1):ArrayBlockQueue的主要属性

     //存放元素的数组,当元素个数超过数组的长度,调用的线程进入阻塞状态
    final Object[] items;
    //取出元素的数组下标
    int takeIndex;
    //添加元素的下标
    int putIndex;
    //数组中元素的个数
    int count;
    //公共锁对象
    final ReentrantLock lock;
    //与锁相对应的Condition(锁的监视器)
    //等待队列不为空的时候
    private final Condition notEmpty;
    //等待队列不为满的时候
    private final Condition notFull;
    
    

    (2-2):构造方法

        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            //创建公共锁对象,final修饰,对象一创建不能修改,final修饰的变量不能改变他的引用地址,但是可以改变它的值。
            lock = new ReentrantLock(fair);
            //与锁相关联的Condition
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    

    (2-3):ArrayBlockingQueue的主要方法:

    1). put(E e): 添加元素到队列

    特点:

    1).当队列的元素满时,阻塞当前的线程。

    ​ noFull.await() : 等待队列不满的时候。

    2).添加一个元素后,唤醒一个消费线程。

    ​ noEmpty.singnal(): 队列不空,发出信号,唤醒等待不为空的线程

        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    //等待队列不为满的时候
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
    //入队
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            //唤醒等待不为空的线程
            notEmpty.signal();
        }
    

    2). take(): 获取队列中的元素

    特点:

    1).当队列为空时,阻塞当前调用线程。

    ​ noEmpty.await: 等待队列非空的时候

    2).获取一个元素,唤醒一个生产线程。

    ​ noFull.singnal(): 唤醒一个等待队列不为满的线程

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    //线程进入等待状态,等待队列非空
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
    //出列
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            //唤醒一个等待队列不为满时的线
            notFull.signal();
            return x;
        }
    

    (2-4):队列先进先出的控制

    putIndex: 0 - item.length, count++

    if (++putIndex == items.length)
                putIndex = 0;
            count++;
    

    takeIndex:0 - item.length,count--

    if (++takeIndex == items.length)
               //控制着元素的先进先出
                takeIndex = 0;
            count--;
    

    总结: 元素从0开始写入,从0开始读取,由count控制着元素的读取,当putIndex =

    ​ item.length时,只要count不等于item.length,那么item[0]的元素必定被消

    ​ 费,当takeIndex = item.length时,只要count不等于0,item[0]必有元素存

    ​ 在。

    count: 0 - item.length,控制着是否读取元素或写入元素

    三)、使用ArrayBlockingQueue来构建生产者 - 消费者模式

    生产者:Producer

    /**
     * 生产者进程
     *
     * 生产者 - 消费者模式
     *    共同维护一个存储队列
     *    队列特点:
     *    队列满时,阻塞生产者,线程进入等待状态。
     *    队列为空时,阻塞消费者,线程进入等待状态。
     */
    public class Producer extends Thread {
        /**
         * 生产者维护的生产队列,指明队列存储元素的大小
         */
        protected ArrayBlockingQueue queue;
    
        public Producer(ArrayBlockingQueue queue,String name) {
            super(name);
            this.queue = queue;
        }
    
        /**
         * 生产者生产线程
         */
        @Override
        public void run() {
            try {
                Thread.sleep(100);
                Object obj = new Object();
                System.out.println(Thread.currentThread().getName()+" 正在生产");
                //模拟生产者的生产过程
                queue.put(obj);
                System.out.println(Thread.currentThread().getName()+" 生产了一件商品。。。");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    消费者:Consumer

    /**
     * 消费者线程,维护了一个消费队列
     */
    public class Consumer extends Thread{
        protected  ArrayBlockingQueue queue;
    
        public Consumer(ArrayBlockingQueue queue,String name) {
            super(name);
            this.queue = queue;
        }
    
        @Override
        public void run() {
            //模拟消费者线程进行消费
            try {
                System.out.println(Thread.currentThread().getName()+" 要开始消费了");
                queue.take();
                System.out.println(Thread.currentThread().getName()+" 消费了一件商品");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    启动线程:Client

    /**
     * 开启多个线程进行生产,开启多个线程进行消费
     */
    
    public class Client {
        public static void main(String[] args) {
            ArrayBlockingQueue queue = new ArrayBlockingQueue(20);
            ExecutorService executor = Executors.newFixedThreadPool(200);
            Producer producer1 = new Producer(queue,"producer - 1");
            Producer producer2 = new Producer(queue,"producer - 2");
            Producer producer3 = new Producer(queue,"producer - 3");
            Consumer consumer1 = new Consumer(queue,"consumer - 1");
            Consumer consumer2 = new Consumer(queue,"consumer - 2");
            Consumer consumer3 = new Consumer(queue,"consumer - 3");
            //开启多个线程进行生产
            executor.execute(producer1);
            executor.execute(producer2);
            executor.execute(producer3);
    
            //开启多个线程进行消费
            executor.execute(consumer1);
            executor.execute(consumer2);
            executor.execute(consumer3);
    
        }
    }
    
    

    结果:

    pool-1-thread-5 要开始消费了
    pool-1-thread-4 要开始消费了
    pool-1-thread-6 要开始消费了
    pool-1-thread-1 正在生产
    pool-1-thread-1 生产了一件商品。。。
    pool-1-thread-2 正在生产
    pool-1-thread-3 正在生产
    pool-1-thread-5 消费了一件商品
    pool-1-thread-6 消费了一件商品
    pool-1-thread-4 消费了一件商品
    pool-1-thread-2 生产了一件商品。。。
    pool-1-thread-3 生产了一件商品。。。
    

    结果分析:

    消费者消费ArrayBlockingQueue的数据时,当队列为空的时候会阻塞当前的线程,当生产者生产了一件商品后会唤醒一个阻塞的线程。

    练习代码github地址:https://github.com/slob-cow/java_performance_optimization/tree/master/Condition

    金麟岂能忍一世平凡 飞上了青天 天下还依然
  • 相关阅读:
    通过PDB文件实现非嵌入式的c++反射
    在c++中实现反射的初步想法
    对比特币相关的一些技术细节的补充
    初窥比特币
    根据一个坐标查找其所属区域的一些优化想法
    go两种数据类型的区别、数据类型和操作符、常量、变量声明
    go的相关包time、os、rand、fmt
    go语言的特性
    结构体
    defer、panic、recover
  • 原文地址:https://www.cnblogs.com/Auge/p/11765809.html
Copyright © 2011-2022 走看看