zoukankan      html  css  js  c++  java
  • java Blocking Queue

    一、Java中的阻塞队列

      在多线程之间通信中,多个线程共享一个进程所分配的资源,共享内存是一种常见的通信方式,而阻塞队列则是其实现方式的一种,例如经典的生产者-消费者模式。

      A Queue that addtionally supports operations that wait for the queue to become non-empty when retrieving an element, and  wait for space to become avialable in the queue when storing an element.

      阻塞队列中提供了2个操作:

        队列为空时,获取元素的线程会阻塞队列一直至队列非空。

        队列为满时,存储元素的线程会阻塞队列非满。

      

      

      Java中的阻塞队列有7种:

    • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
    • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
    • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
    • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。
    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

      下面进行说明

    二、ArrayBlockingQueue

    使用数组实现队列

     2.1 构造器

    public ArrayBlockingQueue(int capacity,
                              boolean fair,
                              @NotNull Collection<? extends E> c)
    //Creates an ArrayBlockingQueue with the given (fixed) capacity, the specified access policy and initially containing the elements of the given collection, added in traversal order of the collection's iterator.

    参数说明:

      capacity: 队列的容量

      fair: 默认是false,如果是true的话则移除元素的顺序符合FIFO顺序,false的话没有顺序

      c: 预填充元素

    实现主要如下:

     public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }

    2.2 put方法实现

    /** Main lock guarding all access */
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        private final Condition notFull;

    由于数组这种数据结构的特殊性,若想要线程安全的添加或者删除,都必须将整个数组锁住,因此这里实现使用一把锁, 以及2个条件队列。

    public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }

    这段代码非常容易理解,注意点:

    (1) 可重入锁

    (2) 使用2个条件队列:Java concurrent包对多条件队列的支持,古老的wait和notify方法也可以实现,只是由于条件队列中的条件不同,必须使用notifyall(),损失了性能。

    (3) 可中断锁

     2.3 take() 方法说明

    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

    总结: ArrayBlockingQueue应该是最简单的阻塞队列实现了,由于数组结构的特殊性,使用了一把锁和2个条件队列,锁的方式是可中断锁。

    三、LinkedBlockingQueue

    使用链表实现队列,构造器使用方式和ArrayBlockingQueue一样

     public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }

    注意到初始化的时候创建了一个空节点...

    3.1 使用2把锁实现

    这里使用了2把锁,2个条件队列实现,而且在属性定义中就直接初始化了。一个存锁,一个取锁,一个非空条件队列,一个非满条件队列

      private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();

    3.2 put的实现

     public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }

    说明:

    • 除了锁之外,使用了原子变量来记录链表大小,因为使用了2把锁来锁节点,全局链表的大小使用线程安全的原子变量确实比较合适
    • 此时锁的是putLock,存锁

    下面看singalNotEmpty()方法:

     private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }

    发现唤醒时使用了取锁,使用了连续的锁。我们大致梳理一下顺序,在不考虑异常的情况下:

    (1) 当前线程竞争存锁putLock并将其锁住

    (2) 条件队列notFull(非满条件)等待

    (3) 链表中插入元素

    (4) 原子变量自增

    (5) if(c+1<capacity),则条件队列notFull唤醒操作,通知其他的put线程,链表未达上限,依旧可以插入元素

    (6) 释放putLock锁

    (7) 如果c==0,原子变量修改-1->0,表明插入成功,通知其他take线程可以去取出元素

      i. 当前线程竞争takeLock,并且锁住

      ii. notEmpty.signal,通知其他在wait take lock的线程可以取出元素

      iii. 释放takeLock

     

    3.3 take的实现

    类似,略

    3.4 总结

    (1) 使用了2把不同的锁

    (2) 使用原子变量控制容量

    (3) 使用链表数据结构

    为什么可以用2把锁提升性能,减少锁竞争?
    注意下面2个方法:入队方法操作的是last,出对方法操作的first,正是由于链表结构的特殊性,可以使用2把锁来提高锁粒度,从而减少锁竞争。

     private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
    
        /**
         * Removes a node from head of queue.
         *
         * @return the node
         */
        private E dequeue() {
            // assert takeLock.isHeldByCurrentThread();
            // assert head.item == null;
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h; // help GC
            head = first;
            E x = first.item;
            first.item = null;
            return x;
        }

    四、Synchronousqueue

    个人感觉http://ifeve.com/java-synchronousqueue/讲的比较详细.这里引用一段话描述:

    不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。

    package concurrent.demo.synchronous_queue;
    
    import concurrent.demo.Utils;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * <B>系统名称:</B><BR>
     * <B>模块名称:</B><BR>
     * <B>中文类名:</B><BR>
     * <B>概要说明:</B><BR>
     *
     * @author carl.yu
     * @since 2016/6/14
     */
    public class NativeSynchronousQueue<E> {
    
        E item = null;
        boolean putting = false;
    
        //保证放进去之后,其他的取走了,其他的put线程才允许取
        public synchronized void put(E e) throws InterruptedException {
            //1. 只有一个item为null的时候才允许放
            while (putting) {
                wait();
            }
            putting = true;
            item = e;
            //2. 通知其他的线程来取,可能会通知错了人,造成假唤醒,所以需要用条件队列putting为true来判定,继续睡吧
            notifyAll(); /*必须使用notify不能使用notifyAll*/
            /*这里的notifyAll是为了让其他的take线程醒来,而不是put线程醒来哦*/
    
            //3. 只有取完的线程才可以来拿
            while (item != null) {
                wait();
            }
            putting = false;
            notifyAll();
            /*这里才是为了put线程醒来*/
        }
    
        /*take比较简单,就是拿*/
        public synchronized E take() throws InterruptedException {
            E res = null;
            while (item == null) {
                wait();
            }
            res = item;
            item = null;
            notifyAll();
            return res;
        }
    
    
        //测试
        public static void main(String[] args) throws Exception {
            final NativeSynchronousQueue queue = new NativeSynchronousQueue();
            final CountDownLatch latch = new CountDownLatch(3);
            Thread put01 = new Thread() {
                @Override
                public void run() {
                    try {
                        queue.put("1");
                        System.out.println("put01成功");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            Thread put02 = new Thread() {
                @Override
                public void run() {
                    try {
                        queue.put("2");
                        System.out.println("put02成功");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            Thread take01 = new Thread() {
                @Override
                public void run() {
                    try {
                        System.out.println("take成功:" + queue.take());
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            Thread take02 = new Thread() {
                @Override
                public void run() {
                    try {
                        System.out.println("take成功:" + queue.take());
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            put01.start();
            Utils.sleep(10);
            put02.start();
            Utils.sleep(10);
            //当没有线程take的时候,put永远不会成功
            take01.start();
            Utils.sleep(10);
            take02.start();
            latch.await();
            System.out.println("测试结束");
        }

     五、条件队列和线程池的使用

    Java中,线程池的使用非常常见,jdk1.8中还新增了许多连接池,例如newWorkStealingPool用作并行计算...

    这里主要强调条件队列的用法

    5.1 使用有界队列实现

    直接演示

    定义任务类:

    public class MyTask implements Runnable {
    
        private int taskId;
        private String taskName;
        
        public MyTask(int taskId, String taskName){
            this.taskId = taskId;
            this.taskName = taskName;
        }
        
        public int getTaskId() {
            return taskId;
        }
    
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    
        public String getTaskName() {
            return taskName;
        }
    
        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
    
      
        public void run() {
            try {
                System.out.println("run taskId =" + this.taskId);
                Thread.sleep(5*1000);
                //System.out.println("end taskId =" + this.taskId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        
        }
        
        public String toString(){
            return Integer.toString(this.taskId);
        }
    
    }

    自定义拒绝策略:

    package com.bjsxt.height.concurrent018;
    
    import java.net.HttpURLConnection;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    public class MyRejected implements RejectedExecutionHandler{
    
        
        public MyRejected(){
        }
        
       
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("自定义处理..");
            System.out.println("当前被拒绝任务为:" + r.toString());
            
    
        }
    
    }
    public static void main(String[] args) {
            /**
             * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
             * 若大于corePoolSize,则会将任务加入队列,
             * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
             * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
             * 
             */    
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,                 //coreSize
                    2,                 //MaxSize
                    60,             //60
                    TimeUnit.SECONDS, 
                    new ArrayBlockingQueue<Runnable>(3)            //指定一种队列 (有界队列)
                    //new LinkedBlockingQueue<Runnable>()
                    , new MyRejected()
                    //, new DiscardOldestPolicy()
                    );
            
            MyTask mt1 = new MyTask(1, "任务1");
            MyTask mt2 = new MyTask(2, "任务2");
            MyTask mt3 = new MyTask(3, "任务3");
            MyTask mt4 = new MyTask(4, "任务4");
            MyTask mt5 = new MyTask(5, "任务5");
            MyTask mt6 = new MyTask(6, "任务6");
            
            pool.execute(mt1);
            pool.execute(mt2);
            pool.execute(mt3);
            pool.execute(mt4);
            pool.execute(mt5);
            pool.execute(mt6);
            
            pool.shutdown();
            
        }

    逐渐增加任务数,由1增加到6发现效果如下:

    (1) 当任务数<1时,加入一个任务直接创建线程去处理

    (2) 继续加入任务>corePoolSize=1时,会加入队列ArrayBlockingQueue

    (3) 一直加入到任务数为4,队列中3个元素,队列满了

    (4) 任务数继续增加,到5,会继续创建线程一直到maxPoolSize(2),因此此时执行的任务都1和5

    (5) 任务数增加为6,执行拒绝策略

    5.2 无界队列实现

    将上述代码修改为用LinkedBlockingQueue()无界队列实现,发现此时maxSize参数没有作用,拒绝策略也没有作用,只有coreSize才有作用

    (1) 当任务数<coreSize,会创建线程

    (2) 当任务数>coreSize,会将任务放置到无界队列中,直到系统崩溃

    (3) maxSize没有任何作用

    (4) 拒绝策略没有任何作用

    因此,JDK在实现FixedThreadPool时,maxSize和coreSize相等

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    5.3 Synchronousqueue实现

    JDK中CachedThreadPool用这种队列实现,性能非常好

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

      

  • 相关阅读:
    k8s的快速使用手册
    prometheus
    k8s的存储卷
    nfs共享文件服务搭建
    k8s的基本使用
    Linux shell if [ -n ] 正确使用方法
    CURL使用方法详解
    LINUX下NFS系统的安装配置
    RedHat 6.2 Linux修改yum源免费使用CentOS源
    css清除浮动float的三种方法总结,为什么清浮动?浮动会有那些影响?
  • 原文地址:https://www.cnblogs.com/carl10086/p/6019109.html
Copyright © 2011-2022 走看看