zoukankan      html  css  js  c++  java
  • AQS源码

    1.三种线程调度方式

    1. wait()/notify()/notifyAll()

      public static void main(String[] args) {
              Object obj = new Object();
              new Thread(()->{
                  synchronized (obj){
                      System.out.println("线程[{}]"+Thread.currentThread().getName()+"获取到锁,并且暂停");
                      try {
                          obj.wait();
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                      System.out.println("线程[{}]"+Thread.currentThread().getName()+"获取到锁,继续执行");
                  }
              },"thread-1").start();
              new Thread(()->{
                  try {
                      //暂停1秒,保证线程1能拿到锁
                      Thread.sleep(2000);
                     synchronized (obj){
                         System.out.println("线程[{}]"+Thread.currentThread().getName()+"获取到锁,并唤醒等待的线程");
                         //唤醒等待的线程1
                         obj.notify();
                     }
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              },"thread-2").start();
          }
      
    2. await()/signal()/signalAll()

      public static void main(String[] args) {
              Lock lock = new ReentrantLock();
              Condition condition = lock.newCondition();
              new Thread(()->{
                  try {
                      lock.lock();
                      System.out.println("线程[{"+Thread.currentThread().getName()+"}]获取到锁,并执行条件等待");
                      condition.await();
                      System.out.println("线程[{"+Thread.currentThread().getName()+"}]取消条件等待,继续执行");
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } finally {
                      lock.unlock();
                  }
              },"thread-1").start();
              new Thread(()->{
                  try {
                      Thread.sleep(1000);
                      System.out.println("线程[{"+Thread.currentThread().getName()+"}]暂停1秒");
                      lock.lock();
                      condition.signal();
                      System.out.println("线程[{"+Thread.currentThread().getName()+"}]解除条件等待,唤醒等待线程");
                  } catch (Exception e) {
                      e.printStackTrace();
                  } finally {
                      lock.unlock();
                  }
              },"thread-2").start();
          }
      
    3. LockSupport.park()/LockSupport.unpark()

      public static void main(String[] args) {
              Object obj = new Object();
              Thread t1 = new Thread(()->{
                  try {
                      Thread.sleep(2000);
                      System.out.println("线程[{"+Thread.currentThread().getName()+"}]进入执行");
                      LockSupport.park(obj);
                      System.out.println("线程[{"+Thread.currentThread().getName()+"}]继续执行");
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              },"thread-1");
              Thread t2 = new Thread(()->{
                  try {
                      Thread.sleep(1000);
                      System.out.println("线程[{"+Thread.currentThread().getName()+"}]暂停1s,并执行unpark");
                      LockSupport.unpark(t1);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              },"thread-2");
      
              t1.start();
              t2.start();
      

      用来创建锁和其他同步类的基本线程阻塞原语。类似于信号量Semaphore,但是只有一个许可,且可以先释放再获取许可。

    2.CAS

    全称Compare And Swap,比较并交换。

    通过sun.misc.unsafe类实现,主要参数有三个,分别是内存值、期望值以及更新值,只有内存值和期望值一致时才会将内存值修改为更新值。

    例如JUC包下的atomicXXX等类均通过CAS的方式实现。

    3.volitale

    • Java内存模型(JMM)

      graph TD A(线程A)-->B(本地内存A) C(线程B)-->D(本地内存B) B --> |共享变量副本A| F(主内存&&共享变量) D --> |共享变量副本B| F
    • volitale

      • 共享变量可见性
      • 防止指令重排

    4.AQS

    4.1 简介

    AQS全称是AbstractQueuedSynchronizer,抽象的队列同步器,提供了一个实现阻塞锁(ReetrentLock)或者相关的同步器(semaphorecountDownLatch)的框架,这些锁或者同步器都依赖于一个FIFO的等待队列,并且可以通过一个int类型的变量来表示锁或者同步器的状态。

    AQS定义了2种资源共享模式:

    • 独占模式(Exclusive):独占,只有一个线程能执行,如ReentrantLock
    • 共享模式(Share):共享,多个线程可同时执行,如ReadWriteLockSemaphore

    4.2 源码分析

    4.2.1 state

    /**
    * 同步状态
    */
    private volatile int state;
    

    表示锁或者同步器的状态,不同的锁或者同步器对于state的值可能都不同,且各个值表示的含义也不同。

    state使用volatile关键字修饰,能够保证state的可见性,state的访问方式有三种:

    protected final int getState() {
    	return state;
    }
    
    protected final void setState(int newState) {
    	state = newState;
    }
    
    protected final boolean compareAndSetState(int expect, int update) {
    	return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    

    state不同的值表示不同的锁状态,通常使用cas的方式设置state的值,

    比如ReetrantLockstate>0表示锁已经被抢占,其他线程只能进入队列等待唤醒,state=0表示占用锁的线程已经释放锁了,其他线程可以进行抢占。

    再比如Semephorestate的初始值就是设置的许可数量,每使用一个许可就将state值减1,当state=0时表示许可用完,后续线程就需要排队。

    4.2.2 Node

    竞争资源失败的线程需要进入等待队列(CLH队列),在进入等待队列之前被封装成一个个的NodeNode主要包含以下几个属性:

    /** 表示节点处于共享模式 */
    static final Node SHARED = new Node();
    /** 表示节点处于独占模式 */
    static final Node EXCLUSIVE = null;
    /**节点状态*/
    volatile int waitStatus;
    /**前置节点*/
    volatile Node prev;
    /**后置节点*/
    volatile Node next;
    /**当前线程*/
    volatile Thread thread;
    /**条件队列中的下一个节点*/
    Node nextWaiter;
    

    其中表示节点状态的waitStatus属性的值为以下几种,新创建的节点waitstatus的值是0.

    /** 表示线程已经被取消了(timeout或者被中断) */
    static final int CANCELLED =  1;
    /** 表示后置节点在等待前置节点唤醒,后继节点入队时,会将前置节点的状态修改为 SIGNAL*/
    static final int SIGNAL    = -1;
    /** 表示线程正在等待condition上,当其他线程调用了Condition.signal()方法时,线程会从条件队列转移到等待队列,等待获取同步锁 */
    static final int CONDITION = -2;
    /**
     * 共享模式下,前置节点不仅要唤醒后继节点,还要唤醒后继节点的后继节点
     */
    static final int PROPAGATE = -3;
    

    waitStatus<0时表示线程还在有效等待,但是waitStatus>0时表示timeout或者线程中断,因此源码中很多地方使用>0或者<0的方式进行判断,并没有详细到每一个值的判断。

    4.2.3 CLH队列

    4.2.3.1 原始CLH队列

    CLH队列全称是Craig, Landin, Hagersten lock queue,是一个基于(隐式)单向链表的可扩展、高性能、公平的自旋锁,能够保证无饥饿,并且先到先得的公平性,通过在后继节点中自旋前继节点中的属性值来实现。

    代码示例:

    /**
    * 定义Node
    */
    public class QNode {
        volatile boolean locked;
    }
    
    /**
    * 定义Lock接口
    */
    public interface Lock {
     
        void lock();
     
        void unlock();
    }
    
    /**
    * 定义CLH锁
    */
    public class CLHLock implements Lock {
        // 尾节点,是所有线程共有的一个。所有线程进来后,把自己设置为tail
        private final AtomicReference<QNode> tail;
        // 前驱节点,每个线程独有一个。
        private final ThreadLocal<QNode> myPred;
        // 当前节点,表示自己,每个线程独有一个。
        private final ThreadLocal<QNode> myNode;
    
        public CLHLock() {
            this.tail = new AtomicReference<>(new QNode());
            this.myNode = ThreadLocal.withInitial(QNode::new);
            this.myPred = new ThreadLocal<>();
        }
    
        @Override
        public void lock() {
            // 获取当前线程的代表节点
            QNode node = myNode.get();
            // 将自己的状态设置为true表示获取锁。
            node.locked = true;
            // 将自己放在队列的尾巴,并且返回以前的值。第一次进将获取构造函数中的那个new QNode
            QNode pred = tail.getAndSet(node);
            // 把旧的节点放入前驱节点。
            myPred.set(pred);
            // 在等待前驱节点的locked域变为false,这是一个自旋等待的过程
            while (pred.locked) {
            }
            // 打印myNode、myPred的信息
            peekNodeInfo();
        }
    
        @Override
        public void unlock() {
            // unlock. 获取自己的node。把自己的locked设置为false。
            QNode node = myNode.get();
            node.locked = false;
            myNode.set(myPred.get());
        }
    }
    
    /**
    * 使用场景
    */
    public class KFC {
        private final Lock lock = new CLHLock();
        private int i = 0;
    
        public void takeout() {
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + ": 拿了第" + ++i + "份外卖");
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    
    public static void main(String[] args) {
        final KFC kfc = new KFC();
        Executor executor = Executors.newFixedThreadPool(5);
        for (int i = 1; i <= 35; i++) {
            executor.execute(kfc::takeout);
        }
    }
    

    4.2.3.2 变体CLH队列

    AQS中使用的CLH队列是原始CLH队列的变体,变体的CLH队列借鉴了CLH队列的思想,即在前置节点中保存一些线程的信息,使用Node节点中的status来追踪线程是否应该被阻塞,但是status的值不能用来控制线程是否应该被锁定。

    对于非公平锁,一个线程处于队列的头结点处,但是并不一定能获取锁成功,处于头节点处只是给了一个竞争的权利,如果竞争失败,还是要继续等待。

    graph LR A(HEAD)--> B(傀儡节点 wt=-1) B --> |next| C(Thread A wt=-1) C --> |prev| B C --> D(Thread B wt=-1) D --> C D --> E(Thread C wt=-1) E --> D F(TAIL) --> E

    4.2.4 流程图

    4.2.5 源码分析

    ReentrantLock为例,ReentrantLock是独占锁,且分为公平锁和非公平锁,本次分析以非公平锁为例。

    4.2.5.0 加锁过程分析

    lock()过程

    public class ReentrantLock implements Lock, java.io.Serializable {
       
        //构造方法,默认非公平
        public ReentrantLock() {sync = new NonfairSync();}
        public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
        
        //继承AQS
        private final Sync sync;
        
        abstract static class Sync extends AbstractQueuedSynchronizer {
            
            abstract void lock(); // <2>
    
            //尝试获取锁
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
            //释放锁
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
            //通过state判断状态
            final boolean isLocked() {return getState() != 0;}
        }
    
       //非公平锁
        static final class NonfairSync extends Sync {
            final void lock() { // <3>
                //cas的方式修改state的值
                if (compareAndSetState(0, 1))
                    //cas修改成功,设置当前线程为独占线程
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    //cas修改失败
                    acquire(1); //<4>
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
        //公平锁
        static final class FairSync extends Sync {
            final void lock() {acquire(1);}
    
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    
        public void lock() {sync.lock();}   // <1>
        public boolean tryLock() {return sync.nonfairTryAcquire(1);}
        public void unlock() {sync.release(1);}
    }
    

    当出现竞争时,调用AQS.acquire(1)方法

    public final void acquire(int arg) {
    	if (!tryAcquire(arg) && //尝试竞争锁
    		acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //加入队列
    		selfInterrupt(); //线程中断
    }
    

    先看AQS中的tryAcquire方法

    // AbstractQueuedSynchronizer
    protected boolean tryAcquire(int arg) {
    	throw new UnsupportedOperationException();
    }
    

    是一个由protected修饰的方法,并且直接抛出异常,这样的方法可以由子类重载,实现具体的逻辑,ReentrantLock.NonfairSync中的重载方法如下

    //NonfairSync
    protected final boolean tryAcquire(int acquires) {
    	return nonfairTryAcquire(acquires);
    }
    //Sync
    final boolean nonfairTryAcquire(int acquires) {
    	final Thread current = Thread.currentThread();
        //锁状态
    	int c = getState();
    	if (c == 0) { //state值为0,表示锁已经被释放,当前线程可以竞争锁
    		if (compareAndSetState(0, acquires)) {
    			setExclusiveOwnerThread(current);
    			return true;
    		}
    	}
    	else if (current == getExclusiveOwnerThread()) { //可重入锁判断,判断当前线程是否是已经设置的独占线程
            //可重入锁,state值加1
    		int nextc = c + acquires;
    		if (nextc < 0) // overflow
    			throw new Error("Maximum lock count exceeded");
    		setState(nextc);
    		return true;
    	}
        //锁没有被释放,也不是可冲入锁,当前线程获取锁失败,返回false
    	return false;
    }
    

    获取锁失败,!tryAcquire(arg)=true,执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法将当前线程加入到等待队列中。

    先看addWaiter(Node.EXCLUSIVE)方法,设置Node节点的类型是独占型(Node.EXCLUSIVE

    //AbstractQueuedSynchronizer.java
    private Node addWaiter(Node mode) {
        //将当前线程封装成一个mode(独占)类型的节点
    	Node node = new Node(Thread.currentThread(), mode);
    	//tail是CLH队列的尾部指针,初始为null,只有在有节点的情况下才不为null(懒加载思想)
        //以最快的方式直接将当前节点放置到队尾
    	Node pred = tail;
    	if (pred != null) {
    		node.prev = pred;
    		if (compareAndSetTail(pred, node)) {
    			pred.next = node;
    			return node;
    		}
    	}
        //直接将节点放置到队尾失败,执行入队操作
    	enq(node);
        //返回入队后的节点
    	return node;
    }
    

    继续查看enq(node)方法

    //AbstractQueuedSynchronizer.java
    private Node enq(final Node node) {
        //死循环(自旋),直到将node入队才会结束,否则一直循环
    	for (;;) {
    		Node t = tail;
    		if (t == null) { //tail节点为空,表示CLH队列还没有初始化,必须先初始化
    			if (compareAndSetHead(new Node())) //cas方式创建一个空的Node节点(傀儡节点),并将head指向新创建的空节点
    				tail = head; //tail也指向新创建的节点
    		} else {
                //CLH队列初始化完成
    			node.prev = t;
                //cas方式将node节点添加到队尾,入队完成
    			if (compareAndSetTail(t, node)) {
    				t.next = node;
    				return t;
    			}
    		}
    	}
    }
    

    Node节点入队完成后,执行acquireQueued()方法,具体源码如下

    //AbstractQueuedSynchronizer.java
    final boolean acquireQueued(final Node node, int arg) {
    	//标记线程是否获取到锁
        boolean failed = true;
    	try {
            //标记等待过程中线程是否被中断
    		boolean interrupted = false;
            //死循环,自旋
    		for (;;) {
                //获取当前节点的前置节点
    			final Node p = node.predecessor();
                //如果前置节点是head,表示当前线程所在节点位于队列中第二个节点(第一个是傀儡节点)
                //此时当前线程重新尝试获取锁
    			if (p == head && tryAcquire(arg)) {
                    //获取锁成功,表示之前占用锁的线程要么释放锁了,要不就是中断了
                    //将当前节点设置为头节点,node中的prve和next都设置为null,tread也设置为null,当前节点成为傀儡节点
    				setHead(node);
    				p.next = null; // help GC
    				failed = false; //获取到锁
    				return interrupted; //未被中断
    			}
                //非首节点
    			if (shouldParkAfterFailedAcquire(p, node) &&
    				parkAndCheckInterrupt())
                    //在线程等待过程中,如果线程被中断了,interrupted就会被标记为true,但是aqs不处理线程的中断,由线程本身去处理中断状态
                    //如果线程未被中断,正常情况下会一直等待到被唤醒,重新抢占锁
    				interrupted = true; 
    		}
    	} finally {
    		if (failed)
                //取消等待,出现异常时才进入,tryAcquire由子类实现,该方法抛出异常会导致取消节点等待
    			cancelAcquire(node);
    	}
    }
    

    先看下shouldParkAfterFailedAcquire方法

    //AbstractQueuedSynchronizer.java
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前置节点状态,有5种值:1,-1,-2,-3,0
    	int ws = pred.waitStatus;
        //前置节点状态等于-1,表示当前节点可以被前置节点唤醒,因此当前节点可以正常park()
    	if (ws == Node.SIGNAL)
    		return true;
    	if (ws > 0) {//前置节点状态等于1,表示前置节点已经被中断,需要从当前节点开始向前查找,直到找到未被中断的节点,然后设置为当前节点的前置节点
    		do {
    			node.prev = pred = pred.prev;
    		} while (pred.waitStatus > 0);
    		pred.next = node;//找到前置节点中未中断的节点,之前的节点会从队列中出队,被gc掉
    	} else {
            //前置节点状态是0或者-2,-3,此时不能判断当前节点是否可以park,cas的方式将前置节点状态设置为-1
    		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    	}
    	return false;
    }
    

    一个新的节点,至少要经历两次自旋,shouldParkAfterFailedAcquire方法才会返回true,此时就会进入parkAndCheckInterrupt方法

    //AbstractQueuedSynchronizer.java
    private final boolean parkAndCheckInterrupt() {
        //调用park使线程进入waiting状态(响应中断)
    	LockSupport.park(this);
        //线程被唤醒或者中断后,返回中断标识位并清除,并且park是不响应中断异常的,外部中断只会唤醒等待的线程
    	return Thread.interrupted();
    }
    

    由于LockSupport.park(obj)方法不响应中断异常,因此acquireQueued()方法中只有在由子类实现的tryAcquire方法抛出异常时才会执行取消节点等待的cancelAcquire方法

    //AbstractQueuedSynchronizer.java
    private void cancelAcquire(Node node) {
    	//节点不存在,忽略
    	if (node == null)
    		return;
    
    	node.thread = null;
    
    	// 跳过取消的节点
    	Node pred = node.prev;
    	while (pred.waitStatus > 0)
    		node.prev = pred = pred.prev;
    
    	//未取消的前置节点的下一个节点
    	Node predNext = pred.next;
    
    	//当前节点的状态值改为取消(1)
    	node.waitStatus = Node.CANCELLED;
    
    	// 如果当前节点是尾节点,cas直接删除
    	if (node == tail && compareAndSetTail(node, pred)) {
    		compareAndSetNext(pred, predNext, null);
    	} else {
    		//如果当前节点的后继节点需要唤醒,那么就cas的方式设置前置节点的后继节点,否则直接唤醒后继节点
    		int ws;
    		if (pred != head &&
    			((ws = pred.waitStatus) == Node.SIGNAL ||
    			 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
    			pred.thread != null) {
    			Node next = node.next;
    			if (next != null && next.waitStatus <= 0)
    				compareAndSetNext(pred, predNext, next);
    		} else {
    			unparkSuccessor(node);
    		}
    
    		node.next = node; // help GC
    	}
    }
    

    最后再回到acquire()方法

    //AbstractQueuedSynchronizer.java
    public final void acquire(int arg) {
    	if (!tryAcquire(arg) &&
    		acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    		selfInterrupt();
    }
    

    tryAcquire表示尝试去获取锁,acquireQueued表示在获取锁失败后入队等待,如果当前线程获取锁失败,并且在入队等待过程中被中断,那么就会执行selfInterrupt进行自我中断

    //AbstractQueuedSynchronizer.java
    static void selfInterrupt() {
    	Thread.currentThread().interrupt();
    }
    

    此处只是设置了线程的中断状态,中断之后需要如何操作需要在线程中对应编码

    4.2.5.1 加锁过程总结

    graph LR A(start) --> B(tryAcquire) B --> |N| C(addWaiter) C --> |park| E(waiting) E --> |unpark or interrupt|F(head&&tryAcquire) F --> |N| E F --> |Y| D B --> |Y| D(END)

    加锁过程中最重要的一个方法就是 tryAcquire,需要子类自行实现,并且要设置好state的值对应的状态,其余的入队唤醒等操作在aqs中都已经进行了实现,子类不需要再关心。

    4.2.5.2 释放过程分析

    释放锁的入口是unlock

    // ReentrantLock.java
    public void unlock() {
    	sync.release(1);
    }
    

    syncReentrantLock的内部类,继承了AQS类,release是定义在AQS中的方法

    //AbstractQueuedSynchronizer.java
    public final boolean release(int arg) {
        //尝试释放锁
    	if (tryRelease(arg)) {
    		Node h = head;
            //头节点不为空且状态不等于0(有后继节点入队且需要唤醒时,设置当前前置节点状态为-1)
    		if (h != null && h.waitStatus != 0)
                //唤醒后继节点
    			unparkSuccessor(h);
    		return true;
    	}
    	return false;
    }
    

    根据tryRelease的返回值来确定释放锁是否成功,tryRelease也是一个需要子类重载的方法,AQS中的原始方法如下

    //AbstractQueuedSynchronizer.java
    protected boolean tryRelease(int arg) {
    	throw new UnsupportedOperationException();
    }
    

    ReentrantLock中重载的tryRelease方法

    //ReentrantLock.Sync.java
    protected final boolean tryRelease(int releases) {
        //state值减1
    	int c = getState() - releases;
        //判断当前线程是否是独占线程,不是抛出异常
    	if (Thread.currentThread() != getExclusiveOwnerThread())
    		throw new IllegalMonitorStateException();
    	boolean free = false;
    	if (c == 0) {//c==0表示当前线程可以释放锁了
    		free = true;
            //将独占线程设置为null
    		setExclusiveOwnerThread(null);
    	}
        //设置state的值
    	setState(c);
    	return free;
    }
    
    //state值使用volatile修饰
    protected final void setState(int newState) {
    	state = newState;
    }
    

    释放锁之后,需要判断队列中是否有线程在等待唤醒,如果有的话,需要执行unparkSuccessor方法唤醒后继节点

    //AbstractQueuedSynchronizer.java
    private void unparkSuccessor(Node node) {
    	//node节点为傀儡节点(待释放锁的线程节点,出队后变成傀儡节点)
    	int ws = node.waitStatus;
        //傀儡节点状态小于0时,cas方式修改为0
        //失败了也无所谓,新的节点入队时会进行修正
    	if (ws < 0)
    		compareAndSetWaitStatus(node, ws, 0);
    
    	//傀儡节点的下一个节点
    	Node s = node.next;
    	if (s == null || s.waitStatus > 0) {//后继节点为空或者中断了
    		s = null;
            //从tail开始向前遍历,直到找到节点状态小于0的有效节点
    		for (Node t = tail; t != null && t != node; t = t.prev)
    			if (t.waitStatus <= 0)
    				s = t;
    	}
    	if (s != null)
            //有效的后继节点非空,唤醒
    		LockSupport.unpark(s.thread);
    }
    

    4.2.5.3 释放过程总结

    release是独占模式下释放锁的顶层入口,能够释放指定数量的资源(state每次减少的值),在资源完全释放(state=0)时表示当前线程已经完全释放了锁,如果有后续线程需要唤醒就会去唤醒对应的线程。

    4.3 应用

    工具类 锁类型 state作用
    Semephore 共享锁 初始化许可数量
    CountDownLatch 共享锁 维护计数器
    ReentrantLock 独占锁 锁重入次数
    ReentrantReadWriteLock 读锁:共享,写锁:独占 高16位表示共享锁数量,低16位表示独占锁重入次数
    CyclicBarrier 共享锁 维护计数器

    4.4 总结

    不同的自定义同步器争用共享资源的方式不同,自定义同步器在实现时只需要定义好state的获取和释放方式即可,具体的线程等待队列的维护AQS已经在顶层实现好了,自定义同步器在定义时其实只需要根据需要实现以下几个方法即可:

    • tryAcquire(int): 独占模式获取资源,成功返回true,失败返回false;
    • tryRelease(int): 独占模式释放资源,成功返回true,失败返回false;
    • tryAcquireShared(int): 共享模式获取资源;
    • tryReleaseShared(int): 共享模式释放资源;

    参考文档:

    1.java并发之AQS详解
    2.CLH lock queue的原理解释及Java实现

  • 相关阅读:
    MySQL修改root密码的多种方法
    (转)云存储:阿里云OSS 、又拍云和 七牛 的比较
    微博feed系统的推(push)模式和拉(pull)模式和时间分区拉模式架构探讨
    Feed系统架构资料收集
    百万用户时尚分享网站feed系统扩展实践
    (转)浅谈MD5加密算法中的加盐值(SALT)
    json转换成list map集合
    android在假设绘制自己定义的bitmap,然后返回给ImageView
    APPCAN学习笔记003---原生开发与HTML5技术
    【问题解决】syntax error: unexpected end of file或-bash: ./full_build.sh: /bin/bash^M: bad interpreter: No
  • 原文地址:https://www.cnblogs.com/ybyn/p/15118644.html
Copyright © 2011-2022 走看看