要实现同步,一般我们会用synchronized,但是其实除了synchronized之外,还有一些方法,比如说ReentrantLock,ReentrantLock其实是完全可以取代synchronized的,而且比synchronized会更灵活
ReentrantLock概述
ReentrantLock使用的是一种无阻塞式的同步机制,其底层用的是AQS,同时提供了比synchronized更加灵活的,强大的加锁解锁机制。ReentrantLock还提供了公平锁也非公平锁的选择,构造方法接受一个可选的公平参数(默认非公平锁),当设置为true时,表示公平锁,否则为非公平锁。公平锁与非公平锁的区别在于公平锁的锁获取是有顺序的。但是公平锁的效率往往没有非公平锁的效率高,在许多线程访问的情况下,公平锁表现出较低的吞吐量。关于公平锁,可以看我上一篇锁概述的文章
ReentrantLock的使用
关于ReentrantLock的使用其实也很简单,示例如下:
Lock lock = new ReentrantLock()
public void put() {
try {
lock.lock();
do something....
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
只需要在需要同步的代码的部分 调用lock方法,一直到unlock的部分,都是同步的代码块。但是有一点要注意,unlock一定要在finally里调用。否则的话,如果try里面的代码出了一异常,那么这个锁将永远锁住,别的线程也无法获取这个锁。相比synchronized,这两者在同步机制上的区别则是,synchronized整个过程完全是自动的,没有可操作性,而ReentrantLock整个加锁解锁机制都是可以自己控制的,因此相对灵活。
获取锁
我们一般都是这么使用ReentrantLock获取锁的:
//非公平锁
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock方法:
public void lock() {
sync.lock();
}
Sync为ReentrantLock里面的一个内部类,它继承AQS(AbstractQueuedSynchronizer),它有两个子类:公平锁FairSync和非公平锁NonfairSync。 ReentrantLock里面大部分的功能都是委托给Sync来实现的,同时Sync内部定义了lock()抽象方法由其子类去实现,默认实现了nonfairTryAcquire(int acquires)方法,可以看出它是非公平锁的默认实现方式。下面我们看非公平锁的lock()方法:
final void lock() {
//尝试获取锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//获取失败,调用AQS的acquire(int arg)方法
acquire(1);
}
首先会第一次尝试快速获取锁,如果获取失败,则调用acquire(int arg)方法,该方法定义在AQS中,如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire(int arg)
由子类实现,获取锁。 - addWaiter(Node mode)
获取锁失败后,将等待线程封装成Node加入等待队列,由AQS实现。 - acquireQueued(final Node node, int arg)
在队列中如果其前驱节点是头节点,就循环获取锁,获取锁成功就返回。
如果其前驱不是头节点,或者是头节点但是获取锁失败,挂起当前线程。由AQS实现。 - selfInterrupt()
自我中断,当获取锁的时候,发生中断时记录下来,推迟到抢锁结束后中断线程
这个方法首先调用tryAcquire(int arg)方法,在AQS中讲述过,tryAcquire(int arg)需要自定义同步组件提供实现,非公平锁实现如下:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//当前线程
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//state == 0,表示没有该锁处于空闲状态
if (c == 0) {
//获取锁成功,设置为当前线程所有
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//线程重入
//判断锁持有的线程是否为当前线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
该方法主要逻辑:首先判断同步状态state == 0 ?,如果是表示该锁还没有被线程持有,直接通过CAS获取同步状态,如果成功返回true。如果state != 0,则判断当前线程是否为获取锁的线程,如果是则获取锁,成功返回true。成功获取锁的线程再次获取锁,这是增加了同步状态state。
释放锁
获取同步锁后,使用完毕则需要释放锁,ReentrantLock提供了unlock释放锁:
public void unlock() {
sync.release(1);
}
unlock内部使用Sync的release(int arg)释放锁,release(int arg)是在AQS中定义的:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
与获取同步状态的acquire(int arg)方法相似,释放同步状态的tryRelease(int arg)同样是需要自定义同步组件自己实现:
protected final boolean tryRelease(int releases) {
//减掉releases
int c = getState() - releases;
//如果释放的不是持有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//state == 0 表示已经释放完全了,其他线程可以获取同步状态了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
只有当同步状态彻底释放后该方法才会返回true。当state == 0 时,则将锁持有线程设置为null,free= true,表示释放成功。
ReentrantLock与synchronized的区别
前面提到ReentrantLock提供了比synchronized更加灵活和强大的锁机制,那么它的灵活和强大之处在哪里呢?他们之间又有什么相异之处呢? 首先他们肯定具有相同的功能和内存语义。
- 与synchronized相比,ReentrantLock提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候,锁投票。
- ReentrantLock还提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活,所以在多个条件变量和高度竞争锁的地方,ReentrantLock更加适合(以后会阐述Condition)。
- ReentrantLock提供了可轮询的锁请求。它会尝试着去获取锁,如果成功则继续,否则可以等到下次运行时处理,而synchronized则一旦进入锁请求要么成功要么阻塞,所以相比synchronized而言,ReentrantLock会不容易产生死锁些。
- ReentrantLock支持更加灵活的同步代码块,但是使用synchronized时,只能在同一个synchronized块结构中获取和释放。注:ReentrantLock的锁释放一定要在finally中处理,否则可能会产生严重的后果。
- ReentrantLock支持中断处理,且性能较synchronized会好些。
Condition
关于ReentrantLock还有一个非常重要的用处就是,可以通过条件变量,指定唤醒对应的一系列线程。举一个非常经典的生产者消费者的例子
/**
* 创建两个生产者线程和10个消费者线程
* 当生产者线程让容器为10的时候
* ,通知消费者线程消费,
* 数组没元素的时候,
* 通知生产者线程生产
* @param <T>
*/
public class MyContainer2<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;
private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();
public void put(T t) {
try {
lock.lock();
while(lists.size() == MAX) { //想想为什么用while而不是用if?
producer.await();
}
lists.add(t);
++count;
consumer.signalAll(); //通知消费者线程进行消费
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public T get() {
T t = null;
try {
lock.lock();
while(lists.size() == 0) {
consumer.await();
}
t = lists.removeFirst();
count --;
producer.signalAll(); //通知生产者进行生产
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) {
MyContainer2<String> c = new MyContainer2<>();
//启动消费者线程
for(int i=0; i<10; i++) {
new Thread(()->{
for(int j=0; j<5; j++)
System.out.println(c.get());
}, "c" + i).start();
}
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//启动生产者线程
for(int i=0; i<2; i++) {
new Thread(()->{
for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}
实际上Condition的原理就是,将原本的一个阻塞队列,分成了多个,例如上例中的,原本如果是synchronized的话,只会有一个阻塞队列,所有的队列都会在里面排队,达不到灵活的控制,现在引入了Condition之后,分成了生产者队列和消费者队列,这样,就可以唤醒生产者队列中的所有线程,而不会对消费者队列产生影响