zoukankan      html  css  js  c++  java
  • Java编程的逻辑 (71)

    在66节,我们介绍了利用synchronized实现锁,我们提到了synchronized的一些局限性,本节,我们探讨Java并发包中的显式锁,它可以解决synchronized的限制。

    Java并发包中的显式锁接口和类位于包java.util.concurrent.locks下,主要接口和类有:

    • 锁接口Lock,主要实现类是ReentrantLock
    • 读写锁接口ReadWriteLock,主要实现类是ReentrantReadWriteLock

    本节主要介绍接口Lock和实现类ReentrantLock,关于读写锁,我们后续章节介绍。

    接口Lock

    显式锁接口Lock的定义为:

    复制代码
    public interface Lock {
        void lock();
        void lockInterruptibly() throws InterruptedException;
        boolean tryLock();
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
        void unlock();
        Condition newCondition();
    }
    复制代码

    我们解释一下:

    • lock()/unlock():就是普通的获取锁和释放锁方法,lock()会阻塞直到成功。
    • lockInterruptibly():与lock()的不同是,它可以响应中断,如果被其他线程中断了,抛出InterruptedException。
    • tryLock():只是尝试获取锁,立即返回,不阻塞,如果获取成功,返回true,否则返回false。
    • tryLock(long time, TimeUnit unit) :先尝试获取锁,如果能成功则立即返回true,否则阻塞等待,但等待的最长时间为指定的参数,在等待的同时响应中断,如果发生了中断,抛出InterruptedException,如果在等待的时间内获得了锁,返回true,否则返回false。
    • newCondition:新建一个条件,一个Lock可以关联多个条件,关于条件,我们留待下节介绍。 

    可以看出,相比synchronized,显式锁支持以非阻塞方式获取锁、可以响应中断、可以限时,这使得它灵活的多。

    可重入锁ReentrantLock

    基本用法

    Lock接口的主要实现类是ReentrantLock,它的基本用法lock/unlock实现了与synchronized一样的语义,包括:

    • 可重入,一个线程在持有一个锁的前提下,可以继续获得该锁
    • 可以解决竞态条件问题
    • 可以保证内存可见性

    ReentrantLock有两个构造方法:

    public ReentrantLock()
    public ReentrantLock(boolean fair) 

    参数fair表示是否保证公平,不指定的情况下,默认为false,表示不保证公平。所谓公平是指,等待时间最长的线程优先获得锁。保证公平会影响性能,一般也不需要,所以默认不保证,synchronized锁也是不保证公平的,待会我们还会再分析实现细节。

    使用显式锁,一定要记得调用unlock,一般而言,应该将lock之后的代码包装到try语句内,在finally语句内释放锁,比如,使用ReentrantLock实现Counter,代码可以为:

    复制代码
    public class Counter {
        private final Lock lock = new ReentrantLock();
        private volatile int count;
    
        public void incr() {
            lock.lock();
            try {
                count++;
            } finally {
                lock.unlock();
            }
        }
    
        public int getCount() {
            return count;
        }
    }
    复制代码

    使用tryLock避免死锁

    使用tryLock(),可以避免死锁。在持有一个锁,获取另一个锁,获取不到的时候,可以释放已持有的锁,给其他线程机会获取锁,然后再重试获取所有锁。

    我们来看个例子,银行账户之间转账,用类Account表示账户,代码如下:

    复制代码
    public class Account {
        private Lock lock = new ReentrantLock();
        private volatile double money;
    
        public Account(double initialMoney) {
            this.money = initialMoney;
        }
    
        public void add(double money) {
            lock.lock();
            try {
                this.money += money;
            } finally {
                lock.unlock();
            }
        }
    
        public void reduce(double money) {
            lock.lock();
            try {
                this.money -= money;
            } finally {
                lock.unlock();
            }
        }
    
        public double getMoney() {
            return money;
        }
    
        void lock() {
            lock.lock();
        }
    
        void unlock() {
            lock.unlock();
        }
    
        boolean tryLock() {
            return lock.tryLock();
        }
    }
    复制代码

    Account里的money表示当前余额,add/reduce用于修改余额。在账户之间转账,需要两个账户都锁定,如果不使用tryLock,直接使用lock,代码看上去可以这样:

    复制代码
    public class AccountMgr {
        public static class NoEnoughMoneyException extends Exception {}
    
        public static void transfer(Account from, Account to, double money)
                throws NoEnoughMoneyException {
            from.lock();
            try {
                to.lock();
                try {
                    if (from.getMoney() >= money) {
                        from.reduce(money);
                        to.add(money);
                    } else {
                        throw new NoEnoughMoneyException();
                    }
                } finally {
                    to.unlock();
                }
            } finally {
                from.unlock();
            }
        }
    }
    复制代码

    但这么写是有问题的,如果两个账户同时给对方转账,都先获取了第一个锁,则会发生死锁。我们写段代码来模拟这个过程:

    复制代码
    public static void simulateDeadLock() {
        final int accountNum = 10;
        final Account[] accounts = new Account[accountNum];
        final Random rnd = new Random();
        for (int i = 0; i < accountNum; i++) {
            accounts[i] = new Account(rnd.nextInt(10000));
        }
    
        int threadNum = 100;
        Thread[] threads = new Thread[threadNum];
        for (int i = 0; i < threadNum; i++) {
            threads[i] = new Thread() {
                public void run() {
                    int loopNum = 100;
                    for (int k = 0; k < loopNum; k++) {
                        int i = rnd.nextInt(accountNum);
                        int j = rnd.nextInt(accountNum);
                        int money = rnd.nextInt(10);
                        if (i != j) {
                            try {
                                transfer(accounts[i], accounts[j], money);
                            } catch (NoEnoughMoneyException e) {
                            }
                        }
                    }
                }
            };
            threads[i].start();
        }
    }
    复制代码

    以上代码创建了10个账户,100个线程,每个线程执行100次循环,在每次循环中,随机挑选两个账户进行转账。在我的电脑上,每次执行该段代码,都会发生死锁。读者可以更改这些数值进行试验。

    我们使用tryLock来进行修改,先定义一个tryTransfer方法:

    复制代码
    public static boolean tryTransfer(Account from, Account to, double money)
            throws NoEnoughMoneyException {
        if (from.tryLock()) {
            try {
                if (to.tryLock()) {
                    try {
                        if (from.getMoney() >= money) {
                            from.reduce(money);
                            to.add(money);
                        } else {
                            throw new NoEnoughMoneyException();
                        }
                        return true;
                    } finally {
                        to.unlock();
                    }
                }
            } finally {
                from.unlock();
            }
        }
        return false;
    }
    复制代码

    如果两个锁都能够获得,且转账成功,则返回true,否则返回false,不管怎样,结束都会释放所有锁。transfer方法可以循环调用该方法以避免死锁,代码可以为:

    复制代码
    public static void transfer(Account from, Account to, double money)
            throws NoEnoughMoneyException {
        boolean success = false;
        do {
            success = tryTransfer(from, to, money);
            if (!success) {
                Thread.yield();
            }
        } while (!success);
    }
    复制代码

    获取锁信息

    除了实现Lock接口中的方法,ReentrantLock还有一些其他方法,通过它们,可以获取关于锁的一些信息,这些信息可以用于监控和调试目的,比如:

    复制代码
    //锁是否被持有,只要有线程持有就返回true,不一定是当前线程持有
    public boolean isLocked()
    
    //锁是否被当前线程持有
    public boolean isHeldByCurrentThread()
    
    //锁被当前线程持有的数量,0表示不被当前线程持有
    public int getHoldCount()
    
    //锁等待策略是否公平
    public final boolean isFair()
    
    //是否有线程在等待该锁
    public final boolean hasQueuedThreads()
    
    //指定的线程thread是否在等待该锁
    public final boolean hasQueuedThread(Thread thread)
    
    //在等待该锁的线程个数
    public final int getQueueLength()
    复制代码

    实现原理

    ReentrantLock的用法是比较简单的,它是怎么实现的呢?在最底层,它依赖于上节介绍的CAS方法,另外,它依赖于类LockSupport中的一些方法。

    LockSupport

    类LockSupport也位于包java.util.concurrent.locks下,它的基本方法有:

    public static void park()
    public static void parkNanos(long nanos)
    public static void parkUntil(long deadline)
    public static void unpark(Thread thread)

    park使得当前线程放弃CPU,进入等待状态(WAITING),操作系统不再对它进行调度,什么时候再调度呢?有其他线程对它调用了unpark,unpark需要指定一个线程,unpark会使之恢复可运行状态。我们看个例子:

    复制代码
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread (){
            public void run(){
                LockSupport.park();
                System.out.println("exit");
            }
        };
        t.start();    
        Thread.sleep(1000);
        LockSupport.unpark(t);
    }
    复制代码

    线程t启动后调用park,会放弃CPU,主线程睡眠1秒钟后,调用unpark,线程t恢复运行,输出exit。

    park不同于Thread.yield(),yield只是告诉操作系统可以先让其他线程运行,但自己依然是可运行状态,而park会放弃调度资格,使线程进入WAITING状态。

    需要说明的是,park是响应中断的,当有中断发生时,park会返回,线程的中断状态会被设置。另外,还需要说明一下,park可能会无缘无故的返回,程序应该重新检查park等待的条件是否满足。

    park有两个变体:

    • parkNanos:可以指定等待的最长时间,参数是相对于当前时间的纳秒数。
    • parkUntil:可以指定最长等到什么时候,参数是绝对时间,是相对于纪元时的毫秒数。 

    当等待超时的时候,它们也会返回。

    这些park方法还有一些变体,可以指定一个对象,表示是由于该对象进行等待的,以便于调试,通常传递的值是this,这些方法有:

    public static void park(Object blocker)
    public static void parkNanos(Object blocker, long nanos)
    public static void parkUntil(Object blocker, long deadline)

    LockSupport有一个方法,可以返回一个线程的blocker对象:

    public static Object getBlocker(Thread t)

    这些park/unpark方法是怎么实现的呢?与CAS方法一样,它们也调用了Unsafe类中的对应方法,Unsafe类最终调用了操作系统的API,从程序员的角度,我们可以认为LockSupport中的这些方法就是基本操作。

    AQS (AbstractQueuedSynchronizer)

    利用CAS和LockSupport提供的基本方法,就可以用来实现ReentrantLock了。但Java中还有很多其他并发工具,如ReentrantReadWriteLock、Semaphore、CountDownLatch,它们的实现有很多类似的地方,为了复用代码,Java提供了一个抽象类AbstractQueuedSynchronizer,我们简称为AQS,它简化了并发工具的实现。AQS的整体实现比较复杂,我们主要以ReentrantLock的使用为例进行简要介绍。

    AQS封装了一个状态,给子类提供了查询和设置状态的方法:

    private volatile int state;
    protected final int getState()
    protected final void setState(int newState)
    protected final boolean compareAndSetState(int expect, int update) 

    用于实现锁时,AQS可以保存锁的当前持有线程,提供了方法进行查询和设置:

    private transient Thread exclusiveOwnerThread;
    protected final void setExclusiveOwnerThread(Thread t)
    protected final Thread getExclusiveOwnerThread() 

    AQS内部维护了一个等待队列,借助CAS方法实现了无阻塞算法进行更新。

    下面,我们以ReentrantLock的使用为例简要介绍下AQS的原理。

    ReentrantLock

    ReentrantLock内部使用AQS,有三个内部类:

    abstract static class Sync extends AbstractQueuedSynchronizer
    static final class NonfairSync extends Sync
    static final class FairSync extends Sync

    Sync是抽象类,NonfairSync是fair为false时使用的类,FairSync是fair为true时使用的类。ReentrantLock内部有一个Sync成员:

    private final Sync sync;

    在构造方法中sync被赋值,比如:

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    我们来看ReentrantLock中的基本方法lock/unlock的实现,先看lock方法,代码为:

    public void lock() {
        sync.lock();
    }

    NonfairSync的lock代码为:

    复制代码
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    复制代码

    ReentrantLock使用state表示是否被锁和持有数量,如果当前未被锁定,则立即获得锁,否则调用acquire(1)获得锁,acquire是AQS中的方法,代码为:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    它调用tryAcquire获取锁,tryAcquire必须被子类重写,NonfairSync的实现为:

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

    nonfairTryAcquire是sync中实现的,代码为:

    复制代码
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    复制代码

    这段代码应该容易理解,如果未被锁定,则使用CAS进行锁定,否则,如果已被当前线程锁定,则增加锁定次数。

    如果tryAcquire返回false,则AQS会调用:

    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

    其中,addWaiter会新建一个节点Node,代表当前线程,然后加入到内部的等待队列中,限于篇幅,具体代码就不列出来了。放入等待队列后,调用acquireQueued尝试获得锁,代码为:

    复制代码
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    复制代码

    主体是一个死循环,在每次循环中,首先检查当前节点是不是第一个等待的节点,如果是且能获得到锁,则将当前节点从等待队列中移除并返回,否则最终调用LockSupport.park放弃CPU,进入等待,被唤醒后,检查是否发生了中断,记录中断标志,在最终方法返回时返回中断标志。如果发生过中断,acquire方法最终会调用selfInterrupt方法设置中断标志位,其代码为:

    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    以上就是lock方法的基本过程,能获得锁就立即获得,否则加入等待队列,被唤醒后检查自己是否是第一个等待的线程,如果是且能获得锁,则返回,否则继续等待,这个过程中如果发生了中断,lock会记录中断标志位,但不会提前返回或抛出异常。

    ReentrantLock的unlock方法的代码为:

    public void unlock() {
        sync.release(1);
    }

    release是AQS中定义的方法,代码为:

    复制代码
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    复制代码

    tryRelease方法会修改状态释放锁,unparkSuccessor会调用LockSupport.unpark将第一个等待的线程唤醒,具体代码就不列举了。

    FairSync和NonfairSync的主要区别是,在获取锁时,即在tryAcquire方法中,如果当前未被锁定,即c==0,FairSync多个一个检查,如下:

    复制代码
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        ...
    复制代码

    这个检查是指,只有不存在其他等待时间更长的线程,它才会尝试获取锁。

    这样保证公平不是很好吗?为什么默认不保证公平呢?保证公平整体性能比较低,低的原因不是这个检查慢,而是会让活跃线程得不到锁,进入等待状态,引起上下文切换,降低了整体的效率,通常情况下,谁先运行关系不大,而且长时间运行,从统计角度而言,虽然不保证公平,也基本是公平的。

    需要说明是,即使fair参数为true,ReentrantLock中不带参数的tryLock方法也是不保证公平的,它不会检查是否有其他等待时间更长的线程,其代码为:

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    ReentrantLock对比synchronized

    相比synchronized,ReentrantLock可以实现与synchronized相同的语义,但还支持以非阻塞方式获取锁、可以响应中断、可以限时等,更为灵活。

    不过,synchronized的使用更为简单,写的代码更少,也更不容易出错。

    synchronized代表一种声明式编程,程序员更多的是表达一种同步声明,由Java系统负责具体实现,程序员不知道其实现细节,显式锁代表一种命令式编程,程序员实现所有细节。

    声明式编程的好处除了简单,还在于性能,在较新版本的JVM上,ReentrantLock和synchronized的性能是接近的,但Java编译器和虚拟机可以不断优化synchronized的实现,比如,自动分析synchronized的使用,对于没有锁竞争的场景,自动省略对锁获取/释放的调用。

    简单总结,能用synchronized就用synchronized,不满足要求,再考虑ReentrantLock。

    小结

    本节主要介绍了显式锁ReentrantLock,介绍了其用法和实现原理,在用法方面,我们重点介绍了使用tryLock避免死锁,在原理上,ReentrantLock使用CAS、LockSupport和AQS,最后,我们比较了ReentrantLock和synchronized,建议优先使用synchronized。

    下一节,我们来看显式条件。

  • 相关阅读:
    关于Manjaro+kde桌面Tim闪退的解决
    Manjaro-kde-18.1.3安装体验
    Ubuntu19.10安装
    OPPO R11刷机初体验
    Microsoft store应用商店打不开0x80131500
    提问回顾与个人总结
    OO第三单元总结
    OO第二单元总结
    软工案例分析作业
    OO第一单元总结
  • 原文地址:https://www.cnblogs.com/ivy-xu/p/12364175.html
Copyright © 2011-2022 走看看