zoukankan      html  css  js  c++  java
  • Java并发-并发工具类JUC

    安全共享对象策略

    1.线程限制 : 一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改
    2.共享只读 : 一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,
    但是任何线程都不能修改它
    3.线程安全对象 : 一个线程安全的对象或则容器,在内部通过同步机制来保证线程安全,
    所以其他线程无需额外的同步就可以通过公共接口随意访问它
    4.被守护对象 : 被守护对象只能通过获取特定的锁来访问

    线程安全 - 同步容器

    采用synchronized关键字同步,缺点 :

    1. 不能完成做到线程安全
    2. 性能差

    ArrayLisy -> Vector, Stack
    HashMap -> HashTable (key、value不能为null)
    Collections.synchronizedXXX(List、Set、Map)

    线程安全 - 并发容器 J.U.C

    ArrayList -> CopyOnWriteArrayList
    HashSet、TreeSet -> CopyOnWriteArraySet ConcurrentSkipListSet
    HashMap、TreeMap -> ConcurrentHashMap ConcurrentSkipListMap

    AbstractQueuedSynchronizer - AQS

    1. 使用Node实现FIFO队列,可以用于构建锁或则其他同步装置的基础框架
    2. 利用一个int类型表示状态
    3. 使用方法是基础
    4. 子类通过继承并通过实现它的方法管理其状态 { acquire 和 release} 的方法操纵状态
    5. 可以同时实现排他锁和共享锁模式(独占、共享)

    常用类

    CountDownLatch
    Semaphore
    CyclicBarrier
    ReentrantLock
    Condition
    FutureTask

    Exchanger

    Phaser

    Phaser一种可重用的同步屏障,功能上类似于CyclicBarrier和CountDownLatch,但使用上更为灵活。非常适用于在多线程环境下同步协调分阶段计算任务(Fork/Join框架中的子任务之间需同步时,优先使用Phaser)

    Semaphore实现原理与ReentrantLock类似,通过内部类Sync、NonfairSync、FairSync,不过其是对一组资源的限制

    Future:接口,FutureTask是它的实现类,配合线程池来一起工作,将任务交给线程池去处理。

    CountDownLacth

    CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

    CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

    @Self4j
    public class CountDownLatchExample {
    
        private final  static int threadCount = 200;
        
        public static void main(String[] arg) {
        
            ExecutorService exec = Executors.newCachedThreadPool(); 
            
            final CountDownLatch lacth = new CountDownLatch(5);
            
            for (int i = 0; i < 1000; i++) {
                exec.excute( () -> {
                final int threadNum  = i;
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    // latch递减
                    lacth.countDown();
                }
                });
            }
            // 等待latch计数器为0,则继续往下执行
            latch.await();
            // latch的await方法也可以传入等待时间,等到等待时间后不管有没完成计数都往下执行
            // latch.await( 10, TimeUnit.MILLISECONDS);
            log.info("finished");
            exec.shutdown();
        }
    
        public static void test(int i)  throw Exception{
            log.info("thread: {}", i);
        }
    }

    Semaphore

    Semaphore(int permits):构造方法,创建具有给定许可数的计数信号量并设置为非公平信号量。
    Semaphore(int permits,boolean fair):构造方法,当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。
    void acquire():从此信号量获取一个许可前线程将一直阻塞。
    void acquire(int n):从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。
    void release():释放一个许可,将其返回给信号量。就如同车开走返回一个车位。
    void release(int n):释放n个许可。
    int availablePermits():获取当前可用的许可数。
    boolean tryAcquire():仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
    boolean tryAcquire(int permits):仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。

    boolean tryAcquire(int permits,
                              long timeout,
                              TimeUnit unit)
                       throws InterruptedException

    如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被 中断,则从此信号量获取给定数目的许可。

    @Self4j
    public class SemaphoreExample {
    
        private final  static int threadCount = 200;
    
        public static void main(String[] arg) {
    
            ExecutorService exec = Executors.newCachedThreadPool(); 
        
            final Semaphore semaphore = new Semaphore(3);
        
            for (int i = 0; i < threadCount; i++) {
                exec.excute( () )-> {
                final int threadNum  = i;
                try {
                    // tryAcquire会尝试去获取一个信号量,如果获取不到
                    // 则什么都不会发生,走接下来的逻辑
                    // if (semaphore.tryAcquire(1)) {
                    //    test(i);
                    //    semaphore.release();//释放一个信号量
                    // }
                    semaphore.acquire();//获取一个信号量
                    test(i);
                    semaphore.release();//释放一个信号量
                } catch (Exception e) {
                    log.error("exception", e);
                } 
                });
            }
            log.info("finished");
            exec.shutdown();
        }
    
        public static void test(int i)  throw Exception{
            log.info("thread: {}", i);
        }
    }

    CyclicBarrier

    一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。

    CyclicBarrier(int parties, Runnable barrierAction)
    创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

    CyclicBarrier(int parties)
    创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。

    int await()
    在所有 参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。

    int await(long timeout,
                     TimeUnit unit)
              throws InterruptedException,
                     BrokenBarrierException,
                     TimeoutException

    在所有 参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。

    boolean isBroken() : 查询此屏障是否处于损坏状态。

    void reset() :
    将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。注意,在由于其他原因造成损坏 之后,实行重置可能会变得很复杂;此时需要使用其他方式重新同步线程,并选择其中一个线程来执行重置。与为后续使用创建一个新 barrier 相比,这种方法可能更好一些。

    int getNumberWaiting() :返回当前在屏障处等待的参与者数目。此方法主要用于调试和断言。

    @Self4j
    public class CyclicBarrierExample {
    
        private final  static int threadCount = 200;
        
        private final static CyclicBarrier cyclicBarrier = new CyclicBarrier(7, 
            () -> {
            log.info("callback is running !");
            }
        );
        
        public static void main(String[] arg) {
        
            ExecutorService exec = Executors.newCachedThreadPool(); 
            
            for (int i = 0; i < threadCount; i++) {
                exec.excute( () -> {
                    final int threadNum  = i;
                    try {
                        race(i);
                    } catch (Exception e) {
                        log.error("exception", e);
                    } 
                });
            }
            log.info("finished");
            
            exec.shutdown();
        }
        
        public static void race(int i)  throw Exception{
            log.info("thread {} is ready", i);
            cyclicBarrier.await();
            log.info("thread {} is continue", i);
        }
    }

    Exchanger

    Exchanger 用于两个线程间的数据交换,它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。

    使用场景: 两个线程相互等待处理结果并进行数据传递。

        public void latch() throws InterruptedException, IOException {
            int count = 5;
            Exchanger<String> exchanger = new Exchanger<>();
            ExecutorService executorService = Executors.newFixedThreadPool(count);
                for (int x=0;x<count;x++){
                    executorService.execute(new Worker(x,exchanger));
                }
            System.in.read();
        }
    
        class Worker extends Thread {
            Integer start;
            Exchanger<String>  exchanger;
    
            public Worker(Integer start, Exchanger<String> exchanger) {
                this.start = start;
                this.exchanger = exchanger;
            }
    
            @Override
            public void run() throws IllegalArgumentException {
                try {
                    System.out.println(Thread.currentThread().getName() + " 准备执行");
                    TimeUnit.SECONDS.sleep(start);
                    System.out.println(Thread.currentThread().getName() + " 等待交换");
                    String value = exchanger.exchange(Thread.currentThread().getName());
                    System.out.println(Thread.currentThread().getName() + " 交换得到数据为:"+value);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        }
    
    /*
    pool-1-thread-1 准备执行
    pool-1-thread-1 等待交换
    pool-1-thread-3 准备执行
    pool-1-thread-2 准备执行
    pool-1-thread-5 准备执行
    pool-1-thread-4 准备执行
    pool-1-thread-2 等待交换
    pool-1-thread-1 交换得到数据为:pool-1-thread-2
    pool-1-thread-2 交换得到数据为:pool-1-thread-1
    pool-1-thread-3 等待交换
    pool-1-thread-4 等待交换
    pool-1-thread-4 交换得到数据为:pool-1-thread-3
    pool-1-thread-3 交换得到数据为:pool-1-thread-4
    pool-1-thread-5 等待交换
    */

    Exchanger必须成对出现,否则会像上面代码执行结果那样,pool-1-thread-5一直阻塞等待与其交换数据的线程,为了避免这一现象,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长

     
  • 相关阅读:
    JS函数防抖与函数节流
    AJAX问题 XMLHttpRequest.status = 0是什么含义
    通过JS如何获取IP地址
    关于URL编码
    报错Unexpected token u
    css文本超出2行就隐藏并且显示省略号
    At_speed_test
    Logic Bist Arch
    Logic BIST
    DMA-330(二)
  • 原文地址:https://www.cnblogs.com/Bkxk/p/11577520.html
Copyright © 2011-2022 走看看