zoukankan      html  css  js  c++  java
  • Lock+Condition实现机制

    前言:大部分多线程同步场景,在功能和性能层面,synchronized可以满足,少部分场景Lock可以满足,dubbo的源码也符合这个比例,需要使用到Condition的场景极少,整个dubbo源码中只在启动函数中,服务关闭这一处使用到了Lock+Condition机制。

    1.Lock+Condition用法

    生产者,消费者模式在面试coding中出场率很高,可以用synchronized+wait+ notify来实现,也可以使用Lock+Condition实现。直接上代码

    public class LockConditionTest {
        private LinkedList<String> queue=new LinkedList<String>();
    
        private Lock lock = new ReentrantLock();
    
        private int maxSize = 5;
    
        private Condition providerCondition = lock.newCondition();
    
        private Condition consumerCondition = lock.newCondition();
    
        public void provide(String value){
            try {
                lock.lock();
                while (queue.size() == maxSize) {
                    providerCondition.await();
                }
                queue.add(value);
                consumerCondition.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public String consume(){
            String result = null;
            try {
                lock.lock();
                while (queue.size() == 0) {
                    consumerCondition.await();
                }
                result = queue.poll();
                providerCondition.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            return result;
        }
    
        public static void main(String[] args) {
            LockConditionTest t = new LockConditionTest();
            new Thread(new Provider(t)).start();
            new Thread(new Consumer(t)).start();
    
        }
    
    }
    

    以两个问题驱动
    1.队列满了,生产者线程怎么停下来的?队列从满又变为不满的时候,怎么重新激活。
    2.队列空了,消费者线程如何停下来,又如何重新开始消费。
    一步一步解答这两个问题

    2.ReentrantLock

    ReentrantLock初始化的时候,默认是初始化一个NonfairSync。

    public ReentrantLock() {
            sync = new NonfairSync();
        }
    

      

     
    NonfairSync类图

    核心代码在AbstractQueuedSynchronizer中,只看数据结构的话,这是一个基于Node,带头指针和尾指针的双向链表,每一个Node里面存一个线程,以及该线程的等待状态

    static final class Node {
            volatile int waitStatus;
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
            Node nextWaiter;
     }
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;
    

      

    那么,ReentrantLock在lock和unlock的时候,操作的就是这个双向链表sync queue。
    先看获取锁的过程

    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

      

    1.如果这个锁没有任何线程持有,那么当前线程直接可以获得。(这是非公平锁的设计,如果是公平锁,需要检查是否有线程在排队,如果有,当前线程不能直接抢占,也要加入排队。)
    2.如果这个锁被占用了,占用线程是当前线程,那么state+1,这也是可重入锁之所以可以重入的由来。
    3.都不满足,暂时获取锁失败,返回false

    那么如果在3这一步获取锁失败,后续如何处理呢?

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

      

    1.addWaiter:在等待队列sync queue中添加一个节点
    2.acquireQueued:节点自旋获取锁或者进入阻塞

    addWaiter比较简单,即把当前等待线程加入sync queue的尾节点。

    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

      

    acquireQueued具体做了什么呢?

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

      

    1.自旋
    2.如果当前就一个线程在等待,那么尝试获取锁。(判断条件:当前节点的前驱为head,即head.next = 当前节点)
    3.不满足2,如果满足进入阻塞的条件,调用LockSupport.park(this)进入阻塞。

    一句话总结lock的过程:当前线程直接去尝试获取锁,不成功,则加入sync queue尾节点进行阻塞等待(非公平)。

    在看unlock的过程

    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

      

    1.先释放当前线程占有的锁,核心就是维护state的值。加一次锁,state+1,释放反之。
    2.unparkSuccessor :之前提到,lock的时候,会维护一个排队的双向队列sync queue,此时,会unpark唤醒其中的head.next,使其进入锁竞争状态。

    注:公平锁,非公平锁加锁的过程小有区别,一个是抢占式的,一个是排队式的,释放锁的过程则是完全相同的。

    3.Condition

    先看类图


     
    Condition

    用过Object的wait,notify的对这些方法应该不陌生,对应这里的await和signal
    先看await

    public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

      

    1.构造一个Node,形成一个单向链表condition queue,存储用于await在某一个condition上的线程。
    2.释放当前Node持有的锁。这个释放过程跟之前提到的unlock过程类似。
    3.LockSupport.park进行阻塞,等待signal的唤醒。
    4.如果被唤醒,继续加入锁的竞争中去。

    在看signal

    public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    

      

    在某个condition进行await的时候,形成了一个单向链表condition queue。
    那么在signal的时候,先对头结点firstWaiter进行唤醒。
    唤醒的过程见下

    final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

      

    1.将这个头结点,从condition queue中移到之前提到的sync queue中。
    2.LockSupport.unpark唤醒这个节点中的线程,进行锁争夺。

    4 总结

    ReentrantLock lock依赖的是一个双向链表sync queue
    condition依赖的是一个单项链表condition queue
    二者的阻塞和唤醒依赖的都是LockSupport的park和unpark方法。

    公平锁与非公平锁的区别就在于获取锁的方式不同,公平锁获取,当前线程必须检查sync queue里面是否已经有排队线程。而非公平锁则不用考虑这一点,当前线程可以直接去获取锁。

    开篇实现生产者消费者模型的时候,有两个问题,现在有答案了
    1.队列满了,生产者线程怎么停下来的?队列从满又变为不满的时候,怎么重新激活。
    ---通过lock机制,保证同一时刻,只有一个线程获取到锁,要么生产,要么消费,队列满了之后,生产者线程调用providerCondition.await(),进入阻塞等待状态,使得生产者线程停下来。当消费线程消费的时候,调用 providerCondition.signal(),重新激活生产者线程。

    2.队列空了,消费者线程如何停下来,又如何重新开始消费。
    ---与第一个问题同理。



    作者:北交吴志炜
    链接:https://www.jianshu.com/p/b60273eb71a9
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

  • 相关阅读:
    三个习题
    20 python--celery
    19 python --队列
    18 python --多线程
    17 python --多进程
    16 python --memcached
    15 python --redis
    14 python --mysql
    13 python --正则
    12 python --json
  • 原文地址:https://www.cnblogs.com/felixzh/p/11995958.html
Copyright © 2011-2022 走看看