public class SelfLock implements Lock{ //state 表示获取到锁 state=1 获取到了锁,state=0,表示这个锁当前没有线程拿到 private static class Sync extends AbstractQueuedSynchronizer{ //是否占用 protected boolean isHeldExclusively() { return getState()==1; } protected boolean tryAcquire(int arg) { if(compareAndSetState(0,1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int arg) { if(getState()==0) { throw new UnsupportedOperationException(); } setExclusiveOwnerThread(null); setState(0); return true; } Condition newCondition() { return new ConditionObject(); } } private final Sync sycn = new Sync(); @Override public void lock() { sycn.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sycn.acquireInterruptibly(1); } @Override public boolean tryLock() { return sycn.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sycn.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { sycn.release(1); } @Override public Condition newCondition() { return sycn.newCondition(); } }
public class Test { private static SelfLock sl = new SelfLock(); private static int a = 0; private static Condition con = sl.newCondition(); public static void increment() { sl.lock(); a++; sl.unlock(); } private static CyclicBarrier cb = new CyclicBarrier(31);// 设置一个同步屏障,等30个线程+主线程都执行完后再打印最终a的值 public static void main(String[] args) throws InterruptedException, BrokenBarrierException { for (int i = 0; i < 30; i++) { new Thread(new Runnable() { @Override public void run() { for (int j = 0; j < 100000; j++) { increment(); } try { cb.await(); //System.out.println(Thread.currentThread().getId() + " wo hao le"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); } cb.await(); System.out.println(a); } }
public class TrinityLock { //为3表示允许两个线程同时获得锁 private final Sync sync = new Sync(3); private static final class Sync extends AbstractQueuedSynchronizer { //private static final long serialVersionUID = -7889272986162341211L; Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large than zero."); } setState(count);//设置3个许可证,最多允许3个共享线程 } public int tryAcquireShared(int reduceCount) { for (;;) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } public boolean tryReleaseShared(int returnCount) { for (;;) { int current = getState(); int newCount = current + returnCount; if (compareAndSetState(current, newCount)) { return true; } } } final ConditionObject newCondition() { return new ConditionObject(); } } public void lock() { sync.acquireShared(1); } public void unlock() { sync.releaseShared(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean tryLock() { return sync.tryAcquireShared(1) >= 0; } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(time)); } public Condition newCondition() { return sync.newCondition(); } }
实现思路:一个锁的基本行为范式都定义在Lock接口中,我们只要实现Lock接口即可。lock接口中主要的lock和unlock方法的实现AQS已经帮我们实现大部分了,我们只要实现对state共享资源的细化操作就可以实现不同的锁也就是tryAcquire,tryRelease,tryAcquireShared,tryReleaseShared方法。
那么为啥我们只要实现上述4个方法就能实现不同的锁?
比如:
public void lock() {
sycn.acquire(1);
}
它的实现依赖于AQS中acquire方法 如下面代码
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
这里的tryAcquire就是我们重写的方法,我们重写的方法假如state为0,就将state置为1,并将当前线程设为独占线程,
假如state为1,返回false,然后addWaiter将当前线程置入同步队列的尾部,acquireQueued是自旋中,假如head节点指向了当前线程Node,就tryAcquire,返回interrupted值,然后根据条件值执行selfInterrupt阻塞当前线程。
总结:AQS为我们定义好了顶层的处理实现逻辑,我们在使用AQS构建符合我们需求的同步组件时,只需重写tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由AQS为我们完成了,这也是非常典型的模板方法的应用。AQS定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。