zoukankan      html  css  js  c++  java
  • java一些常用并发工具示例

    最近把《java并发编程实战》-Java Consurrency in Practice 重温了一遍,把书中提到的一些常用工具记录于此:

    一、闭锁(门栓)- CountDownLatch

    适用场景:多线程测试时,通常为了精确计时,要求所有线程都ready后,才开始执行,防止有线程先起跑,造成不公平,类似的,所有线程执行完,整个程序才算运行完成。

        /**
         * 闭锁测试(菩提树下的杨过 http://yjmyzz.cnblogs.com/)
         *
         * @throws InterruptedException
         */
        @Test
        public void countdownLatch() throws InterruptedException {
            CountDownLatch startLatch = new CountDownLatch(1); //类似发令枪
            CountDownLatch endLatch = new CountDownLatch(10);//这里的数量,要与线程数相同
    
            for (int i = 0; i < 10; i++) {
                Thread t = new Thread(() -> {
                    try {
                        startLatch.await(); //先等着,直到发令枪响,防止有线程先run
                        System.out.println(Thread.currentThread().getName() + " is running...");
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        endLatch.countDown(); //每个线程执行完成后,计数
                    }
                });
                t.setName("线程-" + i);
                t.start();
            }
            long start = System.currentTimeMillis();
            startLatch.countDown();//发令枪响,所有线程『开跑』
            endLatch.await();//等所有线程都完成
            long end = System.currentTimeMillis();
            System.out.println("done! exec time => " + (end - start) + " ms");
        }  

    执行结果:

    线程-1 is running...
    线程-5 is running...
    线程-8 is running...
    线程-4 is running...
    线程-3 is running...
    线程-0 is running...
    线程-2 is running...
    线程-9 is running...
    线程-7 is running...
    线程-6 is running...
    done! exec time => 13 ms

    注:大家可以把第14行注释掉,再看看运行结果有什么不同。

    二、信号量(Semaphore)

    适用场景:用于资源数有限制的并发访问场景。

       public class BoundedHashSet<T> {
            private final Set<T> set;
            private final Semaphore semaphore;
    
            public BoundedHashSet(int bound) {
                this.set = Collections.synchronizedSet(new HashSet<T>());
                this.semaphore = new Semaphore(bound);
            }
    
            public boolean add(T t) throws InterruptedException {
                if (!semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
                    return false;
                }
                ;
                boolean added = false;
                try {
                    added = set.add(t);
                    return added;
                } finally {
                    if (!added) {
                        semaphore.release();
                    }
                }
            }
    
            public boolean remove(Object o) {
                boolean removed = set.remove(o);
                if (removed) {
                    semaphore.release();
                }
                return removed;
            }
        }
    
        @Test
        public void semaphoreTest() throws InterruptedException {
    
            BoundedHashSet<String> set = new BoundedHashSet<>(5);
            for (int i = 0; i < 6; i++) {
                if (set.add(i + "")) {
                    System.out.println(i + " added !");
                } else {
                    System.out.println(i + " not add to Set!");
                }
            }
        }
    

    上面的示例将一个普通的Set变成了有界容器。执行结果如下:

    0 added !
    1 added !
    2 added !
    3 added !
    4 added !
    5 not add to Set!

    三、栅栏CyclicBarrier 

    这个跟闭锁类似,可以通过代码设置一个『屏障』点,其它线程到达该点后才能继续,常用于约束其它线程都到达某一状态后,才允许做后面的事情。

        public class Worker extends Thread {
    
            private CyclicBarrier cyclicBarrier;
    
            public Worker(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
    
            private void step1() {
                System.out.println(this.getName() + " step 1 ...");
            }
    
            private void step2() {
                System.out.println(this.getName() + " step 2 ...");
            }
    
            public void run() {
                step1();
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                step2();
            }
        }
    
        @Test
        public void cyclicBarrierTest() throws InterruptedException, BrokenBarrierException {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(11);
            for (int i = 0; i < 10; i++) {
                Worker w = new Worker(cyclicBarrier);
                w.start();
            }
            cyclicBarrier.await();
    
        }
    

    这里我们假设有一个worder线程,里面有2步操作,要求所有线程完成step1后,才能继续step2. 执行结果如下:

    Thread-0 step 1 ...
    Thread-1 step 1 ...
    Thread-2 step 1 ...
    Thread-3 step 1 ...
    Thread-4 step 1 ...
    Thread-5 step 1 ...
    Thread-6 step 1 ...
    Thread-7 step 1 ...
    Thread-8 step 1 ...
    Thread-9 step 1 ...
    Thread-9 step 2 ...
    Thread-0 step 2 ...
    Thread-3 step 2 ...
    Thread-4 step 2 ...
    Thread-6 step 2 ...
    Thread-2 step 2 ...
    Thread-1 step 2 ...
    Thread-8 step 2 ...
    Thread-7 step 2 ...
    Thread-5 step 2 ...

    四、Exchanger

    如果2个线程需要交换数据,Exchanger就能派上用场了,见下面的示例:

        @Test
        public void exchangerTest() {
            Exchanger<String> exchanger = new Exchanger<>();
    
            Thread t1 = new Thread(() -> {
                String temp = "AAAAAA";
                System.out.println("thread 1 交换前:" + temp);
                try {
                    temp = exchanger.exchange(temp);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thread 1 交换后:" + temp);
            });
    
            Thread t2 = new Thread(() -> {
                String temp = "BBBBBB";
                System.out.println("thread 2 交换前:" + temp);
                try {
                    temp = exchanger.exchange(temp);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thread 2 交换后:" + temp);
            });
    
            t1.start();
            t2.start();
        }
    

     执行结果:

    thread 1 交换前:AAAAAA
    thread 2 交换前:BBBBBB
    thread 2 交换后:AAAAAA
    thread 1 交换后:BBBBBB

    五、FutureTask/Future

    一些很耗时的操作,可以用Future转化成异步,不阻塞后续的处理,直到真正需要返回结果时调用get拿到结果

        @Test
        public void futureTaskTest() throws ExecutionException, InterruptedException, TimeoutException {
    
            Callable<String> callable = () -> {
                System.out.println("很耗时的操作处理中。。。");
                Thread.sleep(5000);
                return "done";
            };
    
            FutureTask<String> futureTask = new FutureTask<>(callable);
    
            System.out.println("就绪。。。");
            new Thread(futureTask).start();
            System.out.println("主线程其它处理。。。");
            System.out.println(futureTask.get());
            System.out.println("处理完成!");
    
            System.out.println("-----------------");
    
            System.out.println("executor 就绪。。。");
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<String> future = executorService.submit(callable);
            System.out.println(future.get(10, TimeUnit.SECONDS));
        }
    

     执行结果:

    就绪。。。
    主线程其它处理。。。
    很耗时的操作处理中。。。
    done
    处理完成!
    -----------------
    executor 就绪。。。
    很耗时的操作处理中。。。
    done

    六、阻塞队列BlockingQueue

    阻塞队列可以在线程间实现生产者-消费者模式。比如下面的示例:线程producer模拟快速生产数据,而线程consumer模拟慢速消费数据,当达到队列的上限时(即:生产者产生的数据,已经放不下了),队列就堵塞住了。

    @Test
        public void blockingQueueTest() throws InterruptedException {
            final BlockingQueue<String> blockingDeque = new ArrayBlockingQueue<>(5);
    
            Thread producer = new Thread() {
                public void run() {
                    Random rnd = new Random();
                    while (true) {
                        try {
                            int i = rnd.nextInt(10000);
                            blockingDeque.put(i + "");
                            System.out.println(this.getName() + " 产生了一个数字:" + i);
                            Thread.sleep(rnd.nextInt(50));//模拟生产者快速生产
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            };
            producer.setName("producer 1");
    
    
            Thread consumer = new Thread() {
                public void run() {
                    while (true) {
                        Random rnd = new Random();
                        try {
    
                            String i = blockingDeque.take();
                            System.out.println(this.getName() + " 消费了一个数字:" + i);
                            Thread.sleep(rnd.nextInt(10000));//消费者模拟慢速消费
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            };
            consumer.setName("consumer 1");
    
            producer.start();
            consumer.start();
    
            while (true) {
                Thread.sleep(100);
            }
        }

    执行结果:

    producer 1 产生了一个数字:6773
    consumer 1 消费了一个数字:6773
    producer 1 产生了一个数字:4456
    producer 1 产生了一个数字:8572
    producer 1 产生了一个数字:5764
    producer 1 产生了一个数字:2874
    producer 1 产生了一个数字:780 # 注意这里就已经堵住了,直到有消费者消费一条数据,才能继续生产
    consumer 1 消费了一个数字:4456
    producer 1 产生了一个数字:4193

  • 相关阅读:
    【04】Vue 之 事件处理
    【03】Vue 之列表渲染及条件渲染
    【02】 Vue 之 数据绑定
    传递参数的模式最适合向函数传入大量可先参数的情形
    ie6 PNG图片透明
    实现表单的输入框当光标为当前时,去掉默认值
    SSIS ->> Environment Variables
    SQL Server ->> FileTable
    SQL Server ->> 间接实现COUNT(DISTINCT XXX) OVER(PARTITION BY YYY)
    SQL Server ->> EXECUTE AS LOGIN/USER和Revert表达式
  • 原文地址:https://www.cnblogs.com/yjmyzz/p/java-concurrent-tools-sample.html
Copyright © 2011-2022 走看看