zoukankan      html  css  js  c++  java
  • LinkedBlockingQueue 小解

    LinkedBlockingQueue

    链表阻塞队列 , 以下是重要属性

    • head
    • last
    • ReenterLock putLock
    • ReenterLock takeLock
    • Condition notEmpty
    • Condition notFull
      • Condition 有一个有序链表,保存等待线程

    总结

    把总结放前边,省得每次翻到最后

    链表阻塞队列最恶心的地方是, 这个类里有五个链表!!

    • 数据链表 Node

      链表自身维护数据的链表 , Node节点, headlast变量就是链表的头和尾.

    • putLock (ReenterLock) 写入的阻塞链表

      • 要线程安全,就得要锁.拿不到锁的,就要ReenterLock的链表里等着吧.
    • takeLock (ReenterLock) 获取的阻塞链表

      • 同理, 拿不到锁的 , 在takeLock的链表里等着吧.

    往数据链表里读写数据都要先拿锁 . 拿不到锁的.都得等着!! 这些等着的线程 ,还没跟此类做实际的交互.

    • notEmpty (Condition)
      • 拿到takeLock了, 但是数据链表没东西呀! 那就在Condition的线程链表里等着吧
    • notFull (Condition)
      • 拿到putLock了 , 但是数据链表已经满了呀!! 去Condition的线程链表里待着吧!

    notEmpty和notFull等待的可能有多个 , 这些是已经拿到锁了, 可以操作了的 . 就需要相互唤起了.

    写操作区别

    区别是在 链表容量满时 , 用什么策略.

    • put(E e) : 满了 , 再写入的话 , 就把写入线程阻塞. 必须等着. 简单粗暴!
    • offer(E e) : 满了,就不写入了 . 直接返回写失败了.
    • offer(E e, long timeout, TimeUnit unit) : 满了 , 就等会儿 , 还不行就算了.

    读操作区别

    和写入类似 , 是在链表数据空了后的方式不同.

    • take() : 霸道! 强制阻塞. 没了就等着吧. 等有了再给 .
    • poll() : 温柔! 没了,给不了了 . 也别等了. 谁知道啥时候有呢
    • poll(long timeout, TimeUnit unit) : 人性! 暂时没了 , 没准呆会儿有了呢 , 要不您等等?
    • peek() : 蹭的! 不用取出来,只给第一个就行.

    往链表写入

    三种写入操作

    • put(E e)
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();//不能是null
        int c = -1; //负数表示写入失败
        Node<E> node = new Node<E>(e);
        //lock / count 约定使用本地变量
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                //容量满了, 强制等待.线程阻塞
                notFull.await();
            }
            //进链表
            enqueue(node);
            //先获取计数, 再加 1
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //如果链表还没到最大容量, 唤起其他等待写入的线程
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            //原来链表容量是0的话 , 则表示有新的进链表了 . 唤起等待获取的线程
            signalNotEmpty();
    }
    
    • offer(E e)
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        //跟put不一样 , 满了不写入 , 直接返回失败
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                //写入流程
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            //原来链表容量是0的话 , 则表示有新的进链表了 . 唤起等待获取的线程
            signalNotEmpty();
        return c >= 0;
    }
    
    • offer(E e, long timeout, TimeUnit unit)
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            //容量满时则等待指定时长 , 等待超时还没写入的话 , 则算失败
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
    
    • enqueue(Node node)
    // 直接队尾塞
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }
    

    三种操作区别

    区别是在 链表容量满时 , 用什么策略.

    • put(E e) : 满了 , 再写入的话 , 就把写入线程阻塞. 必须等着. 简单粗暴!
    • offer(E e) : 满了,就不写入了 . 直接返回写失败了.
    • offer(E e, long timeout, TimeUnit unit) : 满了 , 就等会儿 , 还不行就算了.

    不同场景适用不同的方式.

    从链表读取

    四种取的方式

    • take()
    public E take() throws InterruptedException {
        E x;
        int c = -1; // 默认-1 , 表示失败
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                //链表已经没东西了. 等着吧
                notEmpty.await();
            }
            //这里表示有 . 那就取出来吧
            x = dequeue();
            // count计算得减一,这里是先获取,再减一
            c = count.getAndDecrement();
            if (c > 1)
                //链表里还有, 唤起别的在等待取的线程
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            //count原来是满的 . 取出后就不满了. 其他写的线程可以继续写了.
            signalNotFull();
        return x;
    }
    
    • poll(long timeout, TimeUnit unit)
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        //等待的时长换算成纳秒
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                //链表没东西了,等着吧.
                if (nanos <= 0)
                    return null;
                //不能一直等,我的耐心是有限度的!
                nanos = notEmpty.awaitNanos(nanos);
            }
            //等到了,取出来吧.
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            //count原来是满的 . 取出后就不满了. 其他写的线程可以继续写了.
            signalNotFull();
        return x;
    }
    
    • poll()
    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            //链表没东西了, 别取了. 只能给个null了
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                //有东西呀, 给吧,别客气
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    
    • peek()
    // 不取出来, 只要第一个实际内容
    public E peek() {
        if (count.get() == 0)
            //链表东西,要啥都没用
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                //来晚了, 锁之前还有 , 你锁上后就没了.
                return null;
            else
                //有得 . 大爷你拿好
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
    

    四种方式区别

    和写入类似 , 是在链表数据空了后的方式不同.

    • take() : 霸道! 强制阻塞. 没了就等着吧. 等有了再给 .
    • poll() : 温柔! 没了,给不了了 . 也别等了. 谁知道啥时候有呢
    • poll(long timeout, TimeUnit unit) : 人性! 暂时没了 , 没准呆会儿有了呢 , 要不您等等?
    • peek() : 蹭的! 不用取出来,只给第一个就行.

    从链表中删除

    // 把节点p ,从前一个节点的链路上去掉
    // 只在remove方法中使用. 
    // 方法是void , 不开放使用的.
    void unlink(Node<E> p, Node<E> trail) {
        p.item = null; // 把不要的东西扔了, GC会很乐意的.
        //节点链路变更
        trail.next = p.next;
        if (last == p)
            //p是最后一个节点, 去掉了, trail就是最后一个节点了.
            //倒数第一走了. 倒数第二就变成倒数第一了.
            last = trail;
        if (count.getAndDecrement() == capacity)
            //容量满的话,去掉一个,就该唤起等着写的线程
            notFull.signal();
    }
    // remove是很重的操作, fullyLock()呀!
    public boolean remove(Object o) {
        if (o == null) return false;
        // 同时putlock和takelock. 
        // 谁都别动. 等我把要走的干掉再说.
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                    p != null;
                    trail = p, p = p.next) {
                // 遍历找到那个 要 干掉的 p
                if (o.equals(p.item)) {
                    //我艹, 找到了, 干掉!!
                    unlink(p, trail);
                    return true;
                }
            }
            // 哎, 没找到呀. 是不是眼花了呀... 
            return false;
        } finally {
            fullyUnlock();
        }
    }
    
    删除总结

    删除会阻塞所有写和读操作.

  • 相关阅读:
    mybatisplus学习
    JSON数组读取和保存
    XML文件读写示例
    JSON代码示例
    Delphi接口示例代码
    获得wav语音文件时长
    javaweb注册页面学习之路(三)
    javaweb注册页面学习之路(二)
    javaweb注册页面学习之路(一)
    Django 目录
  • 原文地址:https://www.cnblogs.com/ElEGenT/p/12526664.html
Copyright © 2011-2022 走看看