zoukankan      html  css  js  c++  java
  • 学习JUC源码(2)——自定义同步组件

    前言

      在之前的博文(学习JUC源码(1)——AQS同步队列(源码分析结合图文理解))中,已经介绍了AQS同步队列的相关原理与概念,这里为了再加深理解ReentranLock等源码,模仿构造同步组件的基本模式,编写不可重复的互斥锁Mutex与指定共享线程数量的共享锁。MySharedLock。

      主要参考资料《Java并发编程艺术》(有需要的小伙伴可以找我,我这里只有电子PDF)同时结合ReentranLock、AQS等源码。


    一、构造同步组件的模式

    丛概念方层面,在中,我们知道锁与同步器的相关概念:

    • 同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义;
    • 锁是面向使用者的,提供锁交互的实现;
    • 同步器是面向锁的实现者,简化了锁的实现方式,屏蔽了同步状态管理、线程排队、等待/唤醒等底层操作。

    从代码层面,同步器是基于模板模式实现的,可以通过可重写的方法中的随便一个窥探:

      /**
         * 模板方法:
         *  protected关键字
         *  没有任何实现
         * @param arg
         * @return
         */
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }

    也就是需要进行以下几步:

    1)继承同步器重写指定方法(idea中extends AQS点击快捷键ctrl+O即可显示)

    • tryAcquire(int arg):独占式获取同步状态;
    • tryRelease(int arg):独占式释放同步状态;
    • tryAcquireShared(int arg):共享式获取同步状态,返回大于0的值表示获取成功,否则失败
    • tryReleaseShared(int arg):共享式释放锁
    • isHeldExclusively():当前线程是否在独占模式下被线程占用,一般该方法表示是否被当前线程占用

    2)随后将同步器组合在自定义同步组件的实现中,即定义内部类Syn继承AQS,在Syn中重写AQS方法:

    public class Sync extends AbstractQueuedSynchronizer{
            @Override
            protected boolean tryAcquire(int arg) {
                final Thread current = Thread.currentThread();
                if (compareAndSetState(0, 1)) {
                    // 获取成功之后,当前线程是该锁的持有者,不需要再可重入数
                    setExclusiveOwnerThread(current);
                    return true;
                }
                return false;
            }
    
            @Override
            protected boolean tryRelease(int arg) {
                if (getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            @Override
            protected boolean isHeldExclusively() {
                  return getState() == 1;
            }
            // 返回Condition,每个Condition都包含了一个队列
            Condition newCondition() {
                return new ConditionObject();
            }
        }

    3)最后调用同步器提供的模板方法,即同步组件类实现Lock方法之后,在lock/unlock方法中调用内部类Syn的方法acquire(int arg)等方法

    public class Mutex implements Lock {
        
       ........
        @Override
        public void lock() {
            sync.acquire(1);
        }
        @Override
        public void unlock() {
            sync.release(1);
        }
        ........
    
    }

    具体请看下面的实践部分

    二、互斥不可重入锁

     在我之前写过的博文中(详解Java锁的升级与对比(1)——锁的分类与细节(结合部分源码))介绍可重入锁与不可重入锁的区别时,就写到JUC中没有不可重入锁的具体实现,但是可以类比,现在呢,我们可以做到实现了,具体看下面代码,模式完全符合依赖Lock与AQS构造同步组件模式。

    (1)Mutex代码实现(核心关键实现已经在代码中注释)

    public class Mutex implements Lock {
    
        private final Sync sync = new Sync();
        public class Sync extends AbstractQueuedSynchronizer{
            @Override
            protected boolean tryAcquire(int arg) {
                final Thread current = Thread.currentThread();
                if (compareAndSetState(0, 1)) {
                    // 获取成功之后,当前线程是该锁的持有者,不需要再可重入数
                    setExclusiveOwnerThread(current);
                    return true;
                }
                return false;
            }
    
            @Override
            protected boolean tryRelease(int arg) {
                if (getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            @Override
            protected boolean isHeldExclusively() {
                  return getState() == 1;
            }
            // 返回Condition,每个Condition都包含了一个队列
            Condition newCondition() {
                return new ConditionObject();
            }
        }
    
    
        @Override
        public void lock() {
            sync.acquire(1);
        }
        @Override
        public void unlock() {
            sync.release(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
        @Override
        public boolean tryLock() {
            return false;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
    
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }

    其中核心代码就是重写的两个方法:

    • tryAcquire(int arg)方法:主要是设置同独占式更新同步状态,CAS实现state+1
    • tryRelease(int arg)方法:独占式释放同步状态,释放锁持有 

    (2)测试Demo

    public class MutexDemo {
    
        @Test
        public void test(){
            final Mutex lock = new Mutex();
            class Worker extends Thread {
                @Override
                public void run() {
                    // 一直不停在获取锁
                    while (true) {
                        lock.lock();
                        try {
                            System.out.println(Thread.currentThread().getName() +" hold lock, "+new Date());
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            lock.unlock();
                            System.out.println(Thread.currentThread().getName() +" release lock, "+new Date());
                        }
                    }
                }
    
            }
            for (int i = 0; i < 10; i++) {
                Worker worker = new Worker();
                // 以守护进程运行,VM退出不影响运行,这里只是为了一个打印效果,去掉注释一直打印
                worker.setDaemon(true);
                worker.start();
            }
            // 每隔一秒换行
            for (int j = 0; j < 10; j++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println();
            }
        }
    }

    (3)运行结果

    Thread-0 hold lock, Tue Dec 08 16:26:42 CST 2020
    
    Thread-0 release lock, Tue Dec 08 16:26:43 CST 2020
    Thread-1 hold lock, Tue Dec 08 16:26:43 CST 2020
    
    Thread-2 hold lock, Tue Dec 08 16:26:44 CST 2020
    Thread-1 release lock, Tue Dec 08 16:26:44 CST 2020
    
    Thread-3 hold lock, Tue Dec 08 16:26:45 CST 2020
    Thread-2 release lock, Tue Dec 08 16:26:45 CST 2020
    
    Thread-3 release lock, Tue Dec 08 16:26:46 CST 2020
    Thread-4 hold lock, Tue Dec 08 16:26:46 CST 2020
    
    Thread-4 release lock, Tue Dec 08 16:26:47 CST 2020
    Thread-6 hold lock, Tue Dec 08 16:26:47 CST 2020
    
    Thread-7 hold lock, Tue Dec 08 16:26:48 CST 2020
    Thread-6 release lock, Tue Dec 08 16:26:48 CST 2020
    
    Thread-7 release lock, Tue Dec 08 16:26:49 CST 2020
    Thread-5 hold lock, Tue Dec 08 16:26:49 CST 2020
    
    Thread-8 hold lock, Tue Dec 08 16:26:50 CST 2020
    Thread-5 release lock, Tue Dec 08 16:26:50 CST 2020
    
    Thread-8 release lock, Tue Dec 08 16:26:51 CST 2020
    Thread-9 hold lock, Tue Dec 08 16:26:51 CST 2020

    (4)结果分析

    互斥锁的核心就是同一个同步状态只能被一个线程持有,其它线程等待持有线程释放才能竞争获取。截图一开始的运行结果分析:

    Thread-0 hold lock, Tue Dec 08 16:26:42 CST 2020
    
    Thread-0 release lock, Tue Dec 08 16:26:43 CST 2020
    Thread-1 hold lock, Tue Dec 08 16:26:43 CST 2020
    
    Thread-2 hold lock, Tue Dec 08 16:26:44 CST 2020
    Thread-1 release lock, Tue Dec 08 16:26:44 CST 2020

    10个线程不断竞争锁,一开始Thread-0在08 16:26:42获取到锁,持有锁1秒后在释放16:26:43时释放,同时Thread-1立马获取到锁,1秒后于16:26:44释放锁,同时Thread-2立马获取到了锁......

    根据输出结果来说,完全符合Mutex作为互斥锁这个功能:同一时刻只有一个线程持有锁(同步状态),其它线程等待释放后才能获取

    三、指定共享线程数目的共享锁

    (1)代码实现(核心关键实现已经在代码中注释)

    public class MyShareLock implements Lock {
    
        // 可以看到共享等待队列中的线程
        public Collection<Thread> getSharedQueuedThreads(){
            return syn.getSharedQueuedThreads();
        }
        private final Syn syn = new Syn(2);
    
        private static final class Syn extends AbstractQueuedSynchronizer{
            int newShareCount=0;
            Syn(int shareCount){
                if (shareCount <= 0) {
                    throw new IllegalArgumentException("share count must large than zero");
                }
                // 设置初始共享同步状态
                setState(shareCount);
            }
    
            /**
             * 共享锁指定数目
             * @param reduceShareCount
             * @return
             */
            @Override
            protected int tryAcquireShared(int reduceShareCount) {
    
                for (;;){
                    int currentShareCount = getState();
                    newShareCount = currentShareCount- reduceShareCount;
                    if (newShareCount < 0 ||
                            compareAndSetState(currentShareCount,newShareCount)) {
                        // newShareCount大于等于0才说明获取锁成功
                        if (newShareCount >= 0) {
    //                        System.out.println(Thread.currentThread().getName()+" hold lock, current share count is "+newShareCount+", "+new Date());
                        }
                        // newShareCount小于0表示获取失败所以需要返回
                        // compareAndSetState(currentShareCount,newShareCount)为true自然表示成功需要返回
                        return newShareCount;
                    }
                }
            }
    
            @Override
            protected boolean tryReleaseShared(int returnShareCount) {
                for (;;){
                    int currentShareCount = getState();
                    newShareCount = currentShareCount + returnShareCount;
                    if (compareAndSetState(currentShareCount,newShareCount)) {
    //                    System.out.println(Thread.currentThread().getName() +" release lock, current share count is "+newShareCount+", "+new Date());
                        return true;
                    }
                }
            }
            protected int getShareCount(){
                return getState();
            }
        }
    
        /**
         * 调用内部同步器Syn的acquireShare方法
         */
        @Override
        public void lock() {
            syn.acquireShared(1);
        }
        /**
         * 调用内部同步器Syn的releaseShared方法
         */
        @Override
        public void unlock() {
            syn.releaseShared(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            if (Thread.interrupted()) {
                throw new IllegalStateException();
            }
            syn.acquireInterruptibly(1);
        }
    
        @Override
        public boolean tryLock() {
            return false;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }

    (2)测试Demo

    public class ShareLockTest {
    
        @Test
        public void test(){
            final MyShareLock lock = new MyShareLock();
            class Worker extends Thread {
                @Override
                public void run() {
                    // 一直不停在获取锁
                while (true) {
                    lock.lock();
                    try {
                        System.out.println(Thread.currentThread().getName() +" hold lock, "+new Date());
    //                    System.out.println(lock.getSharedQueuedThreads());
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                        System.out.println(Thread.currentThread().getName() +" release lock, "+new Date());
                    }
                }
                }
    
            }
            for (int i = 0; i < 10; i++) {
                Worker worker = new Worker();
                // 以守护进程运行,VM退出不影响运行,这里只是为了一个打印效果,去掉注释一直打印
                worker.setDaemon(true);
                worker.start();
            }
            // 每隔一秒换行
            for (int j = 0; j < 10; j++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println();
            }
        }
    }

    (3)运行结果(结果可能不同)

    Thread-1 hold lock, Tue Dec 08 16:36:05 CST 2020
    Thread-0 hold lock, Tue Dec 08 16:36:05 CST 2020
    
    Thread-0 release lock, Tue Dec 08 16:36:06 CST 2020
    Thread-4 hold lock, Tue Dec 08 16:36:06 CST 2020
    Thread-1 release lock, Tue Dec 08 16:36:06 CST 2020
    Thread-2 hold lock, Tue Dec 08 16:36:06 CST 2020
    
    Thread-4 release lock, Tue Dec 08 16:36:07 CST 2020
    Thread-2 release lock, Tue Dec 08 16:36:07 CST 2020
    Thread-5 hold lock, Tue Dec 08 16:36:07 CST 2020
    Thread-8 hold lock, Tue Dec 08 16:36:07 CST 2020
    
    Thread-8 release lock, Tue Dec 08 16:36:08 CST 2020
    Thread-3 hold lock, Tue Dec 08 16:36:08 CST 2020
    Thread-9 hold lock, Tue Dec 08 16:36:08 CST 2020
    Thread-5 release lock, Tue Dec 08 16:36:08 CST 2020
    
    Thread-6 hold lock, Tue Dec 08 16:36:09 CST 2020
    Thread-7 hold lock, Tue Dec 08 16:36:09 CST 2020
    Thread-3 release lock, Tue Dec 08 16:36:09 CST 2020
    Thread-9 release lock, Tue Dec 08 16:36:09 CST 2020
    
    Thread-6 release lock, Tue Dec 08 16:36:10 CST 2020
    Thread-1 hold lock, Tue Dec 08 16:36:10 CST 2020
    Thread-0 hold lock, Tue Dec 08 16:36:10 CST 2020
    Thread-7 release lock, Tue Dec 08 16:36:10 CST 2020
    
    Thread-1 release lock, Tue Dec 08 16:36:11 CST 2020
    Thread-2 hold lock, Tue Dec 08 16:36:11 CST 2020
    Thread-0 release lock, Tue Dec 08 16:36:11 CST 2020
    Thread-4 hold lock, Tue Dec 08 16:36:11 CST 2020
    
    Thread-2 release lock, Tue Dec 08 16:36:12 CST 2020
    Thread-8 hold lock, Tue Dec 08 16:36:12 CST 2020
    Thread-5 hold lock, Tue Dec 08 16:36:12 CST 2020
    Thread-4 release lock, Tue Dec 08 16:36:12 CST 2020
    
    Thread-5 release lock, Tue Dec 08 16:36:13 CST 2020
    Thread-9 hold lock, Tue Dec 08 16:36:13 CST 2020
    Thread-3 hold lock, Tue Dec 08 16:36:13 CST 2020
    Thread-8 release lock, Tue Dec 08 16:36:13 CST 2020
    
    Thread-3 release lock, Tue Dec 08 16:36:14 CST 2020
    Thread-7 hold lock, Tue Dec 08 16:36:14 CST 2020
    Thread-9 release lock, Tue Dec 08 16:36:14 CST 2020
    Thread-6 hold lock, Tue Dec 08 16:36:14 CST 2020

    (4)结果分析

    该指定共享线程数量N的共享锁的最终目的就是多个线程可以持有锁(同步状态),达到共享线程数量N(代码中默认为2)时,其它线程将进入Queue等待获取同步结果,同一时刻只能最多有N个线程持有锁

    同样地,我们分析开头运行结果:

    Thread-1 hold lock, Tue Dec 08 16:36:05 CST 2020
    Thread-0 hold lock, Tue Dec 08 16:36:05 CST 2020
    
    Thread-0 release lock, Tue Dec 08 16:36:06 CST 2020
    Thread-4 hold lock, Tue Dec 08 16:36:06 CST 2020
    Thread-1 release lock, Tue Dec 08 16:36:06 CST 2020
    Thread-2 hold lock, Tue Dec 08 16:36:06 CST 2020

    10个线程不停竞争锁,一开始Thread-0与Thread-1在16:36:05时刻同时获取到了锁,此时已经达到共享数量的最大值,即N,之后持有锁1秒,Thread-0与Thread-1在16:36:06时刻立马释放锁,同时Thread-4与Thread-2立马退出等待队列立马竞争持有锁。

    从结果来看,完全是符合ShareLock共享锁功能的:同一时刻最多允许N个线程持有锁,其它线程等待持有线程释放锁

  • 相关阅读:
    大数据学习——实现多agent的串联,收集数据到HDFS中
    大数据学习——flume拦截器
    Notepad++ 连接远程 FTP
    大数据学习——采集文件到HDFS
    How to Catch Ctrl-C in Shell Script
    Error: package or namespace load failed for ‘rJava’:
    Complex Instance Placement
    Linux 下 CPU 使用率与机器负载的关系与区别
    编码和加密算法介绍
    k8s sidecar, Ambassador, Adapter containers
  • 原文地址:https://www.cnblogs.com/jian0110/p/14102810.html
Copyright © 2011-2022 走看看