接下来从实现角度来分析同步器是如何完成线程同步的。主要包括:同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等。
1、同步队列
同步器依赖内部的一个同步队列来完成同步状态的管理。当线程获取同步状态失败时,会被加入到队列中,并同时阻塞线程。当同步状态释放时,会把首节点中的线程唤醒,使其在册尝试获取同步状态。(疑问:1、确定只唤醒首节点么,这不就是公平方式获取了么?2、首节点是否能一定获取到锁,为什么?)
a、确定只唤醒了首节点,的确这个获取方式相对比较公平,虽然新节点也可能优先获取到锁。待稍后对比重入锁获取过程;
[对比过后]:重入锁获取逻辑,就是先判断是否有人在用锁,如果有,判断是不是当前线程,如果不是,则当前线程排队,一旦进入排队队列,其实对这个队列来说,真的就是公平锁了,不公平的地方在于后来新来的线程可能会由于首节点刚好释放而获取到锁,造成“插队”;
b、首节点不一定能获取到,因为可能有新线程正好进入,然后获取了锁;
同步队列中的数据类型为Node,其中各个属性描述如下:
int waitStatus
|
等待状态:
1 ,在队列中等待的线程等待超时或者被中断,从队列中取消等待;
-1,后继节点处于等待;
-2,节点在等待队列中,当condition被signal()后,会从等待队列转到同步队列;
-3,表示下一次共享式同步状态获取将会被无条件传播下去;
0,初始状态
|
Node prev
|
前驱节点
|
Node next
|
后继节点
|
Node nextWaiter
|
等待队列中的后继节点,如果当前节点是共享的,则这个字段将是一个SHARED常量,也就是说节点类型(独占或共享)和等待队列中的后继节点共用同一字段
|
Thread thread
|
获取同步状态的线程
|
看到这里有些懵b,疑问如下:
1、状态1是被中断的,那compareAndSetState(0,1)不是设置为1就获取到锁了么,在这里貌似还是等待的?
2、状态-1,后继节点处于等待,当前节点在干啥,不是等待么?
3、状态-3,说了个啥意思?
就是觉得,这几个状态究竟怎么变化的,没理清楚。
4、节点next跟nextWaiter有啥区别,完全没搞懂。
这里,自己测试了一下,结论如下:
a、状态1是被中断的,注意这里是说的waitStatus;compareAndSetState(0,1),这里说的是state,这在AQS里边是两个变量,不是一个。
waitStatus用于记录节点的状态,state用于描述AQS的状态(标记是否处于同步中,以及记录重入次数)。
b、状态-1,后继节点等待,当前节点不一定。
如果当前节点为head节点,可能在等待(正好被新来的节点抢走了),也可能在执行;
如果当前节点不是head节点,肯定在等待;
也就是head节点执行时的状态一样也是-1,我们看一个debug的截图:
可以看到除了尾节点状态是0,其它的都是-1,包括正在执行的head节点。
c、状态-3,说了个啥,,,还没搞懂
d、没搞懂,,,,(后续来补充)
同步器结构如下:
看到这个图,我想都知道是啥意思,只是有一点注意的,就是往队尾添加元素的时候,因为可能多个线程都在获取失败后把自己往后边加,所以这个操作应该保证线程安全。因此同步器提供了一个 compareAndSetTail(Node expect, Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
2、独占式同步状态的获取与释放
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列移出。该方法代码:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(int arg) 是获取同步状态,addWaiter(Node node)是构建node节点并添加到队列尾部,acquireQuired(Node node,int arg)使得该节点以“死循环”的方式获取同步状态,selfInterrupt()是唤醒当前线程。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或者阻塞线程被中断来实现。
这里,实际不会是死循环,因为其中的shouldParkAfterFailedAcquire()方法会把线程挂起而阻塞住。
死循环获取同步状态的这个操作有点超出意外,看一下代码:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //获取当前节点的前节点 if (p == head && tryAcquire(arg)) { // 前节点为head,并且自己获取同步成功,说明前节点线程已经执行结束 setHead(node); //既然前节点的线程结束了,那就把自己设为head节点 p.next = null; // help GC //断开前节点,便于回收 failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //这里会将线程挂起 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire(p, node)会挂起线程,该方法主要作用是根据前节点判断当前线程是否应该被阻塞,如果前节点处于CANCELLED状态,则顺便删除这些节点。阻塞的方法在parkAndCheckInterrupt中的LockSupport.park(this),这里最终调用了UNSAFE.park(false,0l)这个本地方法。关于LockSupport跟UNSAFE,可以参考:https://blog.csdn.net/secsf/article/details/78560013
同步状态的释放是通过同步器的release(int arg)进行的,该方法在释放了同步状态后会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。代码如下:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); //唤醒后继节点的操作在这里 return true; } return false; }
3、共享式同步状态获取与释放
共享式获取与独占式的主要区别就是是否允许同时多个线程获取同步状态,当然,这些线程必须也是共享式获取。比如有一个线程在读,那么再来几个线程读也是没问题的,但如果来个线程要写(独占式)那就是不可以的。其主要代码如下:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) //小于0,说明没获取成功 doAcquireShared(arg); //继续获取 }
继续获取的逻辑:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); //节点类型为共享类型 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //尾节点的前一个节点(当前节点就是尾节点) if (p == head) { //前节点是head,则再次获取锁 int r = tryAcquireShared(arg); // 这个tryAcquireShared()的返回值是共享资源的剩余量,就是还可以允许访问的线程数 if (r >= 0) { //如果获取到了锁,进行相关设置 setHeadAndPropagate(node, r); // 进行head节点替换,并且如果剩余量有剩余,则继续往下传递 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //没获取到锁,则将线程挂起 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
感觉跟独占式并没有多大差别,这里无非要注意释放锁必须是同步的,因为可能同时有多个线程进行释放操作。
4、独占式超时获取同步状态
doAcquireNanos(int arg, long nanosTimeout)方法可以超时获取同步状态,即在指定时间段内获取同步状态,成功则返回true,否则返回false。该方法可以响应中断。
上代码:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); //节点类型是独占式的 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //前节点为head,则尝试获取锁 setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); // 计算剩余时间 if (nanosTimeout <= 0L) // 剩余时间已经到了 return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //没获取到锁,时间也没到,则挂起一小段时间。注意如果时间剩余非常小了,比spinForTimeoutThreshold还小,则不挂起了,直接死循环一小会儿,进行获取锁 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
感觉逻辑比较清楚,没啥好说。
做一个共享锁的例子试一下:
public class TwinsLock implements Lock { private final Sync sync ; private static final class Sync extends AbstractQueuedSynchronizer{ Sync(int count){ if(count <= 0){ throw new IllegalArgumentException("count must large than zero ."); } setState(count); // 这里可以看到,count就是可重入的线程数 } public int tryAcquireShared(int reduceCount){ for(;;){ int current = getState(); int newCount = current - reduceCount; if(newCount < 0 || compareAndSetState(current, newCount)){//能把数量减掉并设置,就相当于获取锁成功 return newCount; } } } public boolean tryReleaseShared(int returnCount){ for(;;){ int current = getState(); int newCount = current + returnCount; if(compareAndSetState(current, newCount)){ return true; } } } } public TwinsLock (int count){ this.sync = new Sync(count); } @Override public void lock() { sync.acquireShared(1); } @Override public void unlock() { sync.releaseShared(1); } // 其它方法 }
测试:如果我们count设为1,则每次只允许一个线程进入,看上去应该跟排它锁类似,测试代码:
public class SharedTest { private int count = 0; private final TwinsLock twinsLock = new TwinsLock(1); @Test public void test(){ MyThread mt1 = new MyThread(); MyThread mt2 = new MyThread(); MyThread mt3 = new MyThread(); MyThread mt4 = new MyThread(); MyThread mt5 = new MyThread(); mt1.start(); mt2.start(); mt3.start(); mt4.start(); mt5.start(); try { mt1.join(); mt2.join(); mt3.join(); mt4.join(); mt5.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("最终结果:" + count); } class MyThread extends Thread{ @Override public void run() { for(int i=0;i<1000;i++){ twinsLock.lock(); try { count = count + 1; }catch (Exception e){ System.out.println("异常啦 ~ ~ " +e.getMessage()); }finally { twinsLock.unlock(); } } } } }
多次运行结果:
测试代码第3行改为new TwinsLock(2)后,执行结果就经常是一个小于5000的数,这是由于两个线程相互覆盖的原因。当然,这并不能证明每次就2个线程进入了,更好的测试代码应该参考《java并发编程艺术》中的相关代码;只是我读的时候临时想到了这个,就用这个测试了。
比较好奇的是,共享跟独占是怎么具体实现的,除了一个节点类型,具体判断逻辑是咋写的,两个使用的是同一个同步队列么?这个稍后结合读写锁来理一下。