zoukankan      html  css  js  c++  java
  • 《Java并发编程的艺术》第5章 Java中的锁 (上)

    目录

    综述

    介绍Java并发包中与锁相关的API和组件。

    内容主要围绕两个方面:

    • 使用:组件和相关API的使用方法

    • 实现:通过分析源码剖析实现细节

    Lock接口

    1.什么是Lock接口

    锁是控制多个线程访问共享资源的方式。

    在Lock接口出现之前,Java程序通过synchronized关键字实现锁功能。

    Java SE 5之后,并发包中添加了Lock接口及相关实现类实现锁功能,并提供了synchronized关键字不具备的特性。

    2.与synchronized的区别

    Lock接口提供的synchronized关键字不具体的特性

    特性

    说明

    非阻塞地获取锁(tryLock()方法)

    当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁

    能被中断地获取锁(lockInterruptibly()方法)

    与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将被抛出,同时锁会被释放

    超时获取锁(tryLock(long time, TimeUnit unit)方法)

    在指定的截止时间之前获取锁,如果时间到了仍没有获取锁,直接返回

    3.Lock接口源码

    public interface Lock {
        //获取锁,调用该方法当前线程将会获得锁,当锁获得后,从该方法返回
        void lock();
        //可中断地获得锁,与Lock方法不同之处:会响应中断,即在锁获取的过程中可以中断当前线程
        void lockInterruptibly() throws InterruptedException;
        //尝试非阻塞的获得锁,调用该方法会立刻返回。如果锁获取成功,返回true;否则,返回false
        boolean tryLock();
        //超时获取锁,当前线程在以下3中情况时会返回
        //1.当前线程在超时时间内获得了锁
        //2.当前线程在超时时间内被中断
        //3.超时时间结束,返回false
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
        //释放锁
        void unlock();
        //获取等待通知组件,该组件和当前的锁绑定,当前线程只有先获取锁,才能调用该组件的wait()方法,调用后,当前线程将释放锁
        Condition newCondition();
    }
     

    4.如何使用,以及注意事项

    Lock lock = new ReetrantLock();
    lock.lock();
    try {
      //业务逻辑
    } finally {
      lock.unlock();
    }

    使用注意点:

    1. 在finally中释放锁,目的是保证获取锁之后,最终能够被释放

    2. 不要将获取锁的过程写在try块中,??????????????

    可重入锁ReentrantLock

    1.可重入锁ReentrantLock

    什么是可重入锁?

    可重入锁,就是支持重进入的锁,它表示该锁支持一个线程对资源重复加锁。

    举例

    • 可重入:A线程当前持有锁lock,A线程再次要求获取锁lock时,A线程可再次获取锁成功。

    • 不可重入:A线程当前持有锁lock,A线程再次要求获取锁lock时,A线程此次获取锁的操作将被阻塞。

    除了可重入特性,该锁还支持获取锁的公平和非公平性选择。

    什么是公平锁,非公平锁?

    如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是非公平的。

    举例,线程A在t1时刻请求获得锁,线程B在t2时刻请求获取锁,且t1 < t2,如果为公平锁,则一定是线程A先获得锁,然后B先获得锁。

    2.如何使用

    Lock lock = new ReetrantLock();//默认为非公平锁
    //Lock lock = new ReetrantLock(true); //公平锁
    lock.lock();
    try {
      //业务逻辑
    } finally {
      lock.unlock();
    }
     

    公平锁与非公平锁测试对比

    公平锁与非公平锁相比,总耗时是94.3倍,总切换次数是133倍。

    • 公平锁保证了锁的获取按照FIFO原则,但代价是大量的线程切换。

    • 非公平锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。

    3.实现原理(结合队列同步器AQS)

    3.1 可重入实现原理

    3.2 公平性实现原理

    读写锁ReadWriteLock

    1.什么是读写锁

    ReentrantLock是排他锁,在同一个时刻仅允许一个线程访问资源。

    读写锁

    • 读写锁在同一个时刻刻允许多个读线程访问。

    • 当写线程访问时,所有的读线程和其他写线程均被阻塞。

    读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比排它锁有了很大提升。

    2.如何使用

    一般情况下,读写锁的性能都比排它锁要好,因为大多数场景都是读多于写的。

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    ​
    public class Cache {
        static Map<String, Object> map = new HashMap<String, Object>();
        static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        static Lock rLock = rwl.readLock();
        static Lock wLock = rwl.writeLock();
        //获取一个key对应的value
        public static final Object get(String key) {
            rLock.lock();
            try {
                return map.get(key);
            } finally {
                rLock.unlock();
            }
        }
        //设置key对应的value,并返回旧的value
        public static final Object put(String key, Object value) {
            wLock.lock();
            try {
                return map.put(key, value);
            } finally {
                wLock.lock();
            }
        }
        //清空所有内容
        public static final void clear() {
            wLock.lock();
            try {
                map.clear();
            } finally {
                wLock.unlock();
            }
        }
    }

    2.实现原理(结合队列同步器AQS)

    Condition接口

    1.什么是Condition

    任何一个Java对象,都拥有一组监视器方法(定义在java.lang.Object)上,

    主要包括wait()、wait(long timeout)、notify()及notifyAll(),这些方法与synchronized关键字配置,可以实现等待/通知模式。

    Conditon接口也提供了类似监视器方法,与Lock接口配合实现等待/通知模式。

    对比项

    Object提供的监视器方法

    Condition

    前置条件

    获取对象的锁

    调用lock.lock()获得锁

    调用lock.newCondition()获取condition对象

    等待队列个数

    一个

    多个

    当前线程释放锁并进入等待状态

    object.wait()

    condition.await()

    当前线程释放锁并进入等待状态,在等待状态中不响应中断

    不支持

    condition.awaitUninterruptibly()

    当前线程释放锁并进入等待状态,

    并进入超时状态

    object.wait(long timeout)

    condition.await()

    当前线程释放锁并进入等待状态,

    知道将来的某个时间

    不支持

    contition.awaitUntil(Date deadline)

    唤醒等待队列中的一个线程

    支持 (object.notify())

    支持 (condition.signal())

    唤醒等待队列中的全部线程

    支持 (object.notifyAll())

    支持 (condition.signalAll())

    2.如何使用

    public class BoundedQueueV1<T> {
        private Object[] items;
        private int addIndex, removeIndex, count;
        private Object notEmpty = new Object();
        private Object notFull = new Object();
        public BoundedQueueV1(int size) {
            this.items = new Object[size];
        }
    ​
        //添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
        public synchronized void  add(T t) throws InterruptedException {
            while (count == items.length) {
                notFull.wait();
            }
            items[addIndex] = t;
            if (++addIndex == items.length) {
                addIndex = 0;
            }
            ++count;
            notEmpty.notify();
        }
    ​
        //从头部删除一个元素,如果数组为空,则删除线程进入等待状态,直到有新元素添加
        public synchronized T remove() throws InterruptedException {
            while (count == 0) {
                notEmpty.wait();
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            --count;
            notFull.notify();
            return (T)x;
        }
    }
    public class BoundedQueue<T> {
        private Object[] items;
        private int addIndex, removeIndex, count;
        private Lock lock = new ReentrantLock();
        private Condition notEmpty = lock.newCondition();
        private Condition notFull = lock.newCondition();
        public BoundedQueue(int size) {
            this.items = new Object[size];
        }
        //添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
        public void add(T t) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length) {
                    notFull.await();
                }
                items[addIndex] = t;
                if (++addIndex == items.length) {
                    addIndex = 0;
                }
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    ​
        //从头部删除一个元素,如果数组为空,则删除线程进入等待状态,直到有新元素添加
        public T remove() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    notEmpty.await();
                }
                Object x = items[removeIndex];
                if (++removeIndex == items.length) {
                    removeIndex = 0;
                }
                --count;
                notFull.signal();
                return (T)x;
            } finally {
                lock.unlock();
            }
        }
    }

    3.实现原理(结合AQS)

    LockSupport工具

    1.LockSupport工具的作用

    LockSupport是构建同步组件的基础工具。

    其定义了一组公共静态方法,提供了最基本的线程阻塞和唤醒功能。

    方法名称

    说明

    void park()

    阻塞当前线程,如果调用unpark(Thread thread)方法或者当前线程被中断,才能从park()方法返回

    void parkNanos(long nanos)

    阻塞当前线程,最长不超过nanos纳秒

    void pardUntil(long deadline)

    阻塞当前线程,知道deadline(从1970年开始到deadline的毫秒数)

    vodi unpark(Thread thread)

    唤醒处于阻塞状态的线程

    2.LockSupport源码

    LockSupport部分源码,其内部是借助UNSAFE类实现的。

    public class LockSupport {
        private LockSupport() {} // Cannot be instantiated.
        public static void unpark(Thread thread) {
            if (thread != null)
                UNSAFE.unpark(thread);
        }
    ​
        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            setBlocker(t, null);
        }
    ​
        public static void parkNanos(Object blocker, long nanos) {
            if (nanos > 0) {
                Thread t = Thread.currentThread();
                setBlocker(t, blocker);
                UNSAFE.park(false, nanos);
                setBlocker(t, null);
            }
        }
    ​
        public static void parkUntil(Object blocker, long deadline) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(true, deadline);
            setBlocker(t, null);
        }
    ​
        public static void park() {
            UNSAFE.park(false, 0L);
        }
    ​
        public static void parkNanos(long nanos) {
            if (nanos > 0)
                UNSAFE.park(false, nanos);
        }
    ​
        public static void parkUntil(long deadline) {
            UNSAFE.park(true, deadline);
        }
    }
     

    队列同步器AQS

    1.队列同步器AQS是什么,是做什么用的

    队列同步器AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架。

    同步器是实现锁或其他任意同步组件的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。

    • 锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问)

    • 同步器面向的是锁的实现者,简化了锁的实现方法,屏蔽了同步状态管理、线程排队、等待与唤醒等底层细节。

    利用AQS实现的同步组件:

    • 可重入锁ReentrantLock

    • 读写锁ReentrantReadWriteLock

    • 信号量Semaphore

    • ............

    2.队列同步器实现线程同步的原理

    队列同步器是基于模板方法模式设计。

    同步状态

    相关方法:getState()、setState(int newState)、compareAndSetState()

    同步器可重写的方法:使用同步状态相关方法访问或修改同步状态

    相关方法:tryAcquire()、tryRelease()、tryAcquireShared()、tryReleaseShared()等方法

    同步器提供的模板方法:内部调用了重写方法

    相关方法:acquire()、release()、acquiredShared()、releaseShared()等方法

    public abstract class AbstractQueuedSynchronizer  extends AbstractOwnableSynchronizer
            implements java.io.Serializable {
    ​
        //---------同步状态---------------//
        /**
         * 同步状态
         */
        private volatile int state;
    ​
        protected final int getState() {
            return state;
        }
    ​
        protected final void setState(int newState) {
            state = newState;
        }
    ​
        protected final boolean compareAndSetState(int expect, int update) {
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    ​
        //-----------同步器可重写的方法----------------//
        /**
         * 独占式获取和释放同步状态
         */
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    ​
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    ​
        /**
         * 共享式获取和释放同步状态
         */
        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    ​
        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    ​
        /**
         * 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
         */
        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }
    ​
        //-------------同步器提供的模板方法---------------------//
        /**
         * 同步器提供的模板方法可分为3类:
         * 1.独占式获取与释放同步状态
         * 2.共享式获取与释放同步状态
         * 3.查询同步队列中的等待线程情况
         *//**
         * 1.独占式获取与释放同步状态
         */
        /**
         * 独占式获取同步状态。
         * 如果当前线程获取同步状态成功,则方法返回;否则,将会进入同步队列等待
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    ​
        /**
         * 独占式获取同步状态,可响应中断。
         * 当前线程未获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法抛出InterruptedException异常。
         */
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    ​
        /**
         * 在acquireInterruptibly(int arg)基础上添加了超时限制
         */
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquire(arg) ||
                    doAcquireNanos(arg, nanosTimeout);
        }
    ​
        /**
         * 独占式释放同步状态。
         * 该方法释放同步状态之后,将同步队列中的第一个节点中的线程唤醒
         */
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                java.util.concurrent.locks.AbstractQueuedSynchronizer.Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    ​
        /**
         * 2.共享式获取与释放同步状态
         *//**
         * 与独占式的区别是同一时刻可以有多个线程获取到同步状态
         */
        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    ​
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    ​
        public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquireShared(arg) >= 0 ||
                    doAcquireSharedNanos(arg, nanosTimeout);
        }
    ​
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    ​
        /**
         * 3.查询同步队列中的等待线程情况
         */
        public final boolean hasQueuedThreads() {
            return head != tail;
        }
    ​
        public final Collection<Thread> getQueuedThreads() {
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (java.util.concurrent.locks.AbstractQueuedSynchronizer.Node p = tail; p != null; p = p.prev) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
            return list;
        }
    }
     

    3.实现自定义同步组件:独占锁Mutex

    public class Mutex implements Lock, java.io.Serializable{
    ​
        private static class Sync extends AbstractQueuedSynchronizer {
    ​
            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
    ​
            @Override
            protected boolean tryAcquire(int arg) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    ​
            @Override
            protected boolean tryRelease(int arg) {
                if (getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    ​
            Condition newCondition() {
                return new ConditionObject();
            }
    ​
        }
    ​
        private final Sync sync = new Sync();
    ​
        public void lock() {
            sync.acquire(1);
        }
    ​
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    ​
        public void unlock() {
            sync.release(1);
        }
    ​
        public Condition newCondition() {
            return sync.newCondition();
        }
    ​
        public boolean isLocked() {
            return sync.isHeldExclusively();
        }
    ​
        public boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    ​
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    ​
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }
    }

    4.独占式同步状态获取与释放

    同步队列

    没有成功获取同步状态线程会放入队列的尾部。

    独占式同步状态获取

    //1.tryAcquire:获取同步状态(tryAcquire需要在自定义同步组件时重写)
        //2.addWaiter:获取同步状态失败,添加到队列尾部,
        //3.acquiredQueued:自旋尝试获取同步状态,如果自己为第一个节点且获取同步状态成功,将自己设置为head节点
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    ​
        //由于只有一个线程可成功获取同步状态,可能同时有多个线程获取失败,因此通过CAS保证线程安全的添加尾节点
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    ​
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    ​
        //
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
     

    addWait方法,获取失败加入到同步队列尾部

    加入到队列中,自旋重试获取同步状态

    第一个节点获取同步状态成功,将自己设置为head

    独占式同步状态释放

        //释放同步状态,会唤醒后续节点,进而使后续节点重新重试获取同步状态。
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    ​
        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
      
  • 相关阅读:
    1.Go环境安装
    IDEA启动Tomcat控制台中文显示乱码
    专注的含义
    翻出一封古老的信
    若有所思
    B+Tree与B-Tree 的区别
    Redis集群 什么是slots
    夜深人静,听雨
    随心所想
    本性可以改吗
  • 原文地址:https://www.cnblogs.com/yeyang/p/12580896.html
Copyright © 2011-2022 走看看