zoukankan      html  css  js  c++  java
  • 并发06--JAVA中的并发工具类

    1、等待多线程完成的CountDownLatch

    CountDownLatch允许一个或多个线程等待其他线程完成操作。

    使用join也可以完成这个操作,代码示例如下:

    package com.example2.demo2.controller;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    
    
    @Slf4j
    public class JoinCountDownLatchTest {
        public static void main(String[] arg) throws Exception{
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    log.info("T1 finish");
                }
            });
    
    
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    log.info("T2 finish");
                }
            });
    
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            log.info("main finish");
        }
    
    
    }

    输出结果:

    17:31:32.578 [Thread-1] INFO com.example2.demo2.controller.JoinCountDownLatchTest - T2 finish
    17:31:32.578 [Thread-0] INFO com.example2.demo2.controller.JoinCountDownLatchTest - T1 finish
    17:31:32.583 [main] INFO com.example2.demo2.controller.JoinCountDownLatchTest - main finish

    其实,T1和T2的执行顺序是不确定的,但是主线程一定是等T1和T2都执行完毕后再执行的。

    join的原理是不停的检查join线程是否存活(wait(0)),如果存活,就一直等待,如果不存活,就往下执行。

    CountDownLatch的使用如下所示:

    package com.example2.demo2.controller;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    
    
    @Slf4j
    public class CountDownLatchTest {
        public static void main(String[] arg) throws Exception{
            CountDownLatch countDownLatch = new CountDownLatch(2);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    log.info("T1 finish");
                    countDownLatch.countDown();
                }
            }).start();
    
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    log.info("T2 finish");
                    countDownLatch.countDown();
                }
            }).start();
            log.info("main finish before");
            countDownLatch.await();
            log.info("main finish");
        }
    
    
    }

    输出结果:

    17:34:06.799 [main] INFO com.example2.demo2.controller.CountDownLatchTest - main finish before
    17:34:06.799 [Thread-1] INFO com.example2.demo2.controller.CountDownLatchTest - T2 finish
    17:34:06.799 [Thread-0] INFO com.example2.demo2.controller.CountDownLatchTest - T1 finish
    17:34:06.803 [main] INFO com.example2.demo2.controller.CountDownLatchTest - main finish

    可以发现,main finish before的输出在T1和T2之前,其实,这三个的输出顺序是不定的,只是在countDownLatch.await()时,需要两个(初始化时定义为2)线程调用countDown方法才会往后执行.

    他的实现逻辑就是,初始化时,定义一个初始化数值,每次调用countDown方法,会将该值减1,直到减为0,await等待的线程被唤醒。countDown()方法可以在任意的地方调用,不一定是一个线程里面只能调用一次,而是可以在任意地方调用,比如说一个方法的多个步骤内。

    2、同步屏障CycllicBarrier

      CyclicBarrier主要做的内容就是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障后,屏障门才会移开,所有被屏障拦截的线程才会继续往后执行。

    代码示例:

    package com.example2.demo2.controller;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    
    @Slf4j
    public class CyclicBarrierTest {
        public static void main(String[] arg) throws Exception{
            CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        log.info("T1 before");
                        cyclicBarrier.await();
                        log.info("T1 end");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            cyclicBarrier.await();
            log.info("main");
        }
    
    
    }

    执行结果:

    18:12:28.448 [Thread-0] INFO com.example2.demo2.controller.CyclicBarrierTest - T1 before
    18:12:28.453 [Thread-0] INFO com.example2.demo2.controller.CyclicBarrierTest - T1 end
    18:12:28.453 [main] INFO com.example2.demo2.controller.CyclicBarrierTest - main

    执行结果中T1 end和main的输出顺序不定,但是T1 before一定是最先输出,如果把初始化CyclicBarrier的值变为3,那么主线程和T1线程均会被无限阻塞。

    同时,CyclicBarrier也提供了一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),这个方法在所有线程到达屏障时,优先执行barrierAction方法,用以处理更复杂的业务场景,代码示例:

    package com.example2.demo2.controller;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    
    @Slf4j
    public class CyclicBarrierTest2 {
        public static void main(String[] arg) throws Exception{
            CyclicBarrier cyclicBarrier = new CyclicBarrier(2,new DemoClass());
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        log.info("T1 before");
                        cyclicBarrier.await();
                        log.info("T1 end");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            cyclicBarrier.await();log.info("main");
        }
    
    
        static class DemoClass implements Runnable{
            @Override
            public void run() {
                log.info("T2");
            }
        }
    
    
    }

    输出结果:

    18:17:17.424 [Thread-0] INFO com.example2.demo2.controller.CyclicBarrierTest2 - T1 before
    18:17:17.427 [Thread-0] INFO com.example2.demo2.controller.CyclicBarrierTest2 - T2
    18:17:17.427 [Thread-0] INFO com.example2.demo2.controller.CyclicBarrierTest2 - T1 end
    18:17:17.427 [main] INFO com.example2.demo2.controller.CyclicBarrierTest2 - main

    上述输出,T1 before和T2的顺序是一定的,T1 before执行时还没有调用await方法,此时并不是所有的线程都到达了屏障,因此该输出先执行,然后待所有线程都到达屏障时,优先执行初始化的leiDemoClass,因此T2第二个输出,最后所有的线程均往后执行,T1 end和main随机顺序输出。

    那么说了这么多,CycylicBarrier的使用场景是什么呢,一般就是需要多个线程分别处理不同的数据,但是在后续需要将各个线程计算的内容做个汇总,以下面的代码为例,开启4个线程,每个线程分别随机得到一个100以内的整数,然后将各个线程的数据进行汇总。

    代码示例:

    package com.example2.demo2.controller;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Map;
    import java.util.concurrent.*;
    
    
    @Slf4j
    public class BnakWterTest {
        private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4,new BankWaterService());
        private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    
        private static Executor executor = Executors.newFixedThreadPool(4);
    
        public static void main(String[] arg) throws Exception{
            for (int i=0;i<4;i++){
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        int k = (int)(Math.random()*100);
                        log.info("线程{}随机值{}",Thread.currentThread().getName(),k);
                        map.put(Thread.currentThread().getName(),k);
                        try {
                            cyclicBarrier.await();
                            log.info("线程{}执行完毕",Thread.currentThread().getName());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    
    
        static class BankWaterService implements Runnable{
            @Override
            public void run() {
                int result = 0;
                for (Map.Entry<String, Integer> entry : map.entrySet()){
                    result += entry.getValue();
                }
                log.info("最终结果:{}",result);
            }
        }
    
    }

    输出结果:

    18:23:26.348 [pool-1-thread-4] INFO com.example2.demo2.controller.BnakWterTest - 线程pool-1-thread-4随机值58
    18:23:26.348 [pool-1-thread-3] INFO com.example2.demo2.controller.BnakWterTest - 线程pool-1-thread-3随机值21
    18:23:26.348 [pool-1-thread-1] INFO com.example2.demo2.controller.BnakWterTest - 线程pool-1-thread-1随机值16
    18:23:26.348 [pool-1-thread-2] INFO com.example2.demo2.controller.BnakWterTest - 线程pool-1-thread-2随机值34
    18:23:26.355 [pool-1-thread-2] INFO com.example2.demo2.controller.BnakWterTest - 最终结果:129
    18:23:26.356 [pool-1-thread-1] INFO com.example2.demo2.controller.BnakWterTest - 线程pool-1-thread-1执行完毕
    18:23:26.356 [pool-1-thread-3] INFO com.example2.demo2.controller.BnakWterTest - 线程pool-1-thread-3执行完毕
    18:23:26.356 [pool-1-thread-4] INFO com.example2.demo2.controller.BnakWterTest - 线程pool-1-thread-4执行完毕
    18:23:26.356 [pool-1-thread-2] INFO com.example2.demo2.controller.BnakWterTest - 线程pool-1-thread-2执行完毕

    那么我们发现CountDownLatch和CyclicBarrier非常类似,那么他们的区别是什么呢,最主要的区别就是CountDownLatch只可以使用一次,而CyclicBarrier可以多次使用,如果计算错误,可以使用reset()方法重置计数器,并且CyclicBarrier还提供了其他的一些方法,例如使用getNumberWaiting方法获取阻塞的线程数;使用isBroken判断阻塞的线程是否被中断等。

    代码示例:

    package com.example2.demo2.controller;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    
    @Slf4j
    public class CyclicBarrierTest3 {
        public static void main(String[] arg){
            CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
            Thread thread1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        log.info("T1 before");
                        cyclicBarrier.await();
                        log.info("T1 end");
                    } catch (InterruptedException e) {
    
                    } catch (BrokenBarrierException e) {
    
                    }
                }
            });
            thread1.start();
            thread1.interrupt();
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                log.info("isBroken=={}",cyclicBarrier.isBroken());
            }
    
        }
    
    }

    输出结果:

    18:35:32.077 [Thread-0] INFO com.example2.demo2.controller.CyclicBarrierTest3 - T1 before
    18:35:32.084 [main] INFO com.example2.demo2.controller.CyclicBarrierTest3 - isBroken==true

    3、控制并发线程数的Semaphore

    代码示例:

    package com.example2.demo2.controller;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.*;
    
    
    @Slf4j
    public class SemaphoreTest {
        private static final int THREAD_COUNT = 10;
        private static Executor executor = Executors.newFixedThreadPool(THREAD_COUNT);
        private static Semaphore semaphore = new Semaphore(2);
        public static void main(String[] arg) throws Exception{
           for(int i=0;i<THREAD_COUNT;i++){
               executor.execute(new Runnable() {
                   @Override
                   public void run() {
                       try {
                           semaphore.acquire();
                           log.info("当前线程{}",Thread.currentThread().getName());
                           semaphore.release();
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
    
                   }
               });
           }
        }
    
    
    }

    输出结果:

    18:44:25.673 [pool-1-thread-2] INFO com.example2.demo2.controller.SemaphoreTest - 当前线程pool-1-thread-2
    18:44:25.673 [pool-1-thread-1] INFO com.example2.demo2.controller.SemaphoreTest - 当前线程pool-1-thread-1
    18:44:25.679 [pool-1-thread-4] INFO com.example2.demo2.controller.SemaphoreTest - 当前线程pool-1-thread-4
    18:44:25.679 [pool-1-thread-3] INFO com.example2.demo2.controller.SemaphoreTest - 当前线程pool-1-thread-3
    18:44:25.679 [pool-1-thread-6] INFO com.example2.demo2.controller.SemaphoreTest - 当前线程pool-1-thread-6
    18:44:25.679 [pool-1-thread-5] INFO com.example2.demo2.controller.SemaphoreTest - 当前线程pool-1-thread-5
    18:44:25.679 [pool-1-thread-7] INFO com.example2.demo2.controller.SemaphoreTest - 当前线程pool-1-thread-7
    18:44:25.679 [pool-1-thread-8] INFO com.example2.demo2.controller.SemaphoreTest - 当前线程pool-1-thread-8
    18:44:25.679 [pool-1-thread-10] INFO com.example2.demo2.controller.SemaphoreTest - 当前线程pool-1-thread-10
    18:44:25.679 [pool-1-thread-9] INFO com.example2.demo2.controller.SemaphoreTest - 当前线程pool-1-thread-9

    从结果可以看到,虽然线程池大小为10,但是Semaphore控制只允许两个线程同时执行,结果也可以看到,每一次输出都是成对出现,12,34,56

    semaphore的用法很简单,首先,使用acquire发放一个许可证,使用完毕后,调用release释放许可证。

    同时semaphore还提供了一些其他的方法

    方法 描述
    int availablePermits() 返回此信号量中当前可用的许可证数
    int getQueueLength() 返回正在等待获取许可证的线程数
    boobeal hasQueueThreads() 是否有线程正在等待获取许可证
    void reducePermits(int reduction) 减少reduction个许可证
    Collection getQueueThreads() 返回所有等待获取许可证的线程集合

    4、线程间数据交换的Exchanger

      Exchanger是线程间协作的工具类,用于线程间的数据交换,两个线程通过exchange方法交换数据,一个线程先执行了exchange方法,就会一直等待第二个线程执行该方法,当两个线程到达同步点时,这两个线程就可以做数据交换。

    package com.example2.demo2.controller;
    
    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.*;
    
    
    @Slf4j
    public class ExchangerTest {
        private static ExecutorService executor = Executors.newFixedThreadPool(2);
        private static Exchanger<User> exchanger = new Exchanger<>();
        public static void main(String[] arg) throws Exception{
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    User user1 = new User("lcl",18);
                    try {
                        exchanger.exchange(user1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("T1 end");
                }
            });
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    User user2 = new User("mm",15);
                    try {
                        User user1 = exchanger.exchange(user2);
                        log.info("user1【{}】",user1.getAge());
                        log.info("user2【{}】",user2.getAge());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    log.info("T2 end");
                }
            });
            executor.shutdown();
        }
    
        public static class User{
            public volatile String name;
            public volatile int age;
    
            public User(String name, int age){
                this.name = name;
                this.age = age;
            }
    
            public String getName(){
                return this.getName();
            }
    
            public int getAge(){
                return this.age;
            }
    
        }
    
    }

    输出结果:

    19:44:12.289 [pool-1-thread-2] INFO com.example2.demo2.controller.ExchangerTest - user1【1819:44:12.293 [pool-1-thread-2] INFO com.example2.demo2.controller.ExchangerTest - user2【15
  • 相关阅读:
    sqlalchemy访问Oracle数据库报错:UnicodeDecodeError: 'big5' codec can't decode byte 0xfb in position 2: illegal multibyte sequence
    Mac如何安装FastDfs
    Django执行Sql语句笔记
    跑DRF框架分页源码笔记
    Python Paginator分页学习
    Python Excel笔记
    npm run dev报错解决方法
    npm install --global vue-cli 报错 [..................] / rollbackFailedOptional: verb npm-session abfa82f3041ebc02
    MS17_010漏洞攻击Windows7
    虚拟机启动黑屏
  • 原文地址:https://www.cnblogs.com/liconglong/p/13159486.html
Copyright © 2011-2022 走看看