AbstractQueueSynchronized
提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础。使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态。
因为获取锁是有条件的,没有获取锁的线程就要阻塞等待,那么就要存储这些等待的线程
而是储存拥有线程的node节点, node 即为队列的存储结构 ,双向链表
1 private Node addWaiter(Node mode) { 2 Node node = new Node(Thread.currentThread(), mode); 3 // Try the fast path of enq; backup to full enq on failure 4 Node pred = tail; 5 if (pred != null) { 6 node.prev = pred; 7 if (compareAndSetTail(pred, node)) { 8 pred.next = node; 9 return node; 10 } 11 } 12 enq(node); 13 return node; 14 }
如何让等待的线程阻塞掉?
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) { 6 final Node p = node.predecessor(); 7 if (p == head && tryAcquire(arg)) { 8 setHead(node); 9 p.next = null; // help GC 10 failed = false; 11 return interrupted; 12 } 13 if (shouldParkAfterFailedAcquire(p, node) && 14 parkAndCheckInterrupt()) 15 interrupted = true; 16 } 17 } finally { 18 if (failed) 19 cancelAcquire(node); 20 } 21 }
传进来的node 即为新创建的node,node.predecessor(),获取node的前一个节点,如果前一个节点为head ,并且能够try acquired(),
即将当前节点设置为头节点,并让头节点的下一个节点为空。如果条件不成立,即其前一个节点不为head,
(shouldParkAfterFailedAcquire(p, node)把前一个节点和当前节点传入该方法。
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int ws = pred.waitStatus; 3 if (ws == Node.SIGNAL) 4 /* 5 * This node has already set status asking a release 6 * to signal it, so it can safely park. 7 */ 8 return true; 9 if (ws > 0) { 10 /* 11 * Predecessor was cancelled. Skip over predecessors and 12 * indicate retry. 13 */ 14 do { 15 node.prev = pred = pred.prev; 16 } while (pred.waitStatus > 0); 17 pred.next = node; 18 } else { 19 /* 20 * waitStatus must be 0 or PROPAGATE. Indicate that we 21 * need a signal, but don't park yet. Caller will need to 22 * retry to make sure it cannot acquire before parking. 23 */ 24 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 25 } 26 return false; 27 }
pred.waitSattus(),拿到前一个节点的等待状态,如果状态为signal,唤醒状态,return true,
如果值大于0,有多个lock方法
node.prev = pred = pred.prev; pred.next = node; 移除属于cancel状态的节点。因为这些节点不需要竞争CPU资源了
return false;
如果小于0,把状态设置为signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
整体循环
3 if (ws == Node.SIGNAL) 4 /* 5 * This node has already set status asking a release 6 * to signal it, so it can safely park. 7 */ 8 return true;
直到返回true
然后执行 parkAndCheckInterrupt()方法
1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this); 3 return Thread.interrupted(); 4 }
park()其实就是让当前线程等待,setBlocker()
1 public static void park(Object blocker) { 2 Thread t = Thread.currentThread(); 3 setBlocker(t, blocker); 4 UNSAFE.park(false, 0L); 5 setBlocker(t, null); 6 }
解锁?
1 public void unlock() { 2 sync.release(1); 3 }
调用同步器的release()方法
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head; 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h); 6 return true; 7 } 8 return false; 9 }
ReentranrLock() tryrelease 重写
1 protected final boolean tryRelease(int releases) { 2 int c = getState() - releases; 3 if (Thread.currentThread() != getExclusiveOwnerThread()) 4 throw new IllegalMonitorStateException(); 5 boolean free = false; 6 if (c == 0) { 7 free = true; 8 setExclusiveOwnerThread(null); 9 } 10 setState(c); 11 return free; 12 }
getState() - release 当前状态 -1
如果进来的线程与当前线程不一致的话,抛出异常
当所有的lock调用了unlock,free = true ; return true;
4 if (h != null && h.waitStatus != 0)
5 unparkSuccessor(h);
1 private void unparkSuccessor(Node node) { 2 /* 3 * If status is negative (i.e., possibly needing signal) try 4 * to clear in anticipation of signalling. It is OK if this 5 * fails or if status is changed by waiting thread. 6 */ 7 int ws = node.waitStatus; 8 if (ws < 0) 9 compareAndSetWaitStatus(node, ws, 0); 10 11 /* 12 * Thread to unpark is held in successor, which is normally 13 * just the next node. But if cancelled or apparently null, 14 * traverse backwards from tail to find the actual 15 * non-cancelled successor. 16 */ 17 Node s = node.next; 18 if (s == null || s.waitStatus > 0) { 19 s = null; 20 for (Node t = tail; t != null && t != node; t = t.prev) 21 if (t.waitStatus <= 0) 22 s = t; 23 } 24 if (s != null) 25 LockSupport.unpark(s.thread); 26 }
node.next()指向当前节点的下一个节点
s == null, 即当前队列为空队列
24 if (s != null)
25 LockSupport.unpark(s.thread);
1 public static void unpark(Thread thread) { 2 if (thread != null) 3 UNSAFE.unpark(thread); 4 }
等待的线程叫醒争夺cpu资源
使用aqs重写锁
如果第一个线程进来拿到锁,返回true,第二个线程进来,拿不到锁返回 false
1 public class MyLock implements Lock { 2 private Helper helper = new Helper(); 3 4 @Override 5 public void lock() { 6 helper.acquire(1); 7 } 8 9 @Override 10 public void lockInterruptibly() throws InterruptedException { 11 helper.acquireInterruptibly(1); 12 } 13 14 @Override 15 public boolean tryLock() { 16 return helper.tryAcquire(1); 17 } 18 19 @Override 20 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { 21 return helper.tryAcquireNanos(1,unit.toNanos(time)); 22 } 23 24 @Override 25 public void unlock() { 26 helper.tryRelease(1); 27 } 28 29 @Override 30 public Condition newCondition() { 31 return helper.newCondition(); 32 } 33 34 //将子类定义为非公共内部帮助器类,可用它们实现其封闭类的同步属性 35 private class Helper extends AbstractQueuedSynchronizer{ 36 37 @Override 38 protected boolean tryAcquire(int arg) { 39 int state = getState(); 40 if (state == 0){ 41 if (compareAndSetState(0,arg)){ 42 setExclusiveOwnerThread(Thread.currentThread()); 43 return true; 44 } 45 } 46 return false; 47 } 48 49 50 @Override 51 protected boolean tryRelease(int arg) { 52 //所得获取和释放肯定是一一对应的,调用此方法的线程一定是当前线程 53 if (Thread.currentThread() != getExclusiveOwnerThread()){ 54 throw new RuntimeException(); 55 } 56 57 int state = getState() -arg; 58 59 boolean flag = false; 60 61 if (getState() == 0){//释放成功 62 setExclusiveOwnerThread(null); 63 flag = true; 64 } 65 66 setState(state); 67 return flag; 68 69 } 70 71 Condition newCondition(){ 72 return new ConditionObject(); 73 } 74 } 75 }
main.class
1 package com.roocon.thread.t1; 2 3 public class Main { 4 5 private int value; 6 7 // public int next(){ 8 // try { 9 // Thread.sleep(300); 10 // } catch (InterruptedException e) { 11 // e.printStackTrace(); 12 // } 13 // return value ++; 14 // } 15 16 private MyLock lock = new MyLock(); 17 18 public int next(){ 19 lock.lock(); 20 21 try { 22 Thread.sleep(300); 23 return value ++; 24 } catch (InterruptedException e) { 25 throw new RuntimeException(); 26 }finally { 27 lock.unlock(); 28 } 29 30 } 31 32 public static void main(String[] args) { 33 Main m = new Main(); 34 new Thread(new Runnable() { 35 @Override 36 public void run() { 37 while (true){ 38 System.out.println(Thread.currentThread().getId()+" "+m.next()); 39 } 40 } 41 }).start(); 42 43 44 new Thread(new Runnable() { 45 @Override 46 public void run() { 47 while (true){ 48 System.out.println(Thread.currentThread().getId()+" "+m.next()); 49 } 50 } 51 }).start(); 52 53 new Thread(new Runnable() { 54 @Override 55 public void run() { 56 while (true){ 57 System.out.println(Thread.currentThread().getId()+" "+m.next()); 58 } 59 } 60 }).start(); 61 62 63 } 64 65 66 67 68 69 }
测试锁的重入
可重入锁就是一个线程在获取了锁之后,再次去获取了同一个锁,这时候仅仅是把状态值进行累加。
1 public class Main2 { 2 3 private MyLock2 lock = new MyLock2(); 4 5 public void a(){ 6 lock.lock(); 7 System.out.println("a"); 8 b(); 9 lock.unlock(); 10 } 11 12 public void b(){ 13 lock.lock(); 14 System.out.println("b"); 15 lock.unlock(); 16 17 } 18 19 public static void main(String[] args) { 20 Main2 m = new Main2(); 21 new Thread(new Runnable() { 22 @Override 23 public void run() { 24 m.a(); 25 26 } 27 }).start(); 28 29 } 30 31 }
只打印a
修改 tryAcquire()方法
1 @Override 2 protected boolean tryAcquire(int arg) { 3 //第一个线程进来,可以拿到锁,返回true 4 //第二个线程进来,拿不到锁,返回false, 5 //如果当前进来的线程和当前保存的线程是同一个线程,则可以拿到锁,但要更新状态值 6 7 Thread t = Thread.currentThread(); 8 9 int state = getState(); 10 if (state == 0){ 11 if (compareAndSetState(0,arg)){ 12 setExclusiveOwnerThread(t); 13 return true; 14 } 15 }else if (getExclusiveOwnerThread() == t){ 16 setState(state + 1);; 17 return true; 18 } 19 return false; 20 }
执行
a
b
完