zoukankan      html  css  js  c++  java
  • java一些常用并发工具示例

    最近把《java并发编程实战》-Java Consurrency in Practice 重温了一遍,把书中提到的一些常用工具记录于此:

    一、闭锁(门栓)- CountDownLatch

    适用场景:多线程测试时,通常为了精确计时,要求所有线程都ready后,才开始执行,防止有线程先起跑,造成不公平,类似的,所有线程执行完,整个程序才算运行完成。

        /**
         * 闭锁测试(菩提树下的杨过 http://yjmyzz.cnblogs.com/)
         *
         * @throws InterruptedException
         */
        @Test
        public void countdownLatch() throws InterruptedException {
            CountDownLatch startLatch = new CountDownLatch(1); //类似发令枪
            CountDownLatch endLatch = new CountDownLatch(10);//这里的数量,要与线程数相同
    
            for (int i = 0; i < 10; i++) {
                Thread t = new Thread(() -> {
                    try {
                        startLatch.await(); //先等着,直到发令枪响,防止有线程先run
                        System.out.println(Thread.currentThread().getName() + " is running...");
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        endLatch.countDown(); //每个线程执行完成后,计数
                    }
                });
                t.setName("线程-" + i);
                t.start();
            }
            long start = System.currentTimeMillis();
            startLatch.countDown();//发令枪响,所有线程『开跑』
            endLatch.await();//等所有线程都完成
            long end = System.currentTimeMillis();
            System.out.println("done! exec time => " + (end - start) + " ms");
        }  

    执行结果:

    线程-1 is running...
    线程-5 is running...
    线程-8 is running...
    线程-4 is running...
    线程-3 is running...
    线程-0 is running...
    线程-2 is running...
    线程-9 is running...
    线程-7 is running...
    线程-6 is running...
    done! exec time => 13 ms

    注:大家可以把第14行注释掉,再看看运行结果有什么不同。

    二、信号量(Semaphore)

    适用场景:用于资源数有限制的并发访问场景。

       public class BoundedHashSet<T> {
            private final Set<T> set;
            private final Semaphore semaphore;
    
            public BoundedHashSet(int bound) {
                this.set = Collections.synchronizedSet(new HashSet<T>());
                this.semaphore = new Semaphore(bound);
            }
    
            public boolean add(T t) throws InterruptedException {
                if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
                    return false;
                }
                ;
                boolean added = false;
                try {
                    added = set.add(t);
                    return added;
                } finally {
                    if (!added) {
                        semaphore.release();
                    }
                }
            }
    
            public boolean remove(Object o) {
                boolean removed = set.remove(o);
                if (removed) {
                    semaphore.release();
                }
                return removed;
            }
        }
    
        @Test
        public void semaphoreTest() throws InterruptedException {
    
            BoundedHashSet<String> set = new BoundedHashSet<>(5);
            for (int i = 0; i < 6; i++) {
                if (set.add(i + "")) {
                    System.out.println(i + " added !");
                } else {
                    System.out.println(i + " not add to Set!");
                }
            }
        }
    

    上面的示例将一个普通的Set变成了有界容器。执行结果如下:

    0 added !
    1 added !
    2 added !
    3 added !
    4 added !
    5 not add to Set!

    三、栅栏CyclicBarrier 

    这个跟闭锁类似,可以通过代码设置一个『屏障』点,其它线程到达该点后才能继续,常用于约束其它线程都到达某一状态后,才允许做后面的事情。

        public class Worker extends Thread {
    
            private CyclicBarrier cyclicBarrier;
    
            public Worker(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
    
            private void step1() {
                System.out.println(this.getName() + " step 1 ...");
            }
    
            private void step2() {
                System.out.println(this.getName() + " step 2 ...");
            }
    
            public void run() {
                step1();
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                step2();
            }
        }
    
        @Test
        public void cyclicBarrierTest() throws InterruptedException, BrokenBarrierException {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
            for (int i = 0; i < 10; i++) {
                Worker w = new Worker(cyclicBarrier);
                w.start();
            }
            cyclicBarrier.await();
    
        }
    

    这里我们假设有一个worder线程,里面有2步操作,要求所有线程完成step1后,才能继续step2. 执行结果如下:

    Thread-0 step 1 ...
    Thread-1 step 1 ...
    Thread-2 step 1 ...
    Thread-3 step 1 ...
    Thread-4 step 1 ...
    Thread-5 step 1 ...
    Thread-6 step 1 ...
    Thread-7 step 1 ...
    Thread-8 step 1 ...
    Thread-9 step 1 ...
    Thread-9 step 2 ...
    Thread-0 step 2 ...
    Thread-3 step 2 ...
    Thread-4 step 2 ...
    Thread-6 step 2 ...
    Thread-2 step 2 ...
    Thread-1 step 2 ...
    Thread-8 step 2 ...
    Thread-7 step 2 ...
    Thread-5 step 2 ...

    四、Exchanger

    如果2个线程需要交换数据,Exchanger就能派上用场了,见下面的示例:

        @Test
        public void exchangerTest() {
            Exchanger<String> exchanger = new Exchanger<>();
    
            Thread t1 = new Thread(() -> {
                String temp = "AAAAAA";
                System.out.println("thread 1 交换前:" + temp);
                try {
                    temp = exchanger.exchange(temp);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thread 1 交换后:" + temp);
            });
    
            Thread t2 = new Thread(() -> {
                String temp = "BBBBBB";
                System.out.println("thread 2 交换前:" + temp);
                try {
                    temp = exchanger.exchange(temp);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thread 2 交换后:" + temp);
            });
    
            t1.start();
            t2.start();
        }
    

     执行结果:

    thread 1 交换前:AAAAAA
    thread 2 交换前:BBBBBB
    thread 2 交换后:AAAAAA
    thread 1 交换后:BBBBBB

    五、FutureTask/Future

    一些很耗时的操作,可以用Future转化成异步,不阻塞后续的处理,直到真正需要返回结果时调用get拿到结果

        @Test
        public void futureTaskTest() throws ExecutionException, InterruptedException, TimeoutException {
    
            Callable<String> callable = () -> {
                System.out.println("很耗时的操作处理中。。。");
                Thread.sleep(5000);
                return "done";
            };
    
            FutureTask<String> futureTask = new FutureTask<>(callable);
    
            System.out.println("就绪。。。");
            new Thread(futureTask).start();
            System.out.println("主线程其它处理。。。");
            System.out.println(futureTask.get());
            System.out.println("处理完成!");
    
            System.out.println("-----------------");
    
            System.out.println("executor 就绪。。。");
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<String> future = executorService.submit(callable);
            System.out.println(future.get(10, TimeUnit.SECONDS));
        }
    

     执行结果:

    就绪。。。
    主线程其它处理。。。
    很耗时的操作处理中。。。
    done
    处理完成!
    -----------------
    executor 就绪。。。
    很耗时的操作处理中。。。
    done

    六、阻塞队列BlockingQueue

    阻塞队列可以在线程间实现生产者-消费者模式。比如下面的示例:线程producer模拟快速生产数据,而线程consumer模拟慢速消费数据,当达到队列的上限时(即:生产者产生的数据,已经放不下了),队列就堵塞住了。

    @Test
        public void blockingQueueTest() throws InterruptedException {
            final BlockingQueue<String> blockingDeque = new ArrayBlockingQueue<>(5);
    
            Thread producer = new Thread() {
                public void run() {
                    Random rnd = new Random();
                    while (true) {
                        try {
                            int i = rnd.nextInt(10000);
                            blockingDeque.put(i + "");
                            System.out.println(this.getName() + " 产生了一个数字:" + i);
                            Thread.sleep(rnd.nextInt(50));//模拟生产者快速生产
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            };
            producer.setName("producer 1");
    
    
            Thread consumer = new Thread() {
                public void run() {
                    while (true) {
                        Random rnd = new Random();
                        try {
    
                            String i = blockingDeque.take();
                            System.out.println(this.getName() + " 消费了一个数字:" + i);
                            Thread.sleep(rnd.nextInt(10000));//消费者模拟慢速消费
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            };
            consumer.setName("consumer 1");
    
            producer.start();
            consumer.start();
    
            while (true) {
                Thread.sleep(100);
            }
        }

    执行结果:

    producer 1 产生了一个数字:6773
    consumer 1 消费了一个数字:6773
    producer 1 产生了一个数字:4456
    producer 1 产生了一个数字:8572
    producer 1 产生了一个数字:5764
    producer 1 产生了一个数字:2874
    producer 1 产生了一个数字:780 # 注意这里就已经堵住了,直到有消费者消费一条数据,才能继续生产
    consumer 1 消费了一个数字:4456
    producer 1 产生了一个数字:4193

  • 相关阅读:
    centos 编码问题 编码转换 cd到对应目录 执行 中文解压
    centos 编码问题 编码转换 cd到对应目录 执行 中文解压
    centos 编码问题 编码转换 cd到对应目录 执行 中文解压
    Android MVP 十分钟入门!
    Android MVP 十分钟入门!
    Android MVP 十分钟入门!
    Android MVP 十分钟入门!
    mysql备份及恢复
    mysql备份及恢复
    mysql备份及恢复
  • 原文地址:https://www.cnblogs.com/yjmyzz/p/java-concurrent-tools-sample.html
Copyright © 2011-2022 走看看