zoukankan      html  css  js  c++  java
  • Java并发编程04-J.U.C包

    J.U.C 是 java.util.concurrent 的缩写,是 jdk 的并发包,包含了很多并发相关的类。下面介绍常用的类。

    一、Atomic 原子操作类

    1. 原子更新基本类型

    使用原子的方式更新基本类型,Atomic 包提供了以下 3 个类:

    • AtomicBoolean
    • AtomicInteger
    • AtomicLong

    以上类的基本使用方法都差不多,下面以 AtomicInteger 为例子说明。AtomicInteger 常用方法如下:

    • int addAndGet(int delta) : 以原子方式将输入值与实例中的值相加,并返回结果;
    • int getAndAdd(int delta)
    • int incrementAndGet()
    • boolean compareAndSet(int expect, int update) : 如果输入值等于预期值,则以原子的方式将该值设置为输入的值;
    • ...

    2. 原子更新数组

    通过原子的方式更新数组里的某个元素,Atomic 包提供了以下 3 个类:

    • AtomicIntegerArray
    • AtomicLongArray
    • AtomicRefrenceArray : 原子更新引用类型数组里的元素

    AtomicIntegerArray 常用方法:

    • int addAndGet(int i, int delta)
    • boolean compareAndSet(int i, int expect, int update)

    3. 原子更新引用类型

    原子更新基本类型的 AtomicInteger,只能更新一个变量,如果要原子更新多个变量,就需要使用原子更新引用类型提供的类。Atomic 包提供了以下 3 个类:

    • AtomicReference: 原子更新引用类型
    • AtomicReferenceFieldUpdater: 原子更新引用类型里的字段
    • AtomicMarkableReference: 原子更新带有标记位的引用类型。

    AtomicReference 程序示例:

    public class N00_Test {
        public static AtomicReference<User> atomicUserRef = new AtomicReference();
    
        public static void main(String[] args) {
            User user = new User("zeno", 18);
            atomicUserRef.set(user);
    
            User updateUser = new User("joker", 22);
            atomicUserRef.compareAndSet(user, updateUser);
    
            System.out.println(atomicUserRef.get().getName());
            System.out.println(atomicUserRef.get().getAge());
        }
    
        static class User {
            private String name;
            private int age;
    
            public User(String name, int age) {
                this.name = name;
                this.age = age;
            }
    
            public String getName() {
                return name;
            }
    
            public int getAge() {
                return age;
            }
        }
    }
    

    4. 原子更新字段类

    如果需要原子地更新某个类里的某个字段时,就需要使用原子更新字段类,Atomic 类提供了以下 3 个类进行原子字段更新。

    • AtomicIntegerFieldUpdater : 原子更新整形字段的更新器
    • AtomicLongFieldUpdater : 原子更新长整形字段的更新器
    • AtomicStampedRefrence : 原子更新带有版本号的引用类型 ,可以解决 CAS 原子更新的 ABA 问题。

    AtomicIntegerFieldUpdater 程序示例:

    public class N00_Test {
        private static AtomicIntegerFieldUpdater<User> a
                = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
    
        public static void main(String[] args) {
            User user = new User("zeno", 18);
            System.out.println(a.getAndIncrement(user));
            System.out.println(a.get(user));
        }
    
        static class User {
            private String name;
            // 注意被更新的字段,必须是 public、volatile
            public volatile int age;
    
            public User(String name, int age) {
                this.name = name;
                this.age = age;
            }
    
            public String getName() {
                return name;
            }
    
            public int getAge() {
                return age;
            }
        }
    }
    

    运行结果:

    18
    19
    

    二、AQS 队列同步器

    1. 介绍

    AQS 是 java.util.concurrent.locks 下类 AbstractQueuedSynchronizer 的简称,是用于通过 Java 源码来构建多线程的锁和同步器的一系列框架,用于 Java 多线程之间的同步。

    2. 原理

    在 AQS 类中维护了一个使用双向链表 Node 实现的 FIFO 队列,用于保存等待的线程,同时利用一个 int 类型的 state 来表示状态,使用时通过继承 AQS 类并实现它的 acquire 和 release 方法来操作状态,来实现线程的同步。

    以 ReentrantLock 为例,state 初始化为 0,表示未锁定状态。A 线程 lock() 时,会调用 tryAcquire() 独占该锁并将 state+1。此后,其他线程再 tryAcquire() 时就会失败,直到 A 线程 unlock() 到 state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A 线程自己是可以重复获取此锁的(state 会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证 state 是能回到零态的。

    再以 CountDownLatch 以例,任务分为 N 个子线程去执行,state 也初始化为 N(注意 N 要与线程个数一致)。这 N 个子线程是并行执行的,每个子线程执行完后 countDown() 一次,state 会 CAS (Compare and Swap) 减 1。等到所有子线程都执行完后 (即 state=0),会 unpark() 主调用线程,然后主调用线程就会从 await() 函数返回,继续后余动作。

    3. 架构与源码详解

    参考: Java并发之AQS详解

    三、Lock

    1. ReentrantLock

    重入锁,即支持同一个线程对资源进行重复加锁(synchronized 也是可重入的)。此外,该锁还支持获取锁时的公平和非公平选择。

    ReentrantLock 与 synchronized 对比

    • ReentrantLock 需要显示地获取和释放锁,繁琐,代码更灵活(比如创建多个锁,获取一个锁的时候释放另一把锁)
    • synchronized 是依赖于 JVM 实现的,而 ReentrantLock 是 JDK 实现的
    • ReentrantLock 更强大:
      • ReentrantLock 可以指定是公平锁还是非公平锁,而 synchronized 只能是非公平锁,所谓的公平锁就是先等待的线程先获得锁
      • ReentrantLock 提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像 synchronized 要么随机唤醒一个线程要么唤醒全部线程
      • ReentrantLock 提供了一种能够中断等待锁的线程的机制,通过 lock.lockInterruptibly() 来实现这个机制

    我们控制线程同步的时候,优先考虑synchronized,如果有特殊需要,再进一步优化。ReentrantLock如果用的不好,不仅不能提高性能,还可能带来灾难。

    原理:

    // 以公平锁为例,从lock.lock()开始研究
    final void lock() { acquire(1);}
    
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 首先通过公平或者非公平方式尝试获取锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 然后构建一个Node放入队列中并等待执行的时机
            selfInterrupt();
    }
    
    // 公平锁设置锁执行状态的逻辑
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) { //如果state是0,就是当前的锁没有人占有
            if (!hasQueuedPredecessors() && // 公平锁的核心逻辑,判断队列是否有排在前面的线程在等待锁,非公平锁就没这个条件判断
                compareAndSetState(0, acquires)) { // 如果队列没有前面的线程,使用CAS的方式修改state
                setExclusiveOwnerThread(current); // 将线程记录为独占锁的线程
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) { // 因为ReentrantLock是可重入的,线程可以不停地lock来增加state的值,对应地需要unlock来解锁,直到state为零
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    // 接下来要执行的acquireQueued如下
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) { // 再次使用公平锁逻辑判断是否将Node作为头结点立即执行
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    2. ReentrantReadWriteLock

    可重入的读写锁,定义了读锁和写锁两个方法。用于需要同步资源时在前后加锁/解锁,当一个线程获取读锁后其他线程可以继续获取读锁,当一个线程获取写锁后其他线程都需等待。

    通过使用读写锁,可以提高读操作的并发性,也保证了每次写操作对于所有的读写操作的可见性。

    读写锁程序示例:

    public class N15_ReentrantReadWriteLock {
        private Map map = new TreeMap<>();
    
        private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        private final Lock readLock = readWriteLock.readLock();
        private final Lock writeLock = readWriteLock.writeLock();
    
        public Object get(String key) {
            readLock.lock();
            try {
                return map.get(key);
            } finally {
                readLock.unlock();
            }
        }
    
        public Set getAllkeys() {
            readLock.lock();
            try {
                return map.keySet();
            } finally {
                readLock.unlock();
            }
        }
    
        public Object put(String key, Object vlaue) {
            writeLock.lock();
            try {
                return map.put(key, vlaue);
            } finally {
                writeLock.unlock();
            }
        }
    }
    

    3. Condition

    Condition 接口提供了类似于 Object 的监视器方法,与 Lock 配合使用,可以实现等待/通知模式。

    程序示例:

    public class N19_Condition {
        public static void main(String[] args) {
            ReentrantLock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " run");
                    System.out.println(Thread.currentThread().getName() + " wait for condition");
                    try {
                        // 1.将线程1放入到Condition队列中等待被唤醒,且立即释放锁
                        condition.await();
                        // 3.线程2执行完毕释放锁,此时线程1已经在AQS等待队列中,则立即执行
                        System.out.println(Thread.currentThread().getName() + " continue");
                    } catch (InterruptedException e) {
                        System.err.println(Thread.currentThread().getName() + " interrupted");
                        Thread.currentThread().interrupt();
                    }
                } finally {
                    lock.unlock();
                }
            }).start();
    
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println(Thread.currentThread().getName() + " run");
                    System.out.println(Thread.currentThread().getName() + " sleep 1 secs");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.err.println(Thread.currentThread().getName() + " interrupted");
                        Thread.currentThread().interrupt();
                    }
                    // 2.线程2获得锁,signalAll将Condition中的等待队列全部取出并加入到AQS中
                    condition.signalAll();
                } finally {
                    lock.unlock();
                }
            }).start();
        }
    }
    

    运行结果:

    Thread-0 run
    Thread-0 wait for condition
    Thread-1 run
    Thread-1 sleep 1 secs
    Thread-0 continue
    

    4. LockSupport

    参考:LockSupport的用法及原理

    四、并发工具类

    1. CountDownLatch

    主要用于等待线程等待其他线程执行后再执行,其实现是通过控制计数器是否递减到0来判别,其他的每一个线程执行完毕后,调用countDown()方法让计数器减一,等待线程调用await()方法,直到计数器为1在执行。

    程序示例:

    public class N12_CountDownLatch {
        private static int threadCount = 200;
    
        /**
         * demo 主线程等待200个线程执行完毕后再执行:
         */
        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newCachedThreadPool();
    
            CountDownLatch countDownLatch = new CountDownLatch(threadCount);
            for (int i = 0; i < threadCount; ++i) {
                int finalI = i;
                pool.execute(() -> {
                    try {
                        Thread.sleep(100);
                        System.out.println(Thread.currentThread().getName() + " " + finalI);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
    
            countDownLatch.await();
            pool.shutdown();
        }
    }
    

    2. CyclicBarrier

    用于等待多个线程都准备好再进行,每一个线程准备好后,计数器加1,加到指定值后全部开始

    程序示例:

    public class N13_CyclicBarrier {
    
        private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
    
        /**
         * demo 一个20个线程每等待5个线程进行一次:
         */
        public static void main(String[] args) throws InterruptedException {
            ExecutorService pool = Executors.newCachedThreadPool();
    
            for (int i = 0; i < 20; ++i) {
                Thread.sleep(1000);
                pool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " is ready...");
                    try {
                        //等待指定数量的其他线程执行 无参一直等待不抛异常 有参数表示等待指定时间若数量还未等到抛出异常
                        cyclicBarrier.await();
    //                    cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
                    } catch (BrokenBarrierException | InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + " is continue...");
                });
            }
    
            pool.shutdown();
        }
    }
    

    3. Semaphore

    英译信号量,用于控制某个资源同时可被访问的个数,如控制数据库资源可以同时并发数量为20

    public class N14_Semaphore {
        private static int threadCount = 200;
    
        public static void main(String[] args) {
            ExecutorService pool = Executors.newCachedThreadPool();
    
            //定义允许并发的信号量m
            Semaphore semaphore = new Semaphore(20);
    
            for (int i = 0; i < threadCount; ++i) {
                //该线程的最大并发数为m/n
                int finalI = i;
                pool.execute(() -> {
                    try {
                        //获取n个信号量 无参为一个
                        semaphore.acquire(4);
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + " " + finalI);
                        //释放n个信号量 无参为一个
                        semaphore.release(4);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
            pool.shutdown();
        }
    }
    

    五、并发容器类

    1. ConcurrentHashMap

    2. BlockingQueue

    ...

    六、Fork/Join 框架

    Fork/Join 框架是 JDK7 中出现的一款高效的工具,Java 开发人员可以通过它充分利用现代服务器上的多处理器。它是专门为了那些可以递归划分成许多子模块设计的,目的是将所有可用的处理能力用来提升程序的性能。Fork/Join 框架一个巨大的优势是它使用了工作窃取算法,可以完成更多任务的工作线程可以从其它线程中窃取任务来执行。

    七、线程池

    参考:

  • 相关阅读:
    codeforces 189A
    hdu 2085
    hdu 2083
    cf 1237 C2. Balanced Removals (Harder)
    cf 1244 D. Paint the Tree
    cf 1241 E. Paint the Tree(DP)
    cf 1241 D. Sequence Sorting(思维)
    cf1228 D Complete Tripartite(哈希)
    Windows10 与 WSL(Ubuntu)的文件互访
    Ubuntu下运行python文件
  • 原文地址:https://www.cnblogs.com/cloudflow/p/13894323.html
Copyright © 2011-2022 走看看