Semaphore主要用于对线程的控制,举个使用场景,几十个excel文件,数量千万。通过多线程几十个线程读取数据之后,要写入到数据库。但是数据库的连接数只有10。这个时候我们就要通过这个类对这些线程进行控制。因为数据写入时间过程,其他线程无法获取连接,出现无法获取数据库连接报错的情况。
- 主要的方法调用还是使用AQS中现有的方法
源码解析
内部锁结构
- 内部锁继承了AQS,提供了公平锁和非公平锁两种,功能大致相同。
//基础锁类,提供所有锁基础方法
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {//设置状态初始值
setState(permits);
}
final int getPermits() {//获取状态数
return getState();
}
//获取共享锁方法
final int nonfairTryAcquireShared(int acquires) {//非公平获取锁
for (;;) {
int available = getState();
int remaining = available - acquires;//剩余数量
if (remaining < 0 ||
compareAndSetState(available, remaining))//当剩余数量小于0,或者设置成功则返回剩余数量
return remaining;
}
}
//释放共享锁,state数量增加
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//换成新的值
return true;
}
}
// 减少权限数
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current)
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))//替换减少后的权限数,到state
return;
}
}
//将权限归零
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
//非公平锁,直接调用基础锁类的获取锁的方法
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
//公平锁,要判断前置节点
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())//判断是否有前置节点,如果有,则返回-1,放入到同步队列中
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))//更新锁状态
return remaining;
}
}
}
- Semaphore通过传参可以调整是公平锁还是非公平锁
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
aquire 方法
- 通过设置权限数,让锁状态减少对应权限数量
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
release方法
- 释放权限数,对应的状态码+1
public void release() {
sync.releaseShared(1);
}