zoukankan      html  css  js  c++  java
  • 多线程之美3一Java并发工具类

    目录结构

    一、简介
          1.1、Semaphore
          1.2、CountDownLatch
          1.3、CyclicBarrier 
    二、信号量Semaphore
          2.1、构造方法
          2.2、主要方法
          2.3、示例代码
    三、计数器CountDownLatch
          3.1、主要方法
          3.2、示例代码1
          3.3、示例代码2
    四、栅栏类CyclicBarrier
          4.1、构造方法
          4.2、示例代码
    

    一、简介

    1.1、 Semaphore

    信号量,见文知义,常用于并发控制中的限流作用,我理解是限定数量的共享锁机制。该共享资源最多同时可让n个线程访问,超过n个线程就阻塞等待,如有资源空闲, 唤醒其他等待线程(唤醒又分公平与非公平,默认非公平)比如一条四车道大桥,每次仅能并发通过4辆汽车,而在高峰期时100辆车涌入,这次需要一个信号灯来限制车辆,每次最多放行4辆车,在车辆通过后再放行。在并发环境下,每辆车就是如一个线程,4车道大桥就如有限的资源,需要控制线程的数量,在这种业务场景下,靠锁同步的机制(如synchronized)力有不逮,java并发包中提供Semaphore类可以帮助解决此类场景。

    应用场景: 
    1、资源控制:控制数据库连接数,如有多个IO操作,每个需要操作数据库写入,数据库连接数有限,控制连接数据库数量。
    例:100个线程执行IO,只有10个mysql连接,最多同时可以有10个线程获取到连接,否则会报错无法获取连接,这时可用信号量控制。
    
    2、可当同步锁使用,设置信号量通道等于1。
    

    1.2、CountDownLatch

    允许一个或多个线程等待其他线程完成操作后再执行。其内部维护一个计数器,设置初始值给state,每调用 countDown()方法一次,state数量减1,调用await()方法的线程被阻塞,需要等待state减少为0时才可被唤醒继续执行。

    应用场景: 
    1、一个任务要统计公司一星期的财务流水总额,每次需要读取5张Excel表统计流水汇总,如何快速地统计出来?
    可以使用CountDownLatch,先开始5个线程并发地分别统计每张表的流水额度总和,当5个线程统计结束,再汇总总额。
    
    2、开发对外接口,要求响应快,而该接口内部逻辑复杂,涉及多个服务的调用,并依赖这些独立服务响应结果进行下一步操作,这时可以考虑CountDownLatch或者 CyclicBarrier,并发调用多个服务,获取这些结果后才进行下一步,缩短处理时间。
    

    1.3、CyclicBarrier

    循环屏障,栅栏类; CyclicBarrier 可让多个线程相互等待,当所有线程都达到后再唤醒所有线程继续执行。如导游设定目标点,所有游客到这集合,先到的游客等待其他游客,当所有的游客都到了后,大家再一起出发。

    CyclicBarrier与CountDownLatch区别:
    1)  CyclicBarrier 在应用场景中,多个线程之间相互等待,线程之间在业务上可能会更有依赖性 ; CountDownLatch是每个执行 coutDown方法的线程之间可以没有依赖性,而是执行await方法的线程更依赖这些执行coutDown的线程。
    
    2)CountDownLatch计数器只能使用一次,Semaphore可以多次使用,可以重置使用。
    
    3)CyclicBarrier 可以处理更复杂的业务场景,如线程都达到屏障后,可以在构造函数中 CyclicBarrier(int parties, Runnable barrierAction) 传入线程barrierAction,当达到屏障触发条件时,可以比其他等待线程优先执行,处理业务。
    
    4)CyclicBarrier还有很多方法,如查看当前到达屏障被阻塞的线程数量 getNumberWaiting()。
    
    5)CountDownLatch 是每个线程执行完减1操作,当计数器为0时,才唤醒等待线程,阻塞线程只有1个或者多个;CyclicBarrier是让所有线程到达屏障处就被阻塞了,当所有线程都到达时,唤醒所有被阻塞线程继续执行,阻塞线程有多个,线程之间是相互等待。
    

    二、信号量Semaphore

    2.1、构造方法

    Semaphore(int permits) //默认非公平
    //是否公平尝试获取许可证  
    Semaphore(int permits, boolean fair)
    

    2.2、主要方法

    acquire() //从信号量那去获取一个许可,如果没有剩余的就被阻塞
    acquire(int permits)//也可一次获取多个许可
    release() //释放一个许可,将其返还给信号量,给其他线程使用。 
    release(int permits) //一次释放多个许可
    

    2.3、示例代码

    package Semaphore;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    /**
     * @author zdd
     * 2019/11/27 6:43
     * Description: 信号量测试
     */
    public class SemaphoreService {
        private  static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        private Semaphore semaphore = new Semaphore(4);
    
        public void doSomeThing() throws InterruptedException {
            //1,获取一个许可
            semaphore.acquire();
    
            System.out.println(Thread.currentThread().getName() + "--start--" +getFormatDate());
            // 停顿1s
            TimeUnit.SECONDS.sleep(1);
            System.out.println(Thread.currentThread().getName() + "--end--" +getFormatDate());
            //2,用完释放一个许可,可供其他线程使用
            semaphore.release();
        }
        public static void main(String[] args) {
            SemaphoreService semaphoreService  = new SemaphoreService();
            for (int i = 0; i < 10; i++) {
                WorkThread workThread = new WorkThread(i+"",semaphoreService);
                workThread.start();
            }
        }
    
        public String getFormatDate() {
          return sdf.format(new Date());
        }
       // 工作线程类 - 静态内部类
       static  class WorkThread extends Thread {
    
            SemaphoreService semaphoreService;
           //构造参数传入线程名称及SemaphoreService
            public WorkThread(String name, SemaphoreService semaphoreService) {
                this.semaphoreService = semaphoreService;
                setName(name);
            }
            @Override
            public void run() {
                try {
                    semaphoreService.doSomeThing();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    1.线程最多4个同时执行

    //1,设置信号量数为4,执行结果如下图
    private Semaphore semaphore = new Semaphore(4);
    

    2.线程依次执行,实现同步

    //1,设置信号量数为1,执行结果如下图
    private Semaphore semaphore = new Semaphore(1);
    

    三、计数器CountDownLatch

    3.1、主要方法

    countDown() 计数器减1
    await() 阻塞等待,直到计数器为0唤醒继续执行
    await(long timeout, TimeUnit unit) 阻塞等待,在等待设定时间计时器还没到减为0,也不会再继续等待了。
    

    3.2、示例代码1

    package countDownLatch;
    
    import javax.swing.plaf.IconUIResource;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author zdd
     * 2019/11/29 7:19 下午
     * Description: 班长召集士兵集合拉练案例
     */
    public class CountDownLatchSoldierTest {
    
        private static final Integer THREAD_COUNT = 10;
        //等待士兵10人
        private static  CountDownLatch countDownLatch = new CountDownLatch(10);
    
        public static void main(String[] args) throws InterruptedException {
            //1,开10个线程模拟士兵签到,采用线程池创建
            ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
            for (int i = 0; i < 10; i++) {
                //2,提交10个线程执行
                executorService.submit(()->{
                    try{
                        System.out.println(Thread.currentThread().getName()+ ": 士兵报告");
                    }finally {
                        //3,计数器减1
                        countDownLatch.countDown();
                    }
                });
            }
            //3,主线程做班长,负责等待所有士兵到齐,开始拉练
            countDownLatch.await();
            System.out.println("班长: 集合完毕,开始5公里越野! ");
    
            //4,关闭线程池
            executorService.shutdown();
        }
    }
    

    执行结果如下:

    3.3、示例代码2

    计算器未减到0,主线程持续等待。

    package countDownLatch;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * zdd
     * 2019/11/5 10:50 
     * Description:测试计数器使用
     */
    public class CountDownLatchTest {
    
        static  int number = 2;
        //1,设置计数器的值为 2
        static CountDownLatch countDownLatch = new CountDownLatch(number);
    
    
        public static void  main(String[] args) throws InterruptedException {
            //2,开启一个线程,传入计数器
            new Thread(()-> {
                 try {
                    //睡2s,模拟执行业务
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    System.out.println("调用countDown前, 当前count 值为:"+countDownLatch.getCount());
                    //3,调用递减 1次
                    countDownLatch.countDown();
                    System.out.println("调用countDown后, 当前count 值为:"+countDownLatch.getCount());
                }
            },"worker").start();
    
            //情况1、主线程开始执行 ,等待1s之后,如果还没到条件,也不等了哦
           // countDownLatch.await(1,TimeUnit.SECONDS);
           //情况2、阻塞等待,如果计数未到0,一直阻塞等待
            countDownLatch.await();
            System.out.println("主线程继续执行");
        }
    }
    

    主线程阻塞等待,执行结果如下:

    四、栅栏类CyclicBarrier

    4.1、构造方法

    CyclicBarrier(int parties) //parties为需要等待线程的数量
    //barrierAction,在所有其他线程到达后,优先执行的线程。可根据业务添加
    CyclicBarrier(int parties, Runnable barrierAction) 
    

    4.2、示例代码

    package CyclicBarrier;
    
    import java.util.Map;
    import java.util.concurrent.*;
    /**
     * @author zdd
     * 2019/11/28 9:20
     * Description: 开启4个线程分别统计sheet表,使用栅栏类实现同步统计计算,最后计算总和;此处也可用  
     * CountDownLatch实现
     */
    public class BankAccountService {
        private ConcurrentHashMap<String,Integer> concurrentHashMap = new ConcurrentHashMap<>();
      
        // 线程数
        private final static int THREAD_COUNT = 4;
        private CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                Integer sumcount = 0;
                //汇总每个sheet计算结果
                for (Map.Entry<String, Integer> entry: concurrentHashMap.entrySet()) {
                    sumcount+=entry.getValue();
                }
                System.out.println("优先执行,  求和计算完毕,总和为:"+ sumcount);
            }
        });
    
        public void  count() {
            ExecutorService executor = Executors.newFixedThreadPool(4);
    
            for (int i = 0; i < THREAD_COUNT; i++) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName()+"到达屏障!");
                        concurrentHashMap.put(Thread.currentThread().getName(),1);
    
                        try {
                            cyclicBarrier.await();
                            System.out.println(Thread.currentThread().getName()+"继续执行!");
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            executor.shutdown();
        }
    
        public static void main(String[] args) {
            BankAccountService bankAccountService = new BankAccountService();
            bankAccountService.count();
        }
    }
    

    运行结果如下:

    // 将线程数改为2,阻塞等待其他线程
     private final static int THREAD_COUNT = 2;
    


    参考资料:
    Java并发编程的艺术-方腾飞

  • 相关阅读:
    获取当前时区时间
    python lambda表达式详解
    Odoo 12开发之开发环境准备
    初步了解odoo12
    web前端面试题
    实现一个优先级队列
    面试题
    python读取和生成excel文件
    Django基础
    virtualenv
  • 原文地址:https://www.cnblogs.com/flydashpig/p/11960721.html
Copyright © 2011-2022 走看看