zoukankan      html  css  js  c++  java
  • BlockingQueue的几个实现分析

    ArrayBlockingQueue

     底层以数组的结构存放队列元素,容量大小不可改变。

    先看下变量:

    items:数组,用于存放队列中的元素

    takeIndex:获取元素的索引位置

    putIndex:存放元素的索引位置

    count:队列中当前元素数量

    lock:控制队列进出的锁,ArrayBlockingQueue中进出共用一把锁,LinkedBlockingQueue中进出是两把分开的锁

    notEmpty:从队列获取元素时的等待条件,也就是队列空时,获取元素的线程会阻塞,直到有线程放元素进来,notEmpty.signal唤醒

    notFull:往队列里放元素时的等待条件,也就是队列满时,存放元素的线程会阻塞,直到有线程从队列里取元素并调用notFull.signal唤醒

    下面看下代码更直观些:

    put

    1.先获得锁

    2.循环判断当前元素数量是否等于数组长度,也就是队列的容量,等于的话就挂起当前线程,直到notFull.signal

    3.插入元素到队列中,代码如下:

    putIndex为下一次入队的索引位置,先把元素入队,然后+1后判断是否等于数组长度,如果等于就置位0,从0开始

    最后count+1并notEmpty.signal唤醒因为队列为空导致获取元素阻塞的线程。

    take

    1.获取锁,注意,是和put同一把锁,也就是同时只能有一个线程获得操作队列的权限,无论是入队还是出队。

    2.循环判断如果此时队空,就挂起当前线程,等待notEmpty.signal

    3.出队:

    获取元素并将数组中相应位置置空,设置下次入队索引并count-1,notFull.signal唤醒因为队满而阻塞的线程并返回出队元素

    LinkedBlockingQueue

     底层以单向链表结构存放队列元素,put和take分别采用两把不同的锁,也就是读写是并发运行的,

    那么怎样控制并发环境下的出队和入队呢?答案就是使用AtomicInteger原子操作当前队列中元素数量。

    下面看下有哪些实例变量:

    capacity:队列容量,可以在构造方法中设置容量,如果不设置的话,默认为Integer.MAX_VALUE

    count:当前队列中的元素数量,AtomicInteger类型,因为这里的put和take是两把锁,也就是会并发修改count,所以这里采用原子操作。

    head:队头

    last:队尾

    takeLock:出队锁

    notEmpty:出队时,如果队空,就等待notEmpty.signal

    putLock:入队锁

    notFull:入队时,如果队满,就等待notFull.signal

    put

    public void put(E e) throws InterruptedException {
         // 因为在出队时如果队空,有些方法是非阻塞或者等待一段时间后返回null的,比如poll
         // 所以禁止往队列中入队null
    if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1;
         // 将入队元素包装进链表的Node对象中 Node
    <E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count;
         // 获取入队锁 putLock.lockInterruptibly();
    try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */
           // 因为count加操作都在putLock中,所以这里可以是==判断
    while (count.get() == capacity) { notFull.await(); }
           // 入队 enqueue(node);
           // 获取原来count值并原子加一 c
    = count.getAndIncrement();
           // 如果队列未满,通知阻塞线程继续入队
    if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); }
         // 如果队列之前是队空状态,说明之前有可能存在take元素的线程因为队空而阻塞,这里通知takeLock唤醒等待线程
    if (c == 0) signalNotEmpty(); }

    很简单,就是将刚刚的node链接到队尾

    take

    1.先获得takeLock

    2.如果队空,挂起等待

    3.出队

    4.获取count值并减一

    5.如果队列中还有元素,notEmpty唤醒之前因为队空阻塞的线程

    6.如果take之前状态为队满,说明有可能存在因为队满而导致入队阻塞的线程,

    唤醒他们(其实这里signal只唤醒一个,在被唤醒的线程完成入队后判断如果还有空间就继续signal,是这样一连串的动作)

    出队逻辑如上,保持head的item为null,这里需要说明的是h.next = h自身循环引用也是会被GC的

    SynchronousQueue

     SynchronousQueue本身不存储元素,put元素时必须同步等待元素被取走。

    PriorityBlockingQueue

     具有优先级的队列

    DelayQueue

     具有延时的队列,延时不结束就不能取数据

  • 相关阅读:
    北风网第一季度菜单6
    北风网微信第一季菜单5
    win7卸载打印机驱动
    myeclipse 10激活,本人已测试过可行
    北风网视频菜单4
    Code Project精彩系列(1)
    Code Project精彩系列(1)
    Code Project精彩系列(1)
    实现Windows和Linux之间的文件共享
    实现Windows和Linux之间的文件共享
  • 原文地址:https://www.cnblogs.com/restart30/p/10999393.html
Copyright © 2011-2022 走看看