介绍一下AQS:
注意看绿色的方法,这些是对外开放的方法。
另外,AQS是一个CLH变种双边队列,原始的CLH是.net人员设计的,他们设计的时候是让每个阻塞在队列上的线程不停的自旋,而在java中借鉴了这种设计方式,但是不再是自旋,而是通过LockSupport.park(this);这样一个方法将线程阻塞。
***********************
acquire方法:
其中acquire开头的方法是AQS默认实现的通用加锁方法,带有Interruptibly结尾的是处理打断状态的请求方法,不带的是不处理打断标志的方法。带有shared的是共享锁的加锁方法,不带有的则是独占锁的加锁方法。
上面是AQS中的acquire方法(AQS实现的这个是公平锁的获取方式):独占锁获取方法,先调用tryAcquire方法,如果成功则加锁成功,如果失败则线程开始排队,并阻塞自己。tryAcquire方法如下:
方法的注释解释的很清楚,子类应该去实现这个方法,因为此方法会被acquire方法调用,而独占锁获取锁必须调用acquire方法,如果你实现的是一个独占锁,那么你必须实现此方法,如果是共享锁则不必。
实现此方法时:当返回失败,则此线程应该排队(直到被另外一个线程通过release唤醒或者它被中断),如果成功则上锁成功。
******************
release方法:
release方法是AQS释放阻塞线程的方法,让其去竞争锁,带有shared是共享锁的释放方法,不带的则是独占锁的方法。思路一致。这里看下公平锁的release方
注释很清楚,AQS负责释放阻塞线程,子类实现自己的tryRelease方法去释放锁。其他的方法就不赘述了。
*************************
介绍下ReentrantLock:
ReentrantLock是实现AQS的独占锁,它内部先是定义了一个抽象同步器Sync实现了AQS,这个方法定义了非公平锁尝试获取锁的方式(尝试获取锁是不会阻塞的,只会返回成功或失败)。
然后是公平锁和非公平锁:
结合之前AQS的介绍,相信我们一眼就可以看透这两个类了,非公平锁的lock方法是自己利用CAS实现的抢占式加锁,如果失败则专用AQS的公平锁加锁方式;而公平锁就简单多了lock方法调用的就是AQS提供的加锁方式。
而尝试获取锁的方式都是自定义的,前者定义在sync中后者定义在公平锁子类中。代码不复杂,不需要赘述了。
**************************
介绍下ConditionObject:
这个是条件队列,相信很多人看到这个类的时候是很迷惑的,我也是。别慌先看类的方法:
就俩类方法await和signal,分别是阻塞和唤醒。还有两个属性Node,说明这是个双端链表结构的组织,因为有前驱和后驱两个节点。用大母脚趾头想都能想到这个类是干啥用的。就不赘述了。
---------------------------------------------
ReentrantLock测试:
ConditionObject测试:
线程2和线程3是前两个进入锁的线程因为各自调用count+1后,但是不满足Condition的条件,因此被阻塞到Condition1对象上,然后后面进来的所有线程都满足Condition队列的要求,因此可以直接执行。可以看到除了第2和第3线程外,其他线程都是顺序执行(按照非公平锁的CLH队列顺序执行)。
调用Lock的newCondition方法会创建一个绑定到此Lock上的阻塞队列Condition,这个Condition又叫做条件队列,当没有满足此Condition的条件前,锁是由获得Lock的当前线程占有,当满足Condition的条件后,可以调用Condition.await方法,这样当前线程会以原子的方式释放掉此Lock,然后阻塞在Condition的阻塞队列中,直到被同一个Condition唤醒或者此线程被打断。
--------------------------
手写独占锁:
/** * @author yangshengqiang * @date 2021/12/17 8:49 * @description */ public class myLock implements Lock { private sync sync = new sync(); private class sync extends AbstractQueuedSynchronizer{ protected boolean tryAcquire(int arg) { final Thread thread = Thread.currentThread(); int state = getState(); if(state==0){ if (compareAndSetState(0, arg)){ setExclusiveOwnerThread(thread); return true; } return false; }else if(state>0){ if(getExclusiveOwnerThread()==thread){ setState(state+1); return true; } } return false; } protected boolean tryRelease(int arg) { final Thread thread =Thread.currentThread();//当前线程 int state = getState();//独占锁状态 if(state<=0){// throw new RuntimeException("状态异常"); } if(thread==getExclusiveOwnerThread()){//重入锁 if(state==0){//设置锁状态为0 setExclusiveOwnerThread(null);//清除独占线程 return true; } if(compareAndSetState(state, state-1)){//锁状态数量-1 setExclusiveOwnerThread(null); return true; } } return false; } } @Override public void lock() { this.sync.acquire(1); } @Override public void unlock() { this.sync.release(1); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }
public class demo1 { static Logger logger = LoggerFactory.getLogger(demo1.class); static myLock myLock = new myLock(); public static void main(String[] args) { new Thread(()-> { try { speak(); } catch (InterruptedException e) { e.printStackTrace(); } },"线程1").start(); new Thread(()-> { try { speak(); } catch (InterruptedException e) { e.printStackTrace(); } },"线程2").start(); } static int demo = 0; public static void speak() throws InterruptedException { logger.info(Thread.currentThread().getName()+"-准备进入"); myLock.lock(); logger.info(Thread.currentThread().getName()+"-已进入"+(demo=++demo)); Thread.sleep(1000); logger.info(Thread.currentThread().getName()+"-已解锁"); myLock.unlock(); } }
可以看出我们写tryRelease也是为了设置state状态值以及释放独占线程。
再看acquire方法,我们定义的tryAcquire就是为了设置state和独占线程,至于阻塞得不到锁的线程,由AQS的acquireQueued方法实现。
所以AQS帮我们实现阻塞和唤醒的机制,而上锁的前提state==o和独占线程是本线程这些需要自己去写,因此我们使用AQS定义自己的同步器锁的时候,需要做的就是设置这两个方法,在上锁前处理好state,在下锁后同样处理好state字段值。
!!!!
如有错误,请指正。