第一.Lock锁
1.1Lock简介
1.2Lock实现
1.3Lock 的类关系图
1.3ReentrantLock重入锁
也就是说,线程可以进入任何一个它拥有的锁同步着的代码块。
如果当前线程 t1 通过调用 lock 方法获取了锁之后,再次调用 lock,是不会再阻塞去获取锁的,直接增加重试次数就行了。synchronized 和 ReentrantLock 都是可重入锁。synchronized 代码案例
public class ReentrantDemo {
public synchronized void sendSms(){
System.out.println(Thread.currentThread().getId()+" sendSms"+"bean start");
sendEmail();
}
public synchronized void sendEmail(){
System.out.println(Thread.currentThread().getId()+" sendEmail"+"end start");
}
public static void main(String[] args) {
ReentrantDemo reentrantDemo = new ReentrantDemo();
new Thread(()->{
reentrantDemo.sendSms();
},"t1").start();
new Thread(()->{
reentrantDemo.sendSms();
},"t2").start();
}
}
public class AtomicDemo implements Runnable{
private Lock lock = new ReentrantLock();
@Override
public void run() {
get();
}
public void get(){
try {
lock.lock();
System.out.println(Thread.currentThread().getId()+" get()");
set();
} catch (Exception ioe) {
ioe.printStackTrace();
} finally {
lock.unlock();
}
}
public void set(){
try {
lock.lock();
System.out.println(Thread.currentThread().getId()+" set()");
} catch (Exception ioe) {
ioe.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
AtomicDemo atomicDemo = new AtomicDemo();
Thread t1 = new Thread(atomicDemo,"t1");
Thread t2 = new Thread(atomicDemo,"t2");
t1.start();
t2.start();
}
}
1.4ReentrantReadWriteLock
独占锁(写锁)/共享锁(读锁)/互斥锁
独占锁:指该锁一次只能被一个线程所持有。对ReentrantLock和Synchronized而言都是独占锁
共享锁:指该锁可被多个线程所持有。
对ReentrantReadWriterLock其读锁是共享锁,其写锁是独占锁。
读锁的共享锁可保证并发读是非常高效的,读写,写读,写写的过程是互斥的。
多个线程同时读一个资源类没有任何问题,所以为了满足并发量,读取共享锁资源应该可以同时进行。但是如果有一个线程想去写共享资源来,
就不应该再有其他线程可以对该资源进行读或者写
小总结:
读--读能共存
读--写不能共存
写--写不能共存
多个线程同时读一个资源类没有任何问题,所以为了满足并发量, 读取共享锁资源应该可以同时进行。但是如果有一个线程想去写共享资源来,
就不应该再有其他线程可以对该资源进行读或者写 即保证数据的一致性,也提升了数据的并发性
/**
* 写操作:原子+独占,整个过程必须是一个完整的统一体,中间不许被分割,被打断
*/
class MyCache{
private volatile Map<String,Object> map = new HashMap<String,Object>();
ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
public void put(String key,Object value){
//写锁
rw.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+" 正在写入: "+key);
map.put(key,value);
Thread.sleep(300);
System.out.println(Thread.currentThread().getName()+" 写入完成");
} catch (Exception ioe) {
ioe.printStackTrace();
} finally {
rw.writeLock().unlock();
}
}
public void get(String key){
//读锁
rw.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"正在读取: "+key);
Object val = map.get(key);
Thread.sleep(300);
System.out.println(Thread.currentThread().getName()+"读取完成: ");
} catch (Exception ioe) {
ioe.printStackTrace();
} finally {
rw.readLock().unlock();
}
}
}
public class ReentrantWriterDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 10; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp);
},String.valueOf(i)).start();
}
for (int i = 0; i < 10; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
在这个案例中,通过 hashmap 来模拟了一个内存缓存,然后使用读写所来保证这个内存缓存的线程安全性。当执行读操作的时候,需要获取读锁,在并发访问的时
1.5AQS
1.5.1AQS的定义
在 Lock 中,用到了一个同步队列 AQS,全称 AbstractQueuedSynchronizer,它是一个同步工具也是 Lock 用来实现线程同步的核心组件。
1.5.2AQS的两种功能
1.5.3AQS的内部实现
static final class Node {
/**共享模式下等待标识*/
static final Node SHARED = new Node();
/** 指示节点正在以独占模式等待的标记*/
static final Node EXCLUSIVE = null;
/** 等待线程以取消 */
static final int CANCELLED = 1;
/** 后续线程需要释放 */
static final int SIGNAL = -1;
/** 线程正在等待条件*/
static final int CONDITION = -2;
/**
*waitStatus值,指示下一个acquireShared应该无条件传播
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
/**
* 前驱节点
*/
volatile Node prev;
/**
后继节点
*/
volatile Node next;
/**
*当前线程
*/
volatile Thread thread;
/**
*存储在condition队列中的后继节点
*/
Node nextWaiter;
/**
* 是否为共享锁
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
//将线程构造成一个Node,添加到等待队列
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
1.5.3释放锁以及添加线程对于队列的变化
AQS中
/**
* 等待队列的头,延迟初始化。除初始化外,只能通过setHead方法进行修改。注意:如果head存在,则保证其waitStatus不被取消。
*/
private transient volatile Node head;
/**
* 等待队列的尾部,延迟初始化。仅通过方法eng进行修改以添加新的等待节点。
*/
private transient volatile Node tail;
/**
* 状态
*/
private volatile int state;
里会涉及到两个变化
这个过程也是涉及到两个变化
1.6ReentrantLock 的源码分析
以 ReentrantLock 作为切入点,来看看在这个场景中是如何使用 AQS 来实现线程的同步的
1.6.1ReentrantLock的时序图
ReentrantLock.lock()
这个是ReentrantLock的入口
public void lock() {
sync.lock();
}
sync 实际上是一个抽象的静态内部类,它继承了 AQS 来实现重入锁的逻辑,AQS 是一个同步队列,它能够实现线程的阻塞以及唤醒,但它并不具备业务功能,所以在不同的同步场景中
,会继承 AQS 来实现对应场景的功能Sync 有两个具体的实现类,分别是:
NofairSync:表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
CAS实现原理
protected final boolean compareAndSetState(int
expect, int update) {
// See below for intrinsics setup to support
this
return unsafe.compareAndSwapInt(this,
stateOffset, expect, update);
}
通过 cas 乐观锁的方式来做比较并替换,这段代码的意思是,如果当前内存中的state 的值和预期值 expect 相等,则替换为 update。更新成功返回 true,否则返
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取state状态
int c = getState();
//表示无锁状态
if (c == 0) {
//cas 替换 state 的值,cas 成功表示获取锁成功
if (compareAndSetState(0, acquires)) {
//保存当前获得锁的线程,下次再来的时候不要再尝试竞争锁
setExclusiveOwnerThread(current);
return true;
}
}
//如果同一个线程来获得锁,直接增加重入次数
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
private Node addWaiter(Node mode) { //把当前线程封装成Node节点 Node node = new Node(Thread.currentThread(), mode); //tail 是 AQS 中表示同比队列队尾的属性,默认是 null Node pred = tail; //tail 不为空的情况下,说明队列中存在节点 if (pred != null) { //把当前线程的 Node 的 prev 指向 tail node.prev = pred; //通过 cas 把 node加入到 AQS 队列,也就是设置为 tail if (compareAndSetTail(pred, node)) { //设置成功以后,把原 tail 节点的 next指向当前 node pred.next = node; return node; } } ;//tail=null,把 node 添加到同步队列 enq(node); return node; }
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
假设 3 个线程来争抢锁,那么截止到 enq 方法运行结束之后,或者调用 addwaiter方法结束后,AQS 中的链表结构图
AQS.acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前节点的 prev 节点
final Node p = node.predecessor();
//如果是 head 节点,说明有资格去争抢锁
if (p == head && tryAcquire(arg)) {
;//获取锁成功,也就是ThreadA 已经释放了锁,然后设置 head 为 ThreadB 获
得执行权限
setHead(node);
//把原 head 节点从链表中移除
p.next = null;
failed = false;
return interrupted;
}
//ThreadA 可能还没释放锁,使得 ThreadB 在执行 tryAcquire 时会返回 false
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//并且返回当前线程在等待过程中有没有中断过。
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1.7Condition
public class ConditionWait implements Runnable {
private Lock lock;
private Condition condition;
public ConditionWait(Lock lock,Condition condition){
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin -ConditionWait");
try {
lock.lock();
condition.await();
System.out.println("end -ConditionWait");
} catch (Exception ioe) {
ioe.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ConditionDemoSignal implements Runnable {
private Condition condition;
private Lock lock;
ConditionDemoSignal(Lock lock,Condition condition){
this.lock = lock;
this.condition = condition;
}
@Override
public void run() {
System.out.println("begin ConditionDemoSignal");
try {
lock.lock();
condition.signal();
System.out.println("end ConditionDemoSignal");
} catch (Exception ioe) {
ioe.printStackTrace();
} finally {
lock.unlock();
}
}
}
/**
* 测试类
*/
public class Test001 {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ConditionWait conditionWait = new ConditionWait(lock,condition);
ConditionDemoSignal signal = new ConditionDemoSignal(lock,condition);
Thread t1 = new Thread(conditionWait);
Thread t2 = new Thread(signal);
t1.start();
t2.start();
}
}
题目:多线程之间按顺序调用,实现A→B→C三个线程启动,要求如下:
AA打印5次,BB打印10次,CC打印15次
紧接着
AA打印5次,BB打印10次,CC打印15次
....
打印10轮
// 判断 干活 通知
class ShareSource {
private int number = 1;//1:A 2:B 3:C
private Lock lock = new ReentrantLock();
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();
public void print5(){
lock.lock();
try {
while(number!=1){
c1.await();
}
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
number=2;
c2.signal();
} catch (Exception ioe) {
ioe.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10(){
lock.lock();
try {
while(number!=2){
c2.await();
}
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
number=3;
c3.signal();
} catch (Exception ioe) {
ioe.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15(){
lock.lock();
try {
while(number!=3){
c3.await();
}
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName()+" "+i);
}
number=1;
c1.signal();
} catch (Exception ioe) {
ioe.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ConditionLockDemo {
public static void main(String[] args) {
ShareSource shareSource = new ShareSource();
new Thread(()->{
for (int i = 0; i < 5; i++) {
shareSource.print5();
}
},"AA").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
shareSource.print10();
}
},"BB").start();
new Thread(()->{
for (int i = 0; i < 5; i++) {
shareSource.print15();
}
},"CC").start();
}
}
public class BatchCondition {
private Map<String, ReentrantLock> map = new HashMap<>();
private Map<String, BlockingQueue<Condition>> condidtions = new HashMap<>();
private int count = 1;
{
init();
}
/**
* 初始化锁
*/
public void init(){
map.put("lock1",new ReentrantLock());
}
public void task(String key){
ReentrantLock lock = map.get(key);
try {
lock.lock();
while (!condidtions.isEmpty()){
Condition condition = lock.newCondition();
condidtions.get(key).add(condition);
condition.await();
}
count++;
} catch (Exception ioe) {
ioe.printStackTrace();
} finally {
lock.unlock();
}
System.out.println(Thread.currentThread().getName()+" "+count);
//解锁
BlockingQueue<Condition> conditionQueue = condidtions.get(key);
if(conditionQueue!=null&&!conditionQueue.isEmpty()){
conditionQueue.poll().signal();
}
System.out.println("解锁成功");
}
public static void main(String[] args) {
BatchCondition batchCondition = new BatchCondition();
new Thread(()->{
batchCondition.task("lock1");
},"AA").start();
new Thread(()->{
batchCondition.task("lock1");
},"BB").start();
}
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表
Node node = addConditionWaiter();
//释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
long savedState = fullyRelease(node);
int interruptMode = 0;
//如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
while (!isOnSyncQueue(node)) {//判断这个节点是否在 AQS 队列上,第一次判断的是 false,因为前面已经释放锁了
LockSupport.park(this);// 第一次总是 park 自己,开始阻塞等待
// 线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会isOnSyncQueue 中判断自己是否在队列上.
// isOnSyncQueue 判断当前 node 状态,如果是 CONDITION 状态,或者不在队列上了就继续阻塞.
// isOnSyncQueue 判断当前 node 还在队列上且不是 CONDITION 状态了,就结束循环和阻塞.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
// interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
// 将这个变量设置成 REINTERRUPT.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
// 如果是 null ,就没有什么好清理的了.
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果线程被中断了,需要抛出异常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
//如 果 lastWaiter 不 等 于 空 并 且waitStatus 不等于 CONDITION 时,把冲好
这个节点从链表中移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//构建一个 Node,waitStatus=CONDITION。这里的链表是一个单向的,所以相比
AQS 来说会简单很多
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
final long fullyRelease(Node node) {
boolean failed = true;
try {
long savedState = getState();
//重入次数
if (release(savedState)) {
释放锁并且唤醒下一个同步队列中的线程
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
public final void signal() {
if (!isHeldExclusively())//先判断当前线程是否获得了锁,直接用获得锁的线程和当前线程相比即可
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 拿到 Condition 队列上第一个节点
if (first != null)
doSignal(first);
}
1.8公平锁和非公平锁
公平锁和非公平锁
公平锁:是指多个线程按照申请的顺序获取锁,
类似排队打饭,先来后到
非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比现申请的线程优先获取在高并发情况下,有可能造成优先级反转或者饥饿现象
区别:
公平锁/非公平锁
并发包中ReentrantLock创建可以指定构造函数的boolean类型来得到公平锁或非公平锁,默认非公平锁
关于两者区别:
公平锁:就是很公平,在并发情况下,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等等队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照FIFO的规则从队列中取到自己。
非公平锁:比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就再采用类似公平锁那种方式
java ReenTrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的有点在与吞吐量比公平锁打。
对于synchronize而言,也是一种非公平锁
1.9自旋锁
是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处就是减少线程上线文切换的消耗,缺点是循环会消耗cpu
/**
* 好处:循环比较获取直到成功为止,没有类似wait的阻塞
*
* 通过CAS操作完成自旋锁,A线程先进来调用mylock方法自己持有锁5s钟,B随后进来后发现当前有线程
* 有锁,不是null,所以只能通过自旋等待锁,直到A释放锁后B随后抢到
*/
public class SpinLockDemo {
//原子引用线程
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+" common on");
while (!atomicReference.compareAndSet(null,thread)){
}
}
public void myUnLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+" common on myUnLock");
while (!atomicReference.compareAndSet(thread,null)){
}
}
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(()->{
spinLockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.myUnLock();
},"AA").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
spinLockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.myUnLock();
},"BB").start();
}
}
1.10synchronized和lock的区别
1.原始构成
synchronized是关键字属于JVM层面,java关键字,monitorenter(底层是通过monitor对象来完成,其实wait/notify等方法也依赖于monitor对象只有在同步块或方法中才能凋 wait/notify 等方法monitorexit)
lock是具体类(java.util.concurrent.locks.lock)是api层面的锁
2.使用方法
synchronized不需要用户去手动释放锁,当synchronize代码执行完后系统会自动让线程释放锁对锁的占用
ReentrantLock则需要用户手动释放锁若没有主动释放锁就有可能导致出现死锁现象。
需要lock()和unlock方法配合try/finally语句块来完成。
3.等待是否中断
synchronized:不可中断,除非抛出异常或者正常运行完成
ReentrantLock可中断,1设置超时时间,tryLock(long timeout,TimeUnit nuit)
4.lockInteeruptibly()放代码块中,调用interrupt()方法可中断
4.加锁是否公平
1.synchronizded是非公平锁
2.ReentrantLock 两者都可以是,默认为非公平,构造方法可以传入boolean值,true为公平锁,false为非公平锁
5.锁绑定多个条件Condition
sysnchronizde没有
ReentranLock用实现分组唤醒需要唤醒的线程们,可以精确的唤醒,而不是像synchronied要么随机唤醒一个线程要么唤醒全部线程。