Java锁原理学习
为了学习Java锁的原理,参照ReentrantLock实现了自己的可重入锁,代码如下:
先上AQS的相关方法:
// AQS = AbstractQueuedSynchronizer, 抽象队列同步器
// 它提供了对资源的占用、释放,线程的等待、唤醒等接口和具体实现
// 它维护了一个volatile int state来代表共享资源的状态,和一个FIFO线程等待队列
// 获取排它锁
// 先尝试获取锁,如果获取不到则添加到等待队列
// 等待队列通过 LockSupport.park(this) 实现等待
// 通过 LockSupport.unpark(s.thread) 实现唤醒
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 默认的tryAcquire抛出异常,需要子类实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 释放锁
// 尝试释放,如果成功则唤醒等待的线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 默认的tryRelease抛出异常,需要子类实现
// 返回值true时表示当前线程已完全释放该锁(因为可重入所以需要多次release),否则返回false
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 等待条件锁
public final void await() throws InterruptedException {
// 1. 保存当前线程的锁状态(即state的值)
// 2. 调用 LockSupport.park(this); 实现等待
// 3. 被唤醒后重新尝试获取锁
}
// 条件等待唤醒,仅唤醒等待队列的第一个线程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 条件唤醒,唤醒等待队列的所有线程
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// signal()和signalAll()最终都调用该方法唤醒特定线程
final boolean transferForSignal(Node node) {
// LockSupport.unpark(node.thread);
}
以下是自己实现的MyLock:
public static class MyLock extends AbstractQueuedSynchronizer {
/**
*
*/
private static final long serialVersionUID = 1L;
// 加锁方法通过aqs的acquire方法实现,样例仅实现排它锁功能
public void lock() {
acquire(1);
}
// 尝试获取锁,子类需实现该方法,核心是设置aqs中state的值
@Override
protected boolean tryAcquire(int n) {
int c = getState();
Thread t = Thread.currentThread();
// 如果c为0则表示当前锁没有被占用
if (c == 0) {
// 如果没有前序线程,则通过CAS尝试设置state值为申请的资源数
if (!hasQueuedPredecessors() && compareAndSetState(0, n)) {
// 如果获取锁成功则需要记录当前持有锁的线程,用于后续可重入和释放锁
setExclusiveOwnerThread(t);
return true;
}
// 如果当前线程已持有该锁则直接更新state值为总的资源数
} else if (t == getExclusiveOwnerThread()) {
setState(c + n);
return true;
}
return false;
}
// 释放锁通过aqs的release方法实现
public void unlock() {
release(1);
}
// 尝试释放锁,子类需实现该方法
// 首先判断当前线程是否持有该锁,否则抛出异常
// 更新state的值,如果已完全释放则设置当前持有排它锁的线程为null并返回true,否则返回false
@Override
protected boolean tryRelease(int n) {
if (getExclusiveOwnerThread() != Thread.currentThread()) {
throw new IllegalMonitorStateException();
}
int c = getState() - n;
setState(c);
if (c == 0) {
setExclusiveOwnerThread(null);
}
return c == 0;
}
// 创建条件锁
// 条件等待:cond.await();
// 条件满足通知:cond.signal(); 或cond.signalAll();
public Condition newCondition() {
return new ConditionObject();
}
@Override
protected boolean isHeldExclusively() {
return this.getExclusiveOwnerThread() == Thread.currentThread();
}
}
测试程序:
public static void main(String[] args) {
MyLock lock = new MyLock();
// 使用MyLock或ReentrantLock结果相同
// ReentrantLock lock = new ReentrantLock();
Condition c = lock.newCondition();
Runnable run = () -> {
String n = Thread.currentThread().getName();
lock.lock();
lock.lock();
System.err.println("i am " + n);
try {
if (n.equals("t1")) {
c.await();
} else {
c.signal();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
lock.unlock();
System.err.println(n + " finished");
};
new Thread(run, "t1").start();
new Thread(run, "t2").start();
}
// 输出
i am t1
i am t2
t2 finished
t1 finished
以下补充Semaphore的原理:
Semaphore是一个计数信号量,必须由获得它的线程释放,常用于限制可以访问资源的线程数量。
// 用法
// 初始化指定数量的许可证
Semaphore s = new Semaphore(2);
// 需要获取的许可证数量
s.acquire(2);
// 需要释放的许可证数量
s.release(2);
// 实现
public class Semaphore {
private final Sync sync;
// 通过AQS实现
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
}
// 获取直接调用AQS的acquire方法
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// 释放同样直接调用AQS的release方法
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
}