/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
为了表述清楚假设当前请求获取锁的线程名称为“线程x”
(1)非公平锁的第一步:当“线程x”请求获取到锁的时候,直接调用compareAndSetState(0, 1),这个方法会将判断state=0?如果等于0,说明这个锁没有被占领,那么“线程x”直接获取,这就是非公平锁,竞争锁成功之后,就将属性Thread exclusiveOwnerThread设置为“线程x”。
(2)如果没有获取到,那么就调用 acquire(1);
一: acquire(1);
这个方法首先调用tryAcquire方法获取锁,如果获取不到就要把“线程x”加入到队列中,并且阻塞“线程x”。
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
各个方法定义如下:
1、tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。该方法由NonfairSync自己实现。
2、addWaiter:如果tryAcquire返回FALSE(获取锁失败),则调用该方法将“线程x”加入到CLH同步队列尾部。
3、acquireQueued:“线程x”加入到队列之后就会调用这个方法将“线程x”挂起,直到获取到(许可)permit之后,如果线程被打断了,则将“线程x”从队列中移除。
4、selfInterrupt:如果是产生了中断,则返回true,否则返回false。
二、公平锁和非公平锁都对tryAcquire(arg)方法有各自的实现。查看非公平锁的实现。
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {(1) if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {(2) int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
这个方法有三个逻辑判断:
(1)如果没有线程占领锁,即c==0,那么竞争锁。
(2)如果锁被占领,判断锁是否被“线程x”占领,是就计算重入一次,并且返回true
(3)当锁被非“线程x”占领,就返回false.
三、 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
“线程x”竞争锁失败之后来到这个方法,这个方法首先执行的是addWaiter,将“线程x”加入到队列的尾端。
3.1、addWaiter(Node.EXCLUSIVE)
/** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null;
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
这个方法包含两个部分,enq(node)以上是一个部分,enq(node)是一个部分。第一部分是一个快速入队列的方法,当只有一个线程来请求锁的时候或者请求的方式是异步方式的时候,可以实现线程节点一个一个进入队列。enq(node)是给多个线程竞争入队列的。这部分在我的下载文件中,有详细的画图说明,看不明白可以下载看看。
3.2、enq(node)
这个方法是将节点入队,是一个死循环,只有把节点添加到队列中之后才会返回,返回的是添加节点的前置节点。在这个方法中包括了一个节点初始化的部分,就是创建头节点和尾节点。
Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else {
3.3、acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();(1) if (p == head && tryAcquire(arg)) {(2) setHead(node);(3) p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) &&(4) parkAndCheckInterrupt())(5) interrupted = true; } } finally { if (failed) cancelAcquire(node);(6) } }
根据标号分析每个部分的作用:
(1)“线程x”添加到队列之后,返回的是“线程x”节点。(1)的作用是取到“线程x”节点的前一个节点,这里假设是“线程y”
(2)如果“线程y”是头节点,并且“线程x”获取锁成功,则执行(3),把这个“线程x”节点设置成头节点,并且更改当前线程节点的thread=null,prev=null,这里其实就是把当前节点更改成了头节点的状态了。
(4)如果“线程y”节点不是头节点(就是说前面已经有其他线程节点来请求锁,并且加入到队列中,又阻塞了)或者“线程x”没有获取锁成功,那么执行(4)shouldParkAfterFailedAcquire的源码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;// 获取前置节点“线程y”的waitStatus if (ws == Node.SIGNAL) // 如果ws ==-1 /* * Node.SIGNAL的作用就是,当前的线程节点释放锁之后,可以顺利的通知它的继任线程节点(这个会在释放锁的时候说明),让他取获取锁 *这里问题就是,如果继任节点还在请求锁的话可以正常获取到,一旦继任节点被中断或者抛出异常了,怎么把这个节点移除队列? * */ return true; if (ws > 0) { /* *如果ws>0,目前来看只有CANCELLED。那么就会就要往“线程y”节点的前面去找一个节点, *直到这个节点的pred.waitStatus <=0, *然后 pred.next = node;把当前的节点赋值给找到的这个节点的next *如果执行了这个方法,那么“线程x”的前置节点就会改变,不会再是“线程y”节点 *这里假设“线程y”节点ws = -1 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. *ws必须等于0或者-3.表示我们需要一个信号,但是不是停止。调用者必须再次确认当前的节点在停止前必须时没有获取到锁的。 *这里就是又要跳出当前的方法去执行死循环了,下次还会再进来,知道返回true,那个时候才会去执行parkAndCheckInterrupt */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
(5) parkAndCheckInterrupt(),当确定“线程x”节点需要挂起的时候,那么就会执行这个方法去挂起“线程x”节点。
(6)cancelAcquire(node);这个方法的作用就是当“线程x”节点出现异常获取被终端之后把它从队列中移除。
总结:
第一次看源码,建议先使用一下重入锁(ReentrantLock),然后带着下面几个问题看源码,并且从源码中找答案:
1、使用一个自己可以调试源码的IDE工具,我使用的intellij,写测试类调试源码很方便?
2、ReentrantLock是重入锁,那么实现的原理是什么?
3、重入锁是什么意思,应用场景是什么,怎么实现的?
4、公平锁与非公平锁是什么意思?为什么公平与不公平?
5、线程是怎么竞争获取到锁的?CAS指的是什么?
6、锁的排他模式与共享模式?
7、线程怎么被封装成node,并且通过什么方式加入到队列中?(这个部分最好画图,而且要考虑到各个线程之间的竞争入队)?
8、线程使用完锁之后,怎么传递给下一个线程使用?