zoukankan      html  css  js  c++  java
  • 并发编程实践之公平有界阻塞队列实现

    简介: JUC 工具包是 JAVA 并发编程的利器。本文讲述在没有 JUC 工具包帮助下,借助原生的 JAVA 同步原语, 如何实现一个公平有界的阻塞队列。希望你也能在文后体会到并发编程的复杂之处,以及 JUC 工具包的强。

    image.png

    作者 | 李新然

    来源 | 阿里技术公众号

    一 背景

    JUC 工具包是 JAVA 并发编程的利器。

    本文讲述在没有 JUC 工具包帮助下,借助原生的 JAVA 同步原语, 如何实现一个公平有界的阻塞队列。

    希望你也能在文后体会到并发编程的复杂之处,以及 JUC 工具包的强大。

    二 方法

    本文使用到的基本工具:

    1. 同步监听器 synchronized ,方法基本和代码块级别;
    2. Object 基础类的 wait, notify, notifyAll;

    基于以上基础工具,实现公平有界的阻塞队列,此处:

    1. 将公平的定义限定为 FIFO ,也就是先阻塞等待的请求,先解除等待;
    2. 并不保证解除等待后执行 Action 的先后顺序;
    3. 确保队列的大小始终不超过设定的容量;但阻塞等待的请求数不做限制;

    三 实现

    1 基础版本

    首先,考虑在非并发场景下,借助 ADT 实现一个基础版本

    interface Queue {
    
        boolean offer(Object obj);
    
        Object poll();
    
    }
    class FairnessBoundedBlockingQueue implements Queue {
        // 当前大小
        protected int size;
    
        // 容量
        protected final int capacity;
    
        // 头指针,empty: head.next == tail == null
        protected Node head;
    
        // 尾指针
        protected Node tail;
    
        public FairnessBoundedBlockingQueue(int capacity) {
            this.capacity = capacity;
            this.head = new Node(null);
            this.tail = head;
            this.size = 0;
        }
    
        // 如果队列已满,通过返回值标识
        public boolean offer(Object obj) {
            if (size < capacity) {
                Node node = new Node(obj);
                tail.next = node;
                tail = node;
                ++size;
                return true;
            }
            return false;
        }
    
        // 如果队列为空,head.next == null;返回空元素
        public Object poll() {
            if (head.next != null) {
                Object result = head.next.value;
                head.next.value = null;
                head = head.next; // 丢弃头结点
                --size;
                return result;
            }
            return null;
        }
    
        class Node {
            Object value;
            Node next;
            Node(Object obj) {
                this.value = obj;
                next = null;
            }
        }
    }

    以上

    1. 定义支持队列的两个基础接口, poll 和 offer;
    2. 队列的实现,采用经典实现;
    3. 考虑在队列空的情况下, poll 返回为空,非阻塞;
    4. 队列在满的情况下, offer 返回 false ,入队不成功,无异常;

    需要注意的一点:在出队时,本文通过迁移头结点的方式实现,避免修改尾结点。
    在下文实现并发版本时,会看到此处的用意。

    2 并发版本

    如果在并发场景下,上述的实现面临一些问题,同时未实现给定的一些需求。

    通过添加 synchronized ,保证并发条件下的线程安全问题。

    注意此处做同步的原因是为了保证类的不变式。

    并发问题

    在并发场景下,基础版本的实现面临的问题包括:原子性,可见性和指令重排的问题。

    参考 JMM 的相关描述。

    并发问题,最简单的解决方法是:通过 synchronized 加锁,一次性解决问题。

    // 省略接口定义
    class BoundedBlockingQueue implements Queue {
        // 当前大小
        protected int size;
    
        // 容量
        protected final int capacity;
    
        // 头指针,empty: head.next == tail == null
        protected Node head;
    
        // 尾指针
        protected Node tail;
    
        public BoundedBlockingQueue(int capacity) {
            this.capacity = capacity;
            this.head = new Node(null);
            this.tail = head;
            this.size = 0;
        }
    
        // 如果队列已满,通过返回值标识
        public synchronized boolean offer(Object obj) {
            if (size < capacity) {
                Node node = new Node(obj);
                tail.next = node;
                tail = node;
                ++size;
                return true;
            }
            return false;
        }
    
        // 如果队列为空,head.next == null;返回空元素
        public synchronized Object poll() {
            if (head.next != null) {
                Object result = head.next.value;
                head.next.value = null;
                head = head.next; // 丢弃头结点
                --size;
                return result;
            }
            return null;
        }
        // 省略 Node 的定义
    }

    以上,简单粗暴的加 synchronized 可以解决问题,但会引入新的问题:系统活性问题(此问题下文会解决)。

    同时,简单加 synchronized 同步是无法实现阻塞等待;即

    1. 如果队列为空,那么出队的动作还是会立即返回,返回为空;
    2. 如果队列已满,那么入队动作还是会立即返回,返回操作不成功;

    实现阻塞等待,需要借助 JAVA 中的 PV 原语:wait, notify, notifyAll 。

    参考:JDK 中对 wait, notify, notifyAll 的相关描述。

    卫式方法

    阻塞等待,可以通过简单的卫式方法来实现,此问题本质上可以抽象为:

    1. 任何一个方法都需要在满足一定条件下才可以执行;
    2. 执行方法前需要首先校验不变式,然后执行变更;
    3. 在执行完成后,校验是否满足后验不变式;
    WHEN(condition) Object action(Object arg) {
        checkPreCondition();
        doAction(arg);
        checkPostCondition();
    }

    此种抽象 Ada 在语言层面上实现。在 JAVA 中,借助 wait, notify, notifyAll 可以翻译为:

    // 当前线程
    synchronized Object action(Object arg) {
        while(!condition) {
            wait();
        }
        // 前置条件,不变式
        checkPreCondition();
        doAction();
        // 后置条件,不变式
        checkPostCondition();
    }
    
    // 其他线程
    synchronized Object notifyAction(Object arg) {
        notifyAll();
    }

    需要注意:

    1. 通常会采用 notifyAll 发送通知,而非 notify ;因为如果当前线程收到 notify 通知后被中断,那么系统将一直等待下去。
    2. 如果使用了 notifyAll 那么卫式语句必须放在 while 循环中;因为线程唤醒后,执行条件已经不满足,虽然当前线程持有互斥锁。
    3. 卫式条件的所有变量,有任何变更都需要发送 notifyAll 不然面临系统活性问题

    据此,不难实现简单的阻塞版本的有界队列,如下

    interface Queue {
    
        boolean offer(Object obj) throws InterruptedException;
    
        Object poll() throws InterruptedException;
    
    }
    class FairnessBoundedBlockingQueue implements Queue {
        // 当前大小
        protected int size;
    
        // 容量
        protected final int capacity;
    
        // 头指针,empty: head.next == tail == null
        protected Node head;
    
        // 尾指针
        protected Node tail;
    
        public FairnessBoundedBlockingQueue(int capacity) {
            this.capacity = capacity;
            this.head = new Node(null);
            this.tail = head;
            this.size = 0;
        }
    
        // 如果队列已满,通过返回值标识
        public synchronized boolean offer(Object obj) throws InterruptedException {
            while (size < capacity) {
                wait();
            }
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            ++size;
            notifyAll(); // 可以出队
            return true;
        }
    
        // 如果队列为空,阻塞等待
        public synchronized Object poll() throws InterruptedException {
            while (head.next == null) {
                wait();
            }
            Object result = head.next.value;
            head.next.value = null;
            head = head.next; // 丢弃头结点
            --size;
            notifyAll(); // 可以入队
            return result;
        }
        // 省略 Node 的定义
    }

    以上,实现了阻塞等待,但也引入了更大的性能问题

    1. 入队和出队动作阻塞等待同一把锁,恶性竞争;
    2. 当队列变更时,所有阻塞线程被唤醒,大量的线程上下文切换,竞争同步锁,最终可能只有一个线程能执行;

    需要注意的点:

    1. 阻塞等待 wait 会抛出中断异常。关于异常的问题下文会处理;
    2. 接口需要支持抛出中断异常;
    3. 队里变更需要 notifyAll 避免线程中断或异常,丢失消息;

    3 锁拆分优化

    以上第一个问题,可以通过锁拆分来解决,即:定义两把锁,读锁和写锁;读写分离。

    // 省略接口定义
    class FairnessBoundedBlockingQueue implements Queue {
        // 容量
        protected final int capacity;
    
        // 头指针,empty: head.next == tail == null
        protected Node head;
    
        // 尾指针
        protected Node tail;
    
        // guard: canPollCount, head
        protected final Object pollLock = new Object();
        protected int canPollCount;
    
        // guard: canOfferCount, tail
        protected final Object offerLock = new Object();
        protected int canOfferCount;
    
        public FairnessBoundedBlockingQueue(int capacity) {
            this.capacity = capacity;
            this.canPollCount = 0;
            this.canOfferCount = capacity;
            this.head = new Node(null);
            this.tail = head;
        }
    
        // 如果队列已满,通过返回值标识
        public boolean offer(Object obj) throws InterruptedException {
            synchronized(offerLock) {
                while(canOfferCount <= 0) {
                    offerLock.wait();
                }
                Node node = new Node(obj);
                tail.next = node;
                tail = node;
                canOfferCount--;
            }
            synchronized(pollLock) {
                ++canPollCount;
                pollLock.notifyAll();
            }
            return true;
        }
    
        // 如果队列为空,阻塞等待
        public Object poll() throws InterruptedException {
            Object result = null;
            synchronized(pollLock) {
                while(canPollCount <= 0) {
                    pollLock.wait();
                }
    
                result = head.next.value;
                head.next.value = null;
                head = head.next;
                canPollCount--;
            }
            synchronized(offerLock) {
                canOfferCount++;
                offerLock.notifyAll();
            }
            return result;
        }
        // 省略 Node 定义
    }

    以上

    1. 定义了两把锁, pollLock 和 offerLock 拆分出队和入队竞争;
    2. 入队锁同步的变量为:callOfferCount 和 tail;
    3. 出队锁同步的变量为:canPollCount 和 head;
    4. 出队的动作:首先拿到 pollLock 卫式等待后,完成出队动作;然后拿到 offerLock 发送通知,解除入队的等待线程。
    5. 入队的动作:首先拿到 offerLock 卫式等待后,完成入队的动作;然后拿到 pollLock 发送通知,解除出队的等待线程。

    以上实现

    1. 确保通过入队锁和出队锁,分别保证入队和出队的原子性;
    2. 出队动作,通过特别的实现,确保出队只会变更 head ,避免获取 offerLock;
    3. 通过 offerLock.notifyAll 和 pollLock.notifyAll 解决读写竞争的问题;

    但上述实现还有未解决的问题:

    当有多个入队线程等待时,一次出队的动作会触发所有入队线程竞争,大量的线程上下文切换,最终只有一个线程能执行。

    即,还有 读与读 和 写与写 之间的竞争问题。

    4 状态追踪解除竞争

    此处可以通过状态追踪,解除读与读之间和写与写之间的竞争问题

    class FairnessBoundedBlockingQueue implements Queue {
        // 容量
        protected final int capacity;
    
        // 头指针,empty: head.next == tail == null
        protected Node head;
    
        // 尾指针
        protected Node tail;
    
        // guard: canPollCount, head
        protected final Object pollLock = new Object();
        protected int canPollCount;
        protected int waitPollCount;
    
        // guard: canOfferCount, tail
        protected final Object offerLock = new Object();
        protected int canOfferCount;
        protected int waitOfferCount;
    
        public FairnessBoundedBlockingQueue(int capacity) {
            this.capacity = capacity;
            this.canPollCount = 0;
            this.canOfferCount = capacity;
            this.waitPollCount = 0;
            this.waitOfferCount = 0;
            this.head = new Node(null);
            this.tail = head;
        }
    
        // 如果队列已满,通过返回值标识
        public boolean offer(Object obj) throws InterruptedException {
            synchronized(offerLock) {
                while(canOfferCount <= 0) {
                    waitOfferCount++;
                    offerLock.wait();
                    waitOfferCount--;
                }
                Node node = new Node(obj);
                tail.next = node;
                tail = node;
                canOfferCount--;
            }
            synchronized(pollLock) {
                ++canPollCount;
                if (waitPollCount > 0) {
                    pollLock.notify();
                }
            }
            return true;
        }
    
        // 如果队列为空,阻塞等待
        public Object poll() throws InterruptedException {
            Object result;
            synchronized(pollLock) {
                while(canPollCount <= 0) {
                    waitPollCount++;
                    pollLock.wait();
                    waitPollCount--;
                }
    
                result = head.next.value;
                head.next.value = null;
                head = head.next;
                canPollCount--;
            }
            synchronized(offerLock) {
                canOfferCount++;
                if (waitOfferCount > 0) {
                    offerLock.notify();
                }
            }
            return result;
        }
        // 省略 Node 的定义
    }

    以上

    1. 通过 waitOfferCount 和 waitPollCount 的状态追踪解决 读写内部的竞争问题;
    2. 当队列变更时,根据追踪的状态,决定是否派发消息,触发线程阻塞状态解除;

    但,上述的实现在某些场景下会运行失败,面临活性问题,考虑

    情况一:

    1. 初始状态队列为空 线程 A 执行出队动作,被阻塞在 pollLock , 此时 waitPollCount==1;
    2. 此时线程 A 在执行 wait 时被中断,抛出异常, waitPollCount==1 并未被重置;
    3. 阻塞队列为空,但 waitPollCount==1 类状态异常;

    情况二:

    1. 初始状态队列为空 线程 A B 执行出队动作,被阻塞在 pollLock , 此时 waitPollCount==2;
    2. 线程 C 执行入队动作,可以立即执行,执行完成后,触发 pollLock 解除一个线程等待 notify;
    3. 触发的线程在 JVM 实现中是随机的,假设线程 A 被解除阻塞;
    4. 假设线程 A 在阻塞过程中已被中断,阻塞解除后 JVM 检查 interrupted 状态,抛出 InterruptedException 异常;
    5. 此时队列中有一个元素,但线程 A 仍阻塞在 pollLock 中,且一直阻塞下去;

    以上为解除阻塞消息丢失的例子,问题的根源在与异常处理。

    5 解决异常问题

    解决线程中断退出的问题,线程校验中断状态的场景

    1. JVM 通常只会在有限的几个场景检测线程的中断状态, wait, Thread.join, Thread.sleep;
    2. JVM 在检测到线程中断状态 Thread.interrupted() 后,会清除中断标志,抛出 InterruptedException;
    3. 通常为了保证线程对中断及时响应, run 方法中需要自主检测中断标志,中断线程,特别是对中断比较敏感需要保持类的不变式的场景;
    class FairnessBoundedBlockingQueue implements Queue {
        // 容量
        protected final int capacity;
    
        // 头指针,empty: head.next == tail == null
        protected Node head;
    
        // 尾指针
        protected Node tail;
    
        // guard: canPollCount, head, waitPollCount
        protected final Object pollLock = new Object();
        protected int canPollCount;
        protected int waitPollCount;
    
        // guard: canOfferCount, tail, waitOfferCount
        protected final Object offerLock = new Object();
        protected int canOfferCount;
        protected int waitOfferCount;
    
        public FairnessBoundedBlockingQueue(int capacity) {
            this.capacity = capacity;
            this.canPollCount = 0;
            this.canOfferCount = capacity;
            this.waitPollCount = 0;
            this.waitOfferCount = 0;
            this.head = new Node(null);
            this.tail = head;
        }
    
        // 如果队列已满,通过返回值标识
        public boolean offer(Object obj) throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException(); // 线程已中断,直接退出即可,防止中断线程竞争锁
            }
            synchronized(offerLock) {
                while(canOfferCount <= 0) {
                    waitOfferCount++;
                    try {
                        offerLock.wait();
                    } catch (InterruptedException e) {
                        // 触发其他线程
                        offerLock.notify();
                        throw e;
    
                    } finally {
                        waitOfferCount--;
                    }
                }
                Node node = new Node(obj);
                tail.next = node;
                tail = node;
                canOfferCount--;
            }
            synchronized(pollLock) {
                ++canPollCount;
                if (waitPollCount > 0) {
                    pollLock.notify();
                }
            }
            return true;
        }
    
        // 如果队列为空,阻塞等待
        public Object poll() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            Object result = null;
            synchronized(pollLock) {
                while(canPollCount <= 0) {
                    waitPollCount++;
                    try {
                        pollLock.wait();
                    } catch (InterruptedException e) {
                        pollLock.notify();
                        throw e;
                    } finally {
                        waitPollCount--;
                    }
                }
    
                result = head.next.value;
                head.next.value = 0;
                // ignore head;
                head = head.next;
                canPollCount--;
            }
            synchronized(offerLock) {
                canOfferCount++;
                if (waitOfferCount > 0) {
                    offerLock.notify();
                }
            }
            return result;
        }
        // 省略 Node 的定义
    }

    以上

    1. 当等待线程中断退出时,捕获中断异常,通过 pollLock.notify 和 offerLock.notify 转发消息;
    2. 通过在 finally 中恢复状态追踪变量;

    通过状态变量追踪可以解决读与读之间和写与写之间的锁竞争问题。

    以下考虑如果解决读与读之间和写与写之间的公平性问题。

    6 解决公平性

    公平性的问题的解决需要将状态变量的追踪转换为:请求监视器追踪。

    1. 每个请求对应一个监视器;
    2. 通过内部维护一个 FIFO 队列,实现公平性;
    3. 在队列状态变更时,释放队列中的监视器;

    以上逻辑可以统一抽象为

    boolean needToWait;
    synchronized(this) {
        needToWait = calculateNeedToWait();
        if (needToWait) {
            enqueue(monitor); // 请求对应的monitor
        }
    }
    if (needToWait) {
        monitor.doWait();
    }

    需要注意

    1. monitor.doWait() 需要在 this 的卫式语句之外,因为如果在内部, monitor.doWait 并不会释放 this锁;
    2. calculateNeedToWait() 需要在 this 的守卫之内完成,避免同步问题;
    3. 需要考虑中断异常的问题;

    基于以上的逻辑抽象,实现公平队列

    // 省略接口定义
    class FairnessBoundedBlockingQueue implements Queue {
        // 容量
        protected final int capacity;
    
        // 头指针,empty: head.next == tail == null
        protected Node head;
    
        // 尾指针
        protected Node tail;
    
        // guard: canPollCount, head, pollQueue
        protected final Object pollLock = new Object();
        protected int canPollCount;
    
        // guard: canOfferCount, tail, offerQueue
        protected final Object offerLock = new Object();
        protected int canOfferCount;
    
        protected final WaitQueue pollQueue = new WaitQueue();
        protected final WaitQueue offerQueue = new WaitQueue();
    
        public FairnessBoundedBlockingQueue(int capacity) {
            this.capacity = capacity;
            this.canOfferCount = capacity;
            this.canPollCount = 0;
            this.head = new Node(null);
            this.tail = head;
        }
    
        // 如果队列已满,通过返回值标识
        public boolean offer(Object obj) throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException(); // 线程已中断,直接退出即可,防止中断线程竞争锁
            }
            WaitNode wait = null;
            synchronized(offerLock) {
                // 在有阻塞请求或者队列为空时,阻塞等待
                if (canOfferCount <= 0 || !offerQueue.isEmpty()) {
                    wait = new WaitNode();
                    offerQueue.enq(wait);
                } else {
                    // continue.
                }
            }
    
            try {
                if (wait != null) {
                    wait.doWait();
                }
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
            } catch (InterruptedException e) {
                offerQueue.doNotify();
                throw e;
            }
    
            // 确保此时线程状态正常,以下不会校验中断
            synchronized(offerLock) {
                Node node = new Node(obj);
                tail.next = node;
                tail = node;
                canOfferCount--;
            }
            synchronized(pollLock) {
                ++canPollCount;
                pollQueue.doNotify();
            }
            return true;
        }
    
        // 如果队列为空,阻塞等待
        public Object poll() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            Object result = null;
            WaitNode wait = null;
            synchronized(pollLock) {
                // 在有阻塞请求或者队列为空时,阻塞等待
                if (canPollCount <= 0 || !pollQueue.isEmpty()) {
                    wait = new WaitNode();
                    pollQueue.enq(wait);
                } else {
                    // ignore
                }
            }
    
            try {
                if (wait != null) {
                    wait.doWait();
                }
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
            } catch (InterruptedException e) {
                // 传递消息
                pollQueue.doNotify();
                throw e;
            }
    
            // 以下不会检测线程中断状态
            synchronized(pollLock) {
                result = head.next.value;
                head.next.value = 0;
                // ignore head;
                head = head.next;
                canPollCount--;
            }
    
            synchronized(offerLock) {
                canOfferCount++;
                offerQueue.doNotify();
            }
            return result;
        }
    
        class WaitQueue {
    
            WaitNode head;
            WaitNode tail;
    
            WaitQueue() {
                head = new WaitNode();
                tail = head;
            }
    
            synchronized void doNotify() {
                for(;;) {
                    WaitNode node = deq();
                    if (node == null) {
                        break;
                    } else if (node.doNotify()) {
                        // 此处确保NOTIFY成功
                        break;
                    } else {
                        // ignore, and retry.
                    }
                }
            }
    
            synchronized boolean isEmpty() {
                return head.next == null;
            }
    
            synchronized void enq(WaitNode node) {
                tail.next = node;
                tail = tail.next;
            }
    
            synchronized WaitNode deq() {
                if (head.next == null) {
                    return null;
                }
                WaitNode res = head.next;
                head = head.next;
                if (head.next == null) {
                    tail = head; // 为空,迁移tail节点
                }
                return res;
            }
        }
    
        class WaitNode {
            boolean released;
            WaitNode next;
            WaitNode() {
                released = false;
                next = null;
            }
    
            synchronized void doWait() throws InterruptedException {
                try {
                    while (!released) {
                        wait();
                    }             
                } catch (InterruptedException e) {
                    if (!released) {
                        released = true;
                        throw e;
                    } else {
                        // 如果是NOTIFY之后收到中断的信号,不能抛出异常;需要做RELAY处理
                        Thread.currentThread().interrupt();
                    }
                }
            }
    
            synchronized boolean doNotify() {
                if (!released) {
                    released = true;
                    notify();
                    // 明确释放了一个线程,返回true
                    return true;
                } else {
                    // 没有释放新的线程,返回false
                    return false;
                }
            }
        }
        // 省略 Node 的定义
    }

    以上

    1. 核心是替换状态追踪变量为同步节点, WaitNode;
    2. WaitNode 通过简单的同步队列组织实现 FIFO 协议,每个线程等待各自的 WaitNode 监视器;
    3. WaitNode 内部维持 released 状态,标识线程阻塞状态是否被释放,主要是为了处理中断的问题;
    4. WaitQueue 本身是全同步的,由于已解决了读写竞争已经读写内部竞争的问题, WaitQueue 同步并不会造成问题;
    5. WaitQueue 是无界队列,是一个潜在的问题;但由于其只做同步的追踪,而且追踪的通常是线程,通常并不是问题;
    6. 最终的公平有界队列实现,无论是入队还是出队,首先卫式语句判定是否需要入队等待,如果入队等待,通过公平性协议等待;

    当信号释放时,借助读写锁同步更新队列;最后同样借助读写锁,触发队列更新消息;

    7 等待时间的问题

    并发场景下,等待通常会设置为限时等待 TIMED_WAITING ,避免死锁或损失系统活性;

    实现同步队列的限时等待,并没想象的那么困难

    class TimeoutException extends InterruptedException {}
    
    class WaitNode {
        boolean released;
        WaitNode next;
        WaitNode() {
            released = false;
            next = null;
        }
    
        synchronized void doWait(long milliSeconds) throws InterruptedException {
            try {
                long startTime = System.currentTimeMillis();
                long toWait = milliSeconds;
                for (;;) {
                    wait(toWait);
                    if (released) {
                        return;
                    }
                    long now = System.currentTimeMillis();
                    toWait = toWait - (now - startTime);
                    if (toWait <= 0) {
                        throw new TimeoutException();
                    }
                }
            } catch (InterruptedException e) {
                if (!released) {
                    released = true;
                    throw e;
                } else {
                    // 如果已经释放信号量,此处不抛出异常;但恢复中断状态
                    Thread.currentThread().interrupt();
                }
            }
        }
    
        synchronized boolean doNotify() {
            if (!released) {
                released = true;
                notify();
                return true;
            } else {
                return false;
            }
        }

    由于所有的等待都阻塞在 WaitNode 监视器,以上

    • 首先定义超时异常,此处只是为了方便异常处理,继承 InterruptedException;
    • 此处依赖于 wait(long timeout) 的超时等待实现,这通常不是问题;

    最后,将 WaitNode 超时等待的逻辑,带入到 FairnessBoundedBlockingQueue 实现中,即可。

    四 总结

    本文通过一步步迭代,最终借助 JAVA 同步原语实现初版的公平有界队列。迭代实现过程中可以看到以下几点:

    1. 观念的转变,将调用一个类的方法思维转换为:在满足一定条件下方法才可以调用,在调用前需要满足不变式,调用后满足不变式;由于并发的问题很难测试,通常要采用卫式表达证明并发的正确性;
    2. 在迭代实现中会看到很多模式,比如,读写分离时,其实可以抽象为读锁和写锁;就得到了一个抽象的 Lock 的定义;比如,读写状态追踪,可以采用 Exchanger 抽象表达;
    3. 另外,本文的实现远非完善,还需要考虑支持 Iterator 遍历、状态查询及数据迁移等操作;

    最后,相信大家再看 JUC 的工具包实现,定有不一样的体会。

    原文链接
    本文为阿里云原创内容,未经允许不得转载。

  • 相关阅读:
    如何解压.bz2文件包
    Typecho中的gravatar头像无法加载
    整理的mysql优化内容
    PHP常用的一些正则表达式
    git log --stat常用命令
    Linux查找含有某字符串的所有文件
    linux下如何查看chm文件
    linux访问windows共享文件夹的方法
    typecho除了首页其他大部分网页404怎么办?
    Windows在当前目录打开cmd
  • 原文地址:https://www.cnblogs.com/yunqishequ/p/15556357.html
Copyright © 2011-2022 走看看