zoukankan      html  css  js  c++  java
  • Java 线程 — AbstractQueuedSynchronizer

    锁就是一种状态,比如互斥锁:同一时间只能有一个线程拥有,可以使用一个整型值来标志当前的状态

    • 0:表示没有现成占有锁
    • 1:表示锁已经被占用

    AbstractQueuedSynchronizer

    • 实现Java中锁的基础,提供了一系列模板方法,程序员可以继承该类作为内部类实现自定义同步语义。
    • 通过一个整型变量state维护锁的同步状态
    • 通过CAS保证同步状态state的修改是原子的

    这个抽象类中使用了很多模板方法,在实现锁机制的时候可以调用这些模板方法:

    模板方法里面调用了一些尚未实现、留给程序员实现的抽象方法:
    独占式同步方法:

    共享式同步方法:

    独占式同步状态

    同时只能有一个线程占有锁,独占式同步过程:

    // 获取锁,可以在在lock方法中调用
    public final void acquire(int arg) {
    	// tryAcquire方法需要根据锁的功能自行实现
        // 获取锁失败的话执行addWaiter,Node.EXCLUSIVE表明当前node获取的是独占式的锁
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    // 因为获取锁失败,将当前node加入queue的最后一个节点tail
    private Node addWaiter(Node mode) {
    	// 新建的node是独占模式,不对waitStatus赋值,也就是默认为0
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 如果队列不为空,则插入tail
        if (pred != null) {
            node.prev = pred;
            // 因为是独占式的锁,只会有一个线程修改tail,不要循环
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果tail == null表示队列为空,调用enq入队
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 因为在这个时候可能其他线程已经入队,队列可能不再为null
            // 如果依然为空,则初始化队列——新建一个node,并设为head=tail=newNode
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            	// 如果其他线程已经入过队,则正常设置tail
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    // 入队列之后进入自旋状态
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 不断循环尝试获取锁——自旋
            for (;;) {
                final Node p = node.predecessor();	// 获取前一个node
                // 如果前一个node是head,则尝试获取锁,因为如果前一个是head说明head有可能已经释放锁,当前node可以尝试获取锁
                if (p == head && tryAcquire(arg)) {
                	// 如果获取成功则将当前node设为head
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 判断是否需要park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    // 这个方法可以结合释放锁release(释放独占锁)和releaseShared(释放共享锁)来看,因为释放锁的时候会判断是否进行unpark操作
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
             // 前面一个节点是signal,表示前面一个节点释放锁的时候(release)一定会unpark,那么当前node可以park(因为一定会被unpark,保证不会被永久阻塞)
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
             // 把已经是cancelled的node从队列中移除,不再进行获取锁
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    释放独占锁:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 独占锁的node.waitStatus默认是0,因为在初始化的时候不会对waitStatus赋值,如果不等于0,表示在加入队列的时候设置过waitStatus,设置为SIGNAL
            if (h != null && h.waitStatus != 0)
            	// 如果head的waitStatus是SIGNAL表示要唤醒阻塞的线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    共享式同步状态

    可以同时有多个线程获得锁

    重入锁

    同一个线程重复获得锁

    读写锁

    读锁:共享锁
    写锁:独占锁

    LockSupport

    调用Unsafe的方法,负责阻塞或者唤醒线程。相当于一个线程有一个许可,默认是被占用的,park会占用这个许可,unpark会分配这个许可,只要调用过至少一次unpark,那么再调用一次park线程会返回,不会被永久阻塞:

    • 如果先调用park并且后面不会调用unpark,那么线程会被一直阻塞;
    • 如果先调用unpark(至少一次),再调用park,那么线程立即返回,不会阻塞
    • 如果先调用park(至少一次),再调用unpark,那么线程在调用unpark后返回,不会继续阻塞

    所以unpark会给当前线程分配一个许可,如果之前调用过一次park或者后面调用一次park,那么线程不再继续阻塞,即:

    • 一个线程只有一个许可
    • unpark分配许可
    • park占用许可
    • 只有调用unpark的次数 >= 调用park的次数,线程才不会被永久阻塞

    Condition

    任意一个Java对象,都拥有一组Monitor方法(定义在Object上),包括:wait,notify等,与synchronized配合实现等待/通知模式
    Lock和Condition结合也可以实现等待/通知模式

  • 相关阅读:
    typeof的用法
    新建一个express工程,node app无反应
    搜索引擎-倒排索引基础知识
    搭建Hadoop2.6.0+Eclipse开发调试环境(以及log4j.properties的配置)
    hadoop下远程调试方法
    回调函数透彻理解Java
    Maven 手动添加 JAR 包到本地仓库
    Hbase rowkey热点问题
    Hadoop 2.2 & HBase 0.96 Maven 依赖总结
    通过Java Api与HBase交互
  • 原文地址:https://www.cnblogs.com/sunshine-2015/p/6049448.html
Copyright © 2011-2022 走看看