zoukankan      html  css  js  c++  java
  • 第一.Lock锁

    1.1Lock简介

    在 Lock 接口出现之前,Java 中的应用程序对于多线程的并发安全处理只能基于synchronized 关键字来解决。但是 synchronized 在有些场景中会存在一些短板,也就是它并不适合于所有的并发场景。但是在 Java5 以后,Lock 的出现可以解决synchronized 在某些场景中的短板,它比 synchronized 更加灵活。

    1.2Lock实现

    Lock 本质上是一个接口,它定义了释放锁和获得锁的抽象方法,定义成接口就意味着它定义了锁的一个标准规范,也同时意味着锁的不同实现。实现 Lock 接口的类有很多,
    ReentrantLock:表示重入锁,它是唯一一个实现了 Lock 接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数
    ReentrantReadWriteLock:重入读写锁,它实现了 ReadWriteLock 接口,在这个类中维护了两个锁,一个是 ReadLock,一个是 WriteLock,他们都分别实现了 Lock接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是: 读和读不互斥、读和写互斥、写和写互斥。也就是说涉及到影响数据变化的操作都会存在互斥。

    1.3Lock 的类关系图

    Lock 有很多的锁的实现,但是直观的实现是 ReentrantLock 重入锁
    void lock() // 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放void lockInterruptibly() // 和lock()方法相似, 但阻塞的线程 可 中 断 , 抛 出
    java.lang.InterruptedException 异常boolean tryLock() // 非阻塞获取锁;尝试获取锁,如果成返回 true ,boolean tryLock(longtimeout, TimeUnit timeUnit)//带有超时时间的获取锁方void unlock() // 释放锁 

     1.3ReentrantLock重入锁

    重入锁,表示支持重新进入的锁,同一线程外层函数获得锁之后,内层递归函数仍然能获取该锁的代码,在同一个线程在外层方法获取锁的时候,在内层方法会自动获取锁

    也就是说,线程可以进入任何一个它拥有的锁同步着的代码块。

    如果当前线程 t1 通过调用 lock 方法获取了锁之后,再次调用 lock,是不会再阻塞去获取锁的,直接增加重试次数就行了。synchronized 和 ReentrantLock 都是可重入锁。
    重入锁设计的目的
    比如调用 SendSms方法获得了当前的对象锁,然后在这个方法中再去调用sendEamil,sendEmail中的存在同一个实例锁,这个时候当前线程会因为无法获得demo2 的对象锁而阻塞,就会产生死锁。重入锁的设计目的是避免线程的死锁。
    synchronized 代码案例
    public class ReentrantDemo {
        public synchronized void sendSms(){
            System.out.println(Thread.currentThread().getId()+"	 sendSms"+"bean start");
            sendEmail();
        }
        public synchronized void sendEmail(){
            System.out.println(Thread.currentThread().getId()+"	 sendEmail"+"end start");
        }
        public static void main(String[] args) {
            ReentrantDemo reentrantDemo = new ReentrantDemo();
            new Thread(()->{
                reentrantDemo.sendSms();
            },"t1").start();
            new Thread(()->{
                reentrantDemo.sendSms();
            },"t2").start();
        }
    }
    ReentrantLock 的使用案例
    public class AtomicDemo implements Runnable{
    
        private Lock lock = new ReentrantLock();
    
        @Override
        public void run() {
            get();
        }
        public void get(){
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getId()+"	 get()");
                set();
            } catch (Exception ioe) {
                ioe.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        public void set(){
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getId()+"	 set()");
            } catch (Exception ioe) {
                ioe.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        public static void main(String[] args) {
            AtomicDemo atomicDemo = new AtomicDemo();
           Thread t1 = new Thread(atomicDemo,"t1");
           Thread t2 = new Thread(atomicDemo,"t2");
           t1.start();
           t2.start();
        }
    
    }

    1.4ReentrantReadWriteLock 

    独占锁(写锁)/共享锁(读锁)/互斥锁

    独占锁:指该锁一次只能被一个线程所持有。对ReentrantLock和Synchronized而言都是独占锁

    共享锁:指该锁可被多个线程所持有。

    对ReentrantReadWriterLock其读锁是共享锁,其写锁是独占锁。

    读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。

    多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享锁资源应该可以同时进行。但是如果有一个线程想去写共享资源来,

    就不应该再有其他线程可以对该资源进行读或者写

    小总结:

        读--读能共存

        读--写不能共存

        写--写不能共存

     多个线程同时读一个资源类没有任何问题,所以为了满足并发量, 读取共享锁资源应该可以同时进行。但是如果有一个线程想去写共享资源来,

    就不应该再有其他线程可以对该资源进行读或者写 即保证数据的一致性,也提升了数据的并发性

    /**
     * 写操作:原子+独占,整个过程必须是一个完整的统一体,中间不许被分割,被打断
     */
    class MyCache{
        private  volatile Map<String,Object> map = new HashMap<String,Object>();
    
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    
        public void put(String key,Object value){
            //写锁
            rw.writeLock().lock();
            try {
                System.out.println(Thread.currentThread().getName()+"	正在写入:	"+key);
                map.put(key,value);
                Thread.sleep(300);
                System.out.println(Thread.currentThread().getName()+"	写入完成");
            } catch (Exception ioe) {
                ioe.printStackTrace();
            } finally {
                rw.writeLock().unlock();
            }
        }
        public void get(String key){
            //读锁
            rw.readLock().lock();
            try {
                System.out.println(Thread.currentThread().getName()+"正在读取:	"+key);
                Object val = map.get(key);
                Thread.sleep(300);
                System.out.println(Thread.currentThread().getName()+"读取完成:	");
            } catch (Exception ioe) {
                ioe.printStackTrace();
            } finally {
                rw.readLock().unlock();
            }
        }
    }
    public class ReentrantWriterDemo {
        public static void main(String[] args) {
            MyCache myCache = new MyCache();
            for (int i = 0; i < 10; i++) {
                final int temp = i;
                new Thread(()->{
                    myCache.put(temp+"",temp);
                },String.valueOf(i)).start();
            }
            for (int i = 0; i < 10; i++) {
                final int temp = i;
                new Thread(()->{
                    myCache.get(temp+"");
                },String.valueOf(i)).start();
            }
        }
    }

    在这个案例中,通过 hashmap 来模拟了一个内存缓存,然后使用读写所来保证这个内存缓存的线程安全性。当执行读操作的时候,需要获取读锁,在并发访问的时

    候,读锁不会被阻塞,因为读操作不会影响执行结果。在执行写操作是,线程必须要获取写锁,当已经有线程持有写锁的情况下,当前线程会被阻塞,只有当写锁释放以后,
    其他读写操作才能继续执行。使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性

     1.5AQS

    1.5.1AQS的定义

    在 Lock 中,用到了一个同步队列 AQS,全称 AbstractQueuedSynchronizer,它是一个同步工具也是 Lock 用来实现线程同步的核心组件。

    1.5.2AQS的两种功能

    从使用层面来说,AQS 的功能分为两种:独占和共享
    独占锁,每次只能有一个线程持有锁,比如前面给大家演示的 ReentrantLock 就是以独占方式实现的互斥锁
    共 享 锁 , 允 许 多 个 线 程 同 时 获 取 锁 , 并 发 访 问 共 享 资 源 , 比 如ReentrantReadWriteLock 

    1.5.3AQS的内部实现

    AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。
    所以双向链表可以从任意一个节点开始很方便的访问前驱和后继。每个 Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 ASQ 队列中去;
    当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点(线程)。
     static final class Node {
            /**共享模式下等待标识*/
            static final Node SHARED = new Node();
            /** 指示节点正在以独占模式等待的标记*/
            static final Node EXCLUSIVE = null;
            /** 等待线程以取消 */
            static final int CANCELLED =  1;
            /** 后续线程需要释放 */
            static final int SIGNAL    = -1;
            /** 线程正在等待条件*/
            static final int CONDITION = -2;
            /**
             *waitStatus值,指示下一个acquireShared应该无条件传播
             */
            static final int PROPAGATE = -3;
    
            
            volatile int waitStatus;
    
            /**
             * 前驱节点
             */
            volatile Node prev;
    
            /**
              后继节点
             */
            volatile Node next;
    
            /**
             *当前线程
             */
            volatile Thread thread;
    
            /**
             *存储在condition队列中的后继节点
             */
            Node nextWaiter;
    
            /**
             * 是否为共享锁
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() { 
            }
           //将线程构造成一个Node,添加到等待队列
            Node(Thread thread, Node mode) { 
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { 
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }

     1.5.3释放锁以及添加线程对于队列的变化

    AQS中

        /**
         * 等待队列的头,延迟初始化。除初始化外,只能通过setHead方法进行修改。注意:如果head存在,则保证其waitStatus不被取消。
         */
        private transient volatile Node head;
    
        /**
         * 等待队列的尾部,延迟初始化。仅通过方法eng进行修改以添加新的等待节点。
         */
        private transient volatile Node tail;
    
        /**
         * 状态
         */
        private volatile int state;
    当出现锁竞争以及释放锁的时候,AQS 同步队列中的节点会发生变化,首先看一下添加节点的场景。 

    里会涉及到两个变化

    1. 新的线程封装成 Node 节点追加到同步队列中,设置 prev 节点以及修改当前节点的前置节点的 next 节点指向自己
    2. 通过 CAS 讲 tail 重新指向新的尾部节点head 节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,
    如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下

    这个过程也是涉及到两个变化

    1. 修改 head 节点指向下一个获得锁的节点
    2. 新的获得锁的节点,将 prev 的指针指向 null设置 head 节点不需要用 CAS,原因是设置 head 节点是由获得锁的线程来完成
    的,而同步锁只能由一个线程获得,所以不需要 CAS 保证,只需要把 head 节点设置为原首节点的后继节点,并且断开原 head 节点的 next 引用即可

     1.6ReentrantLock 的源码分析

    以 ReentrantLock 作为切入点,来看看在这个场景中是如何使用 AQS 来实现线程的同步的

    1.6.1ReentrantLock的时序图

    调用 ReentrantLock 中的 lock()方法,源码的调用过程我使用了时序图来展现。 

    ReentrantLock.lock()

    这个是ReentrantLock的入口

     public void lock() {
            sync.lock();
        }

    sync 实际上是一个抽象的静态内部类,它继承了 AQS 来实现重入锁的逻辑,AQS 是一个同步队列,它能够实现线程的阻塞以及唤醒,但它并不具备业务功能,所以在不同的同步场景中

    ,会继承 AQS 来实现对应场景的功能Sync 有两个具体的实现类,分别是:

    NofairSync:表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁

    FailSync: 表示所有线程严格按照 FIFO 来获取锁
    NofairSync.lock
    以非公平锁为例,来看看 lock 中的实现
    1. 非公平锁和公平锁最大的区别在于,在非公平锁中我抢占锁的逻辑是,不管有
    没有线程排队,我先上来 cas 去抢占一下
    2. CAS 成功,就表示成功获得了锁
    3. CAS 失败,调用 acquire(1)走锁竞争逻辑
      final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }

     CAS实现原理

    protected final boolean compareAndSetState(int 
    expect, int update) {
    // See below for intrinsics setup to support 
    this
    return unsafe.compareAndSwapInt(this, 
    stateOffset, expect, update);
    }

    通过 cas 乐观锁的方式来做比较并替换,这段代码的意思是,如果当前内存中的state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,否则返

    回 false.这个操作是原子的,不会出现线程安全问题,这里面涉及到Unsafe这个类的操作,
    以及涉及到 state 这个属性的意义。state 是 AQS 中的一个属性,它在不同的实现中所表达的含义不一样,对于重入
    锁的实现来说,表示一个同步状态。它有两个含义的表示
    1. 当 state=0 时,表示无锁状态
    2. 当 state>0 时,表示已经有线程获得了锁,也就是 state=1,但是因为ReentrantLock 允许重入,所以同一个线程多次获得同步锁的时候,state 会递增,
    比如重入 5 次,那么 state=5。而在释放锁的时候,同样需要释放 5 次直到 state=0其他线程才有资格获得锁
    Unsafe 类
    Unsafe 类是在 sun.misc 包下,不属于 Java 标准。但是很多 Java 的基础类库,包括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发的,比如 Netty、
    Hadoop、Kafka 等;Unsafe 可认为是 Java 中留下的后门,提供了一些低层次操作,如直接内存访问、线程的挂起和恢复、CAS、线程同步、内存屏障而 CAS
    就是 Unsafe 类中提供的一个原子操作,
    第一个参数为需要改变的对象,
    第二个为偏移量(即之前求出来的 headOffset 的值),
    第三个参数为期待的值,
    第四个为更新后的值整个方法的作用是如果当前时刻的值等于预期值 var4 相等,则更新为新的期望值 var5,如果更新成功,则返回 true,否则返回 false;
    stateOffset
    一个 Java 对象可以看成是一段内存,每个字段都得按照一定的顺序放在这段内存里,通过这个方法可以准确地告诉你某个字段相对于对象的起始内存地址的字节
    偏移。用于在后面的 compareAndSwapInt 中,去根据偏移量找到对象在内存中的具体位置所以 stateOffset 表示 state 这个字段在 AQS 类的内存中相对于该类首地址的偏移
    AQS.acquire
    acquire 是 AQS 中的方法,如果 CAS 操作未能成功,说明 state 已经不为 0,此时继续 acquire(1)操作 
    这个方法的主要逻辑是
    1. 通过 tryAcquire 尝试获取独占锁,如果成功返回 true,失败返回 false
    2. 如果 tryAcquire 失败,则会通过 addWaiter 方法将当前线程封装成 Node 添加
    到 AQS 队列尾部
    3. acquireQueued,将 Node 作为参数,通过自旋去尝试获取锁。
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
     NonfairSync.tryAcquire
    这个方法的作用是尝试获取锁,如果成功返回 true,不成功返回 false它是重写 AQS 类中的 tryAcquire 方法,并且大家仔细看一下 AQS 中 tryAcquire
    方法的定义,并没有实现,而是抛出异常。
     protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    ReentrantLock.nofairTryAcquire
    1. 获取当前线程,判断当前的锁的状态
    2. 如果 state=0 表示当前是无锁状态,通过 cas 更新 state 状态的值
    3. 当前线程是属于重入,则增加重入次数
        final boolean nonfairTryAcquire(int acquires) {
              //获取当前线程
                final Thread current = Thread.currentThread();
              //获取state状态
                int c = getState();
               //表示无锁状态
                if (c == 0) {
                   //cas 替换 state 的值,cas 成功表示获取锁成功
                    if (compareAndSetState(0, acquires)) {
                  //保存当前获得锁的线程,下次再来的时候不要再尝试竞争锁
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                 //如果同一个线程来获得锁,直接增加重入次数
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    AQS.addWaiter
    当 tryAcquire 方法获取锁失败以后,则会先调用 addWaiter 将当前线程封装成Node.
    入参 mode 表示当前节点的状态,传递的参数是 Node.EXCLUSIVE,表示独占状
    态。意味着重入锁用到了 AQS 的独占锁功能
    1. 将当前线程封装成 Node
    2. 当前链表中的 tail 节点是否为空,如果不为空,则通过 cas 操作把当前线程的node 添加到 AQS 队列 
    3. 如果为空或者 cas 失败,调用 enq 将节点添加到 AQS 队列 
       private Node addWaiter(Node mode) {
          //把当前线程封装成Node节点
            Node node = new Node(Thread.currentThread(), mode);
            //tail 是 AQS 中表示同比队列队尾的属性,默认是 null
            Node pred = tail;
         //tail 不为空的情况下,说明队列中存在节点
            if (pred != null) {
             //把当前线程的 Node 的 prev 指向 tail
                node.prev = pred;
              //通过 cas 把 node加入到 AQS 队列,也就是设置为 tail
                if (compareAndSetTail(pred, node)) {
                 //设置成功以后,把原 tail 节点的 next指向当前 node
                    pred.next = node;
                    return node;
                }
            }
         ;//tail=null,把 node 添加到同步队列
            enq(node);
            return node;
        } 
     enq
    enq 就是通过自旋操作把当前节点加入到队列中
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    假设 3 个线程来争抢锁,那么截止到 enq 方法运行结束之后,或者调用 addwaiter方法结束后,AQS 中的链表结构图

    AQS.acquireQueued

    通过 addWaiter 方法把线程添加到链表后,会接着把 Node 作为参数传递给
    acquireQueued 方法,去竞争锁
    1. 获取当前节点的 prev 节点
    2. 如果 prev 节点为 head 节点,那么它就有资格去争抢锁,调用 tryAcquire 抢占锁
    3. 抢占锁成功以后,把获得锁的节点设置为 head,并且移除原来的初始化 head节点
    4. 如果获得锁失败,则根据 waitStatus 决定是否需要挂起线程
    5. 最后,通过 cancelAcquire 取消获得锁的操作
      final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                  //获取当前节点的 prev 节点
                    final Node p = node.predecessor();
                   //如果是 head 节点,说明有资格去争抢锁
                    if (p == head && tryAcquire(arg)) {
               ;//获取锁成功,也就是ThreadA 已经释放了锁,然后设置 head 为 ThreadB 获 
                       得执行权限
                        setHead(node);
                       //把原 head 节点从链表中移除
                        p.next = null; 
                        failed = false;
                        return interrupted;
                    }
    //ThreadA 可能还没释放锁,使得 ThreadB 在执行 tryAcquire 时会返回 false
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
    //并且返回当前线程在等待过程中有没有中断过。
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

     1.7Condition

    Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒
    Condition的基本使用
    public class ConditionWait implements Runnable {
        private Lock lock;
        private Condition condition;
        public ConditionWait(Lock lock,Condition condition){
            this.lock = lock;
            this.condition = condition;
        }
        @Override
        public void run() {
            System.out.println("begin -ConditionWait");
            try {
                lock.lock();
                condition.await();
                System.out.println("end -ConditionWait");
            } catch (Exception ioe) {
                ioe.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    public class ConditionDemoSignal implements Runnable {
        private Condition condition;
        private Lock lock;
    
        ConditionDemoSignal(Lock lock,Condition condition){
            this.lock = lock;
            this.condition = condition;
        }
        @Override
        public void run() {
            System.out.println("begin ConditionDemoSignal");
            try {
                lock.lock();
                condition.signal();
                System.out.println("end ConditionDemoSignal");
            } catch (Exception ioe) {
                ioe.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    /**
     * 测试类
     */
    public class Test001 {
        public static void main(String[] args) {
            Lock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            ConditionWait conditionWait = new ConditionWait(lock,condition);
            ConditionDemoSignal signal = new ConditionDemoSignal(lock,condition);
            Thread t1 = new Thread(conditionWait);
            Thread t2 = new Thread(signal);
            t1.start();
            t2.start();
        }
    }

    题目:多线程之间按顺序调用,实现A→B→C三个线程启动,要求如下:

    AA打印5次,BB打印10次,CC打印15次

    紧接着

    AA打印5次,BB打印10次,CC打印15次

    ....

    打印10轮

    // 判断 干活 通知
    class ShareSource {
        private int number = 1;//1:A 2:B 3:C
        private Lock lock = new ReentrantLock();
        Condition c1 = lock.newCondition();
        Condition c2 = lock.newCondition();
        Condition c3 = lock.newCondition();
        public void print5(){
            lock.lock();
            try {
                while(number!=1){
                    c1.await();
                }
                for (int i = 0; i < 5; i++) {
                    System.out.println(Thread.currentThread().getName()+"	"+i);
                }
                number=2;
                c2.signal();
            } catch (Exception ioe) {
                ioe.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        public void print10(){
            lock.lock();
            try {
                while(number!=2){
                    c2.await();
                }
                for (int i = 0; i < 10; i++) {
                    System.out.println(Thread.currentThread().getName()+"	"+i);
                }
                number=3;
                c3.signal();
            } catch (Exception ioe) {
                ioe.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
        public void print15(){
            lock.lock();
            try {
                while(number!=3){
                    c3.await();
                }
                for (int i = 0; i < 15; i++) {
                    System.out.println(Thread.currentThread().getName()+"	"+i);
                }
                number=1;
                c1.signal();
            } catch (Exception ioe) {
                ioe.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
     }
    public class ConditionLockDemo {
        public static void main(String[] args) {
            ShareSource shareSource = new ShareSource();
            new Thread(()->{
                for (int i = 0; i < 5; i++) {
                    shareSource.print5();
                }
            },"AA").start();
            new Thread(()->{
                for (int i = 0; i < 5; i++) {
                    shareSource.print10();
                }
            },"BB").start();
            new Thread(()->{
                for (int i = 0; i < 5; i++) {
                    shareSource.print15();
                }
            },"CC").start();
        }
    }
    扩展代码
    public class BatchCondition {
        private Map<String, ReentrantLock> map = new HashMap<>();
        private Map<String, BlockingQueue<Condition>> condidtions = new HashMap<>();
        private int count = 1;
        {
            init();
        }
    
        /**
         * 初始化锁
         */
        public void init(){
            map.put("lock1",new ReentrantLock());
        }
    
        public void task(String key){
            ReentrantLock lock = map.get(key);
            try {
                lock.lock();
                while (!condidtions.isEmpty()){
                    Condition condition = lock.newCondition();
                    condidtions.get(key).add(condition);
                    condition.await();
                }
                count++;
            } catch (Exception ioe) {
                ioe.printStackTrace();
            } finally {
                lock.unlock();
            }
            System.out.println(Thread.currentThread().getName()+"	"+count);
            //解锁
            BlockingQueue<Condition> conditionQueue = condidtions.get(key);
            if(conditionQueue!=null&&!conditionQueue.isEmpty()){
                conditionQueue.poll().signal();
            }
            System.out.println("解锁成功");
        }
    
        public static void main(String[] args) {
            BatchCondition batchCondition = new BatchCondition();
            new Thread(()->{
                batchCondition.task("lock1");
            },"AA").start();
            new Thread(()->{
                batchCondition.task("lock1");
            },"BB").start();
        }
    }
    Condition 源码分析
    调用 Condition,需要获得 Lock 锁,所以意味着会存在一个 AQS 同步队列,先来看 Condition.await 方法
    condition.await调用 Condition 的 await()方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,
    同时线程状态变为等待状态。当从 await()方法返回时,当前线程一定获取了 Condition 相关联的锁 
     public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
        //创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表
                Node node = addConditionWaiter();
       //释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
                long savedState = fullyRelease(node);
                int interruptMode = 0;
    //如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
                while (!isOnSyncQueue(node)) {//判断这个节点是否在 AQS 队列上,第一次判断的是 false,因为前面已经释放锁了
                    LockSupport.park(this);// 第一次总是 park 自己,开始阻塞等待
    // 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会isOnSyncQueue 中判断自己是否在队列上.
    // isOnSyncQueue 判断当前 node 状态,如果是 CONDITION 状态,或者不在队列上了就继续阻塞.
    // isOnSyncQueue 判断当前 node 还在队列上且不是 CONDITION 状态了,就结束循环和阻塞.                
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
    // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
    // interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
    // 将这个变量设置成 REINTERRUPT.
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
    // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点. 
    // 如果是 null ,就没有什么好清理的了.
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
    // 如果线程被中断了,需要抛出异常.或者什么都不做
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    addConditionWaiter
    这个方法的主要作用是把当前线程封装成 Node,添加到等待队列。这里的队列不再是双向链表,而是单向链表
        private Node addConditionWaiter() {
                Node t = lastWaiter;
                //如 果 lastWaiter 不 等 于 空 并 且waitStatus 不等于 CONDITION 时,把冲好 
                  这个节点从链表中移除
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
           //构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的,所以相比 
               AQS 来说会简单很多
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    执行完 addConditionWaiter 这个方法之后,就会产生一个这样的 condition 队列
     
    fullRelease
    就是彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。
      final long fullyRelease(Node node) {
            boolean failed = true;
            try {
                long savedState = getState();
               //重入次数
                if (release(savedState)) {
                释放锁并且唤醒下一个同步队列中的线程
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    图解分析
    此时,同步队列会触发锁的释放和重新竞争。ThreadB 获得了锁
    isOnSyncQueue
    判断当前节点是否在同步队列中,返回 false 表示不在,返回 true 表示在
    如果不在 AQS 同步队列,说明当前节点没有唤醒去争抢同步锁,所以需要把当前线程阻塞起来,直到其他的线程调
    用 signal 唤醒如果在 AQS 同步队列,意味着它需要去竞争同步锁去获得执行程序执行权限
    为什么要做这个判断呢?原因是在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal的时候,会把当前节点从 condition 队列转移到 AQS 队列
    ➢ 基于现在的逻辑结构。如何去判断
    ThreadA 这个节点是否存在于 AQS 队列中呢?
    1. 如果 ThreadA 的 waitStatus 的状态为 CONDITION,说明它存在于 condition 队列中,不在 AQS 队列。因为AQS 队列的状态一定不可能有 CONDITION
    2. 如果 node.prev 为空,说明也不存在于 AQS 队列,原因是 prev=null 在 AQS 队列中只有一种可能性,就是它是head 节点,head 节点意味着它是获得锁的节点。
    3. 如果 node.next 不等于空,说明一定存在于 AQS 队列中,因为只有 AQS 队列才会存在 next 和 prev 的关系
    4. findNodeFromTail,表示从 tail 节点往前扫描 AQS 队列,一旦发现 AQS 队列的节点和当前节点相等,说明节点一定存在于 AQS 队列中 
      final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue
                return true;
            return findNodeFromTail(node);
        }
    Condition.signal
    调用 Condition 的 signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中
      public final void signal() {
                if (!isHeldExclusively())//先判断当前线程是否获得了锁,直接用获得锁的线程和当前线程相比即可
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter; // 拿到 Condition 队列上第一个节点
                if (first != null)
                    doSignal(first);
            }
     
    Condition 总结
    线程 awaitThread 先通过 lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列,而另一个线程 signalThread 通过 lock.lock()方法获取锁成功后调用
    了 condition.signal 或者 signalAll 方法,使得线程awaitThread 能够有机会移入到同步队列中,当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取lock,
    从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列。
    阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁
    释放:signal()后,节点会从 condition 队列移动到 AQS等待队列,则进入正常锁的获取流程

     1.8公平锁和非公平锁

    公平锁和非公平锁

    公平锁:是指多个线程按照申请的顺序获取锁,

    类似排队打饭,先来后到

    非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比现申请的线程优先获取在高并发情况下,有可能造成优先级反转或者饥饿现象

    区别:

    公平锁/非公平锁

    并发包中ReentrantLock创建可以指定构造函数的boolean类型来得到公平锁或非公平锁,默认非公平锁

    关于两者区别:

    公平锁:就是很公平,在并发情况下,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等等队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照FIFO的规则从队列中取到自己。

    非公平锁:比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就再采用类似公平锁那种方式

    java ReenTrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的有点在与吞吐量比公平锁打。

    对于synchronize而言,也是一种非公平锁

     1.9自旋锁

     是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处就是减少线程上线文切换的消耗,缺点是循环会消耗cpu

    /**
     * 好处:循环比较获取直到成功为止,没有类似wait的阻塞
     *
     * 通过CAS操作完成自旋锁,A线程先进来调用mylock方法自己持有锁5s钟,B随后进来后发现当前有线程
     * 有锁,不是null,所以只能通过自旋等待锁,直到A释放锁后B随后抢到
     */
    public class SpinLockDemo {
    
        //原子引用线程
        AtomicReference<Thread> atomicReference = new AtomicReference<>();
        public void myLock(){
            Thread thread = Thread.currentThread();
            System.out.println(Thread.currentThread().getName()+"	 common on");
            while (!atomicReference.compareAndSet(null,thread)){
    
            }
        }
        public void myUnLock(){
            Thread thread = Thread.currentThread();
            System.out.println(Thread.currentThread().getName()+"	 common on myUnLock");
            while (!atomicReference.compareAndSet(thread,null)){
    
            }
        }
        public static void main(String[] args) {
            SpinLockDemo spinLockDemo = new SpinLockDemo();
            new Thread(()->{
                spinLockDemo.myLock();
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                spinLockDemo.myUnLock();
            },"AA").start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            new Thread(()->{
                spinLockDemo.myLock();
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                spinLockDemo.myUnLock();
            },"BB").start();
        }
    }

     1.10synchronized和lock的区别

    1.原始构成

                synchronized是关键字属于JVM层面,java关键字,monitorenter(底层是通过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象只有在同步块或方法中才能凋           wait/notify 等方法monitorexit)

               lock是具体类(java.util.concurrent.locks.lock)是api层面的锁

    2.使用方法

    synchronized不需要用户去手动释放锁,当synchronize代码执行完后系统会自动让线程释放锁对锁的占用

    ReentrantLock则需要用户手动释放锁若没有主动释放锁就有可能导致出现死锁现象。

    需要lock()和unlock方法配合try/finally语句块来完成。

    3.等待是否中断

        synchronized:不可中断,除非抛出异常或者正常运行完成

         ReentrantLock可中断,1设置超时时间,tryLock(long timeout,TimeUnit nuit)

    4.lockInteeruptibly()放代码块中,调用interrupt()方法可中断

    4.加锁是否公平

            1.synchronizded是非公平锁

            2.ReentrantLock 两者都可以是,默认为非公平,构造方法可以传入boolean值,true为公平锁,false为非公平锁

    5.锁绑定多个条件Condition

     sysnchronizde没有

    ReentranLock用实现分组唤醒需要唤醒的线程们,可以精确的唤醒,而不是像synchronied要么随机唤醒一个线程要么唤醒全部线程。

     
     
  • 相关阅读:
    Linux性能监控
    程序员技术练级攻略
    使用 GDB 调试多进程程序
    nginx下面部署fast-cgi和C++【原】
    ROS Learning-024 (提高篇-002) rviz的安装和使用
    ROS Learning-023 (提高篇-001) 准备工作 --- 安装一些必要的软件包
    STM32 C++编程 005 I2c(Soft)类
    Python 网络爬虫 005 (编程) 如何编写一个可以 下载(或叫:爬取)一个网页 的网络爬虫
    设置 PyCharm 软件中 Terminal 窗口 中启动的 python 交互命令的版本
    在PyCharm 软件中设置你的项目 使用的Python版本
  • 原文地址:https://www.cnblogs.com/cxyyh/p/11566694.html
Copyright © 2011-2022 走看看