zoukankan      html  css  js  c++  java
  • 线程池02-LinkedBlockingQueue 阻塞队列

    首先,我们先了解一下什么是阻塞队列:

    • 当队列满了时,队列会阻塞插入元素的线程,直到队列不满;

    • 当队列为空时,获取元素的线程会等待队列变成非空。

    常用到的方法

    上面是对阻塞队列的简单了解,下面重点分析一下LinkedBlockingQueue。

    源码分析

    Node节点

    • 可以看出是单向的链表结构
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }
    

    构造方法和参数

    • 如果未设置初始容量,则默认是Integer.MAX_VALUE;
       /** 队列容量 */
       private final int capacity;
    
        /** 目前元素数量 */
        private final AtomicInteger count = new AtomicInteger();
    
        /** 头节点 */
        transient Node<E> head;
    
        /** 末尾节点 */
        private transient Node<E> last;
    
        /** take, poll 方法的锁 */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** 等待获取 条件队列*/
        private final Condition notEmpty = takeLock.newCondition();
    
        /** put, offer 方法的锁 */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** 等待存入的 条件队列 */
        private final Condition notFull = putLock.newCondition();
    
    public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    
     public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);//初始化的时候设置头节点和尾节点为两个空节点
        }
    
    

    插入

    put 方法

    • 如果队列已经满了,则放入到条件队列中。
    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            int c = -1;
            Node<E> node = new Node<E>(e);//创建新节点
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();//获取put锁
            try {
                //判断存入的元素个数和配置的数量是否相等,如果相等。那么将当前线程放入到条件队列中
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);//将节点插入到末尾
                c = count.getAndIncrement();//元素数量+1
                if (c + 1 < capacity)//当前元素数量小于容量的时候,唤醒“存入条件队列”的头节点到同步队列
                    notFull.signal();
            } finally {
                putLock.unlock();释放put锁
            }
            // 唤醒获取条件队列的头节点
            if (c == 0)
                signalNotEmpty();
        }
    
    //将节点设置为尾节点
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }
    // 唤醒“获取条件队列”中的首节点
    private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();//t获取ake锁
            try {
                notEmpty.signal();//唤醒“获取条件队列”中的首节点
            } finally {
                takeLock.unlock();
            }
        }
    

    offer 方法

    • 如果超过容量就无法插入
      public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            if (count.get() == capacity)//如果超过容量直接返回false,表示不能再插入数据
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);//新建节点
            final ReentrantLock putLock = this.putLock;
            putLock.lock();//获取put锁
            try {
                if (count.get() < capacity) {//小于容量时增加节点,并唤醒“存入条件队列”头节点到同步队列
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)//当前元素数量小于容量的时候,唤醒“存入条件队列”的头节点到同步队列
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            // 唤醒获取条件队列的头节点
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }
    

    take 方法

    • 获取链表中的头节点。如果不存在元素,则将当前线程放入到条件队列中。
    public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();//获取锁
            try {
                while (count.get() == 0) {//如果为空则放入“获取条件队列”
                    notEmpty.await();
                }
                x = dequeue();//将节点加入到链表最后
                c = count.getAndDecrement();//数量减1
                if (c > 1)//如果元素书大于1,则调用“获取条件队列”中的元素放入同步队列
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            // 唤醒存入条件队列的头节点
            if (c == capacity)
                signalNotFull();
            return x;//返回头节点
        }
    
    // 获取头节点元素
    private E dequeue() {
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h; // help GC
            head = first;
            E x = first.item;
            first.item = null;
            return x;
        }
    
    
    // 唤醒“存入条件队列”的头节点到同步队列
     private void signalNotFull() {
          final ReentrantLock putLock = this.putLock;
          putLock.lock();
          try {
              notFull.signal();
          } finally {
              putLock.unlock();
          }
      }
    
    

    poll方法

    • 如果没有数据立即返回null
       public E poll() {
            final AtomicInteger count = this.count;
            if (count.get() == 0)
                return null;
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;//获取锁
            takeLock.lock();
            try {
                if (count.get() > 0) {//当前容器数量大于0时
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            // 唤醒存入条件队列的头节点
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    总结

    1.如何保证当队列没有消息或者消息满了的时候,进行监听?

    上面看代码的时候,两段代码刚开始是有点懵的。
    
    1.存入的方法
      // 唤醒获取条件队列的头节点
      if (c == 0) signalNotEmpty();
    
    2.获取的方法
      // 唤醒存入条件队列的头节点
      if (c == capacity) signalNotFull();
    
    其实这就监听的重要环节。
    逻辑是这样的。以存入为例(获取同样的道理):
    1.如果当前节点为0,说明队列中没有任务;
    2.唤醒“获取条件队列”的头节点,去尝试获取元素。如果获取到则执行,如果没有,则依然放入到“获取条件队列”的末尾;
    3.这样就可以保证在存入数据的时候,实时监听获取节点元素了。
    
    
  • 相关阅读:
    redis 五种数据结构详解(string,list,set,zset,hash)
    推荐一个同步Mysql数据到Elasticsearch的工具
    一些经验,用来鉴别不太靠谱的公司或工作(面试是双向的,是你最好的了解这个公司的机会)
    OpenSSL 使用 base64 编码/解码(liang19890820)
    Qt之QEvent(所有事件的翻译)
    Go 在 Windows 上用户图形界面 GUI 解决方案 Go-WinGUI 国产(使用cef 内核)
    卷积神经网络CNN
    Event Driven Architecture
    wineshark分析抓取本地回环包
    僵尸进程与孤儿进程
  • 原文地址:https://www.cnblogs.com/perferect/p/13723857.html
Copyright © 2011-2022 走看看