zoukankan      html  css  js  c++  java
  • 并发编程(三):并发模拟

      我们要如何模拟一个高并发的环境来检查我们的代码呢?一般常用的方式有三种:利用测试工具ab,利用jmeter,代码模拟。本篇博客着重要说的是代码模拟的实现方式。

      在开始写代码之前我们首先了解一下J.U.C中特别重要的两个工具类:CountDownLatch、Semaphore

      CountDownLatch(闭锁)

      CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。CountDownLatch是通过一个计数器来实现的,当计数器的值为0时,在闭锁上等待的线程就可以恢复执行任务

        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }

      从此类的构造方法可以看出,构造此类需要传入count,这个值只能被设置一次,而且这个计数值不能被重置

      countDown()方法:每调用一次这个方法,构造函数中的count值减1

      await()方法:调用此方法的线程会在这个方法上阻塞,直到count值为0才继续执行自己的任务

     CountDownLatch-demo1

    @Slf4j
    public class CountDownLatchExample1 {
    
        private final static int threadCount = 200;
    
        public static void main(String[] args) throws Exception{
            ExecutorService exec = Executors.newCachedThreadPool();
            final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(()->{
                    try {
                        test(threadNum);
                    } catch (Exception e) {
                        log.error("exception",e);
                    }finally {
                        countDownLatch.countDown();
                    }
    
                });
            }
    
            countDownLatch.await();
            log.info("finish");
            //线程池不再使用后关闭
            exec.shutdown();
    
        }
    
        private static void test(int threadNum)throws Exception {
            Thread.sleep(100);
            log.info("{}", threadNum);
            Thread.sleep(100);
        }
    }

    输出如下:

      此demo1证明了count为0后发生阻塞的线程(调用await()处)才会继续执行

      CountDownLatch-demo2

    @Slf4j
    public class CountDownLatchExample2 {
    
        private final static int threadCount = 200;
    
        public static void main(String[] args) throws Exception{
            ExecutorService exec = Executors.newCachedThreadPool();
            final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
    
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(()->{
                    try {
                        test(threadNum);
                    } catch (Exception e) {
                        log.error("exception",e);
                    }finally {
                        countDownLatch.countDown();
                    }
    
                });
            }
            //统计的时间为test方法执行的时间 ,如下所示为只等待10毫秒就继续执行
            countDownLatch.await(10, TimeUnit.MILLISECONDS);
            log.info("finish");
            //线程池不再使用后关闭,不是直接销毁,会让当前已有的线程执行完然后销毁
            exec.shutdown();
    
        }
    
        private static void test(int threadNum)throws Exception {
            Thread.sleep(100);
            log.info("{}", threadNum);
    //        Thread.sleep(100);
        }
    }

    输出如下:

      demo2表明await()方法支持设置阻塞时间,如果等待时间查过设置的阻塞时间,则发生阻塞的线程继续向下执行(可参考await()的重载)

      Semaphore(信号量)

      Semaphore管理一系列许可证,Semaphore的acquire请求许可证,获得许可证可继续执行,否则发生阻塞,Semaphore的release方法释放许可证。下面我们来看一下Semaphore的构造方法

    public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
    public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }

      从构造方法可以看出,两个构造方法都必须提供许可证的数量(可理解为同时并发执行的线程数),不同的是第二个构造方法还需要指定是公平模式还是非公平模式,默认为非公平模式。公平模式是指调用acquire的顺序就是获取许可证的顺序,遵循FIFO;非公平模式是抢占式的,可能一个新的获取线程恰好在许可证释放的时候得到了许可证,而它前面还有在等待许可证的线程

    Semaphore-demo1

    @Slf4j
    public class SemaphoreExample1 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception{
            ExecutorService exec = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(3);
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(()->{
                    try {
                        semaphore.acquire(); //获取一个许可
                        test(threadNum);
                        semaphore.release();  //释放一个许可
                    } catch (Exception e) {
                        log.error("exception",e);
                    }
                });
            }
    
            //线程池不再使用后关闭
            exec.shutdown();
    
        }
    
        private static void test(int threadNum)throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }

    输出如下:

    参考demo1的输出可发现同一秒内,只有三个线程在并发执行(信号量为3每次只获取一个许可)

    Semaphore-demo2

    @Slf4j
    public class SemaphoreExample2 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception{
            ExecutorService exec = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(3);
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(()->{
                    try {
                        //并发数为3,并且一下拿走三个许可,已经没有别的许可可以放出来了,所以一个个执行
                        semaphore.acquire(3); //获取多个许可
                        test(threadNum);
                        semaphore.release(3);  //释放多个许可
                    } catch (Exception e) {
                        log.error("exception",e);
                    }
                });
            }
    
            //线程池不再使用后关闭
            exec.shutdown();
    
        }
    
        private static void test(int threadNum)throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }

    输出如下:

      参考demo2输出可发现,同一秒内只有一个线程在执行(信号量为3,每次获取三个许可),可参考acquire()方法的重载

    Semaphore-demo3

    @Slf4j
    public class SemaphoreExample3 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception{
            ExecutorService exec = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(3);
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(()->{
                    try {
                        //20个请求在同一时间内都会去尝试获取许可,只有三个线程获得了许可
                        if (semaphore.tryAcquire()) {
                            test(threadNum);
                            semaphore.release();  //释放一个许可
                        }
                    } catch (Exception e) {
                        log.error("exception",e);
                    }
                });
            }
    
            //线程池不再使用后关闭
            exec.shutdown();
    
        }
    
        private static void test(int threadNum)throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }

    输出如下:

      注意上图为demo3完整输出,本次代码中使用的是tryAcquire()而不是acquire()方法,tryAcquire()表示线程会尝试获取许可,如果无法获取许可则此请求被丢弃,信号量为3,每次获取一个许可,所以只有三个线程被执行。

     Semaphore-demo4

    @Slf4j
    public class SemaphoreExample4 {
    
        private final static int threadCount = 20;
    
        public static void main(String[] args) throws Exception{
            ExecutorService exec = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(3);
            for (int i = 0; i < threadCount; i++) {
                final int threadNum = i;
                exec.execute(()->{
                    try {
                        //只等待5s,没有执行完的线程则丢弃
                        if (semaphore.tryAcquire(5000,TimeUnit.MILLISECONDS)) {
                            test(threadNum);
                            semaphore.release();  //释放一个许可
                        }
                    } catch (Exception e) {
                        log.error("exception",e);
                    }
                });
            }
    
            //线程池不再使用后关闭
            exec.shutdown();
        }
        private static void test(int threadNum)throws Exception {
            log.info("{}", threadNum);
            Thread.sleep(1000);
        }
    }

    输出如下:

      参考demo4输出可发现,20个线程只执行了13个,由代码可看出,在5秒内得到许可的线程得到执行(信号量为3,每次获取一个许可),没有得到许可的线程被丢弃(可参考tryAcquire的重载)

      高并发-demo

    @Slf4j
    public class CountExample1 {
    
        //请求总数
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static int count = 0;
    
        private  static void add() {
            count++;
        }
    
        public static void main(String[] args)throws Exception {
    
            //定义线程池
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器闭锁
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                executorService.execute(()->{
                    try {
                        semaphore.acquire();
                        add();
                        semaphore.release();
                    } catch (Exception e) {
                        log.error("exception",e);
                    }
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}",count);
    
        }
    }

    输出如下:

     有个以上基础,我们可知此demo模拟了5000个请求,并发数为200的情况,根据输出结果可知发生了线程安全问题。

  • 相关阅读:
    88. Merge Sorted Array【leetcode】算法,java将两个有序数组合并到一个数组中
    70. Climbing Stairs【leetcode】递归,动态规划,java,算法
    136. Single Number【LeetCode】异或运算符,算法,java
    605. Can Place Flowers种花问题【leetcode】
    175. Combine Two Tables【LeetCode】-LEFT JON 和RIGHT JOIN,两张表关联查询-java -sql入门
    67. Add Binary【LeetCode】
    4.1 yaml配置注入
    2.1容器功能-组件添加
    1.2自动配置
    json乱码问题全配置
  • 原文地址:https://www.cnblogs.com/sbrn/p/8987408.html
Copyright © 2011-2022 走看看