zoukankan      html  css  js  c++  java
  • Java并发之同步工具类

    1. CountDownlatch(计数器)

    描述:

    一个同步工具类,允许一个或多个线程等待其它线程完成操作

    类图

    在这里插入图片描述

    通过指定的count值进行初始化,调用await方法的线程将被阻塞,直到count值通过countDown()方法减小到0,所有等待的线程才会被释放继续执行。另外CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值

    事例:

    package com.lkf.concurrent;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    
    import static java.util.concurrent.Executors.newFixedThreadPool;
    
    
    public class CountDownlatchTest {
        /**
         * 计数器,用来控制线程数量,传入参数2,表示计数器计数为2
         */
        private final static CountDownLatch M_COUNT_DOWN_LATCH = new CountDownLatch(2);
    
        /**
         * 示例工作线程类
         */
        private static class WorkerThreadA implements Runnable {
            private final String mThreadName;
            private final int mSleepTime;
    
            public WorkerThreadA(String name, int sleepTime) {
                mThreadName = name;
                mSleepTime = sleepTime;
            }
    
            @Override
            public void run() {
                System.out.println("[" + mThreadName + "] started!");
                try {
                    Thread.sleep(mSleepTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                M_COUNT_DOWN_LATCH.countDown();
                System.out.println("[" + mThreadName + "] end!");
            }
        }
    
        /**
         * 工作线程类
         */
        private static class WorkerThreadB implements Runnable {
    
            @Override
            public void run() {
                System.out.println("[WorkerThread] started!");
                try {
                    // 阻塞在这里等待 mCountDownLatch 里的count变为0;                
                    M_COUNT_DOWN_LATCH.await();
                } catch (InterruptedException e) {
    
                }
                System.out.println("[WorkerThread] end!");
            }
        }
    
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = newFixedThreadPool(3);
            // 最先run WorkerThread
            executorService.submit(new WorkerThreadB());
            // 运行两个工作线程
            // 工作线程1运行3秒        
            executorService.submit(new WorkerThreadA("WorkingThread1", 3000));
            // 工作线程2运行2秒
            executorService.submit(new WorkerThreadA("WorkingThread2", 2000));
        }
    }
    
    

    2. CyclicBarrier(同步屏障)

    描述

    CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(intparties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞

    类图

    image

    事例

    package com.lkf.concurrent;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierTest {
        public static void main(String[] args) {
            //初始化四个线程
            int threadNum = 4;
            CyclicBarrier barrier = new CyclicBarrier(threadNum, new WorkerThreadA());
            for (int i = 0; i < threadNum; i++) {
                new WorkerThread(barrier).start();
            }
        }
    
        static class WorkerThread extends Thread {
            private CyclicBarrier cyclicBarrier;
    
            public WorkerThread(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
    
            @Override
            public void run() {
                System.out.println("线程" + Thread.currentThread().getName() + "正在执行");
                try {
                    Thread.sleep(5000);      //以睡眠来模拟操作
                    System.out.println("线程" + Thread.currentThread().getName() + "执行完毕,等待其他线程执行完成");
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("所有线程执行完成,继续处理其他任务...");
            }
        }
    
        static class WorkerThreadA extends Thread {
            @Override
            public void run() {
                System.err.println("我是特殊任务");
            }
        }
    }
    
    

    有一个高级构造函数,当一组线程执行完毕后,优先执行某个方法,CyclicBarrier(int parties, Runnable barrierAction),可以用来处理特殊的任务

    应用场景

    CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如一个1000万行数据的大文件,统计数据最大的钱五个数,假如我们用五个线程,将大文件分成5份,分别计算每一份中最大的数,最后,barrierAction用这些线程的计算结果,计算出整个文件中最大的五个数。

    CyclicBarrier和CountDownLatch的区别

    CountDownLatch的计数器只能使用一次,CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
    CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。

    3. Semaphore(信号量)

    描述

    Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,就是控制并发线程的数量

    类图

    image

    应用场景

    Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控

    事例

    public class SemaphoreTest {
    
        private static final int THREAD_COUNT = 30;
    
        private static ExecutorService threadPool = newFixedThreadPool(THREAD_COUNT);
    
        private static Semaphore s = new Semaphore(10);
    
        public static void main(String[] args) {
            for (int i = 0; i < THREAD_COUNT; i++) {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            s.acquire();
                            System.out.println("save data");
                            s.release();
                        } catch (InterruptedException e) {
                        }
                    }
                });
            }
    
            threadPool.shutdown();
        }
    }
    

    其他方法

    Semaphore还提供一些其他方法:
    
    int availablePermits() :返回此信号量中当前可用的许可证数。
    
    int getQueueLength():返回正在等待获取许可证的线程数。
    
    boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
    
    void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
    
    Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。
    

    4. Exchanger(线程间数据交换)

    描述

    Exchanger(交换器)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方

    类图

    image

    应用场景

    Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。

    Exchanger也可以用于校对工作。比如我们需要将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对这两个Excel数据进行校对,看看是否录入的一致

    事例

    public class ExchangerTest {
        //交换器
        private static final Exchanger<String> exgr = new Exchanger<String>();
    
        //线程池
        private static ExecutorService threadPool = newFixedThreadPool(2);
    
        public static void main(String[] args) {
    
            threadPool.execute(() -> {
                try {
                    String threadAData = "线程A的数据";
                    exgr.exchange(threadAData);
                } catch (InterruptedException e) {
                }
            });
    
            threadPool.execute(() -> {
                try {
                    String threadBData = "线程B的数据";
                    String threadAData = exgr.exchange("B");
                    System.out.println("A和B数据是否一致:" + threadAData.equals(threadBData) + ",A录入的是:"
                            + threadAData + ",B录入是:" + threadBData);
                } catch (InterruptedException e) {
                }
            });
    
            threadPool.shutdown();
    
        }
    }
    

    其它方法

    如果两个线程有一个没有到达exchange方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以设置最大等待时长。

    public V exchange(V x, long timeout, TimeUnit unit)
    
    博主文章同步发布:https://blog.csdn.net/u010647035/article/details/83959552
  • 相关阅读:
    算法与数据结构(二):队列
    算法与数据结构(二):链表
    算法与数据结构(一):时间复杂度与空间复杂度
    2018总结与2019规划
    VC++ IPv6的支持
    从项目中学习HTML+CSS
    xampp 中 mysql的相关配置
    yum卸载遇到的问题--待解决
    RHEL 6.5----heartbeat
    RHEL 6.5-----MFS
  • 原文地址:https://www.cnblogs.com/liukaifeng/p/10052671.html
Copyright © 2011-2022 走看看