zoukankan      html  css  js  c++  java
  • 多线程-CountDownLatch,CyclicBarrier,Semaphore,Exchanger,Phaser

    CountDownLatch 
    一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数初始化CountDownLatch。调用countDown()计数减一,当计数到达零之前await()方法会一直阻塞,计数无法被重置。

    public class CountDownLatch {
        private final Sync sync;
        public CountDownLatch(int count);
        public void countDown() {
            sync.releaseShared(1);
        }
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    }
    

    CountDownLatch中主要有countDown()和await()方法。 
    countDown()递减计数,如果计数达到零,则是否所有等待的线程。 
    1. 如果当前计数大于零,则计数减一; 
    2. 如果减一之后计数为零,则重新调度所有等待该计数为零的线程; 
    3. 如果计数已经为零,则不发生任何操作; 
    await()使当前线程在计数为零之前一直阻塞,除非线程被中断或超出指定的等待时间; 
    如果计数为零,则立刻返回true 
    在进入此方法时,当前线程已经设置了中断状态或在等待时被中断,则抛出InterruptedException异常,并且清除当前线程的中断状态。如果超出了指定等待时间,则返回false,如果该时间小于等于零,则此方法根本不会等待。

    package org.github.lujiango;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class Test16 {
    
        public static void main(String[] args) throws InterruptedException {
            final CountDownLatch begin = new CountDownLatch(1);
            final CountDownLatch end = new CountDownLatch(10);
            final ExecutorService exec = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 10; i++) {
                final int no = i + 1;
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            begin.await();
                            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 10000));
                            System.out.println("No." + no + " arrived");
                        } catch (Exception e) {
    
                        } finally {
                            end.countDown();
                        }
                    }
                };
                exec.submit(run);
            }
    
            System.out.println("Game start");
            begin.countDown();
            end.await();
            System.out.println("Game over");
            exec.shutdown();
        }
    
    }
    

    CyclicBarrier 
    一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。在涉及一组固定大小的线程的程序中,这些线程必须不时的互相等待。

    package org.github.lujiango;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class Test16 {
    
        public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
            final CyclicBarrier end = new CyclicBarrier(10);
            final ExecutorService exec = Executors.newFixedThreadPool(10);
            System.out.println("Game start");
            for (int i = 0; i < 10; i++) {
                final int no = i + 1;
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            end.await();
                            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 10000));
                            System.out.println("No." + no + " arrived");
                        } catch (Exception e) {
    
                        } finally {
                        }
                    }
                };
                exec.submit(run);
            }
            System.out.println("Game over");
            exec.shutdown();
    
        }
    
    }
    

    需要所有的子任务都完成时,才执行主任务,这个时候可以选择使用CyclicBarrier。

    Semaphore 
    一个计数信号量,信号量维护了一个许可集,在许可可用之前会阻塞每一个acquire(),然后获取该许可。每个release()释放许可,从而可能释放一个正在阻塞的获取者。 
    Semaphore只对可用许可的号码进行计数,并采取相应的行动,拿到信号的线程可以进入代码,否则就等待。

    package org.github.lujiango;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    public class Test17 {
    
        public static void main(String[] args) {
            ExecutorService exec = Executors.newCachedThreadPool();
            final Semaphore semp = new Semaphore(5);
            for (int i = 0; i < 20; i++) {
                final int no = i;
                Runnable run = new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            semp.acquire();
                            System.out.println("Accessing: " + no);
                            TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 10000));
                        } catch (Exception e) {
    
                        } finally {
                            semp.release();
                        }
                    }
                };
                exec.submit(run);
            }
            exec.shutdown();
        }
    
    }
    

    Exchanger 
    Exchanger可以在两个线程之间交换数据,只能在两个线程,不支持更多的线程之间互换数据。 
    当线程A调用Exchanger对象的exchage()方法后,会阻塞;直到B线程也调用exchange()方法,然后线程以安全的方式交换数据,之后A和B线程继续执行。

    package org.github.lujiango;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Exchanger;
    
    public class Test18 {
    
        public static void main(String[] args) {
            Exchanger<List<Integer>> ex = new Exchanger<List<Integer>>();
            new A(ex).start();
            new B(ex).start();
        }
    
    }
    
    class A extends Thread {
        List<Integer> list = new ArrayList<Integer>();
        Exchanger<List<Integer>> ex;
    
        public A(Exchanger<List<Integer>> ex) {
            this.ex = ex;
        }
    
        @Override
        public void run() {
            Random random = new Random();
            for (int i = 0; i < 10; i++) {
                list.clear();
                list.add(random.nextInt(10));
                list.add(random.nextInt(10));
                list.add(random.nextInt(10));
                try {
                    list = ex.exchange(list);
                } catch (Exception e) {
    
                }
            }
        }
    }
    
    class B extends Thread {
        List<Integer> list = new ArrayList<Integer>();
        Exchanger<List<Integer>> ex;
    
        public B(Exchanger<List<Integer>> ex) {
            this.ex = ex;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    list = ex.exchange(list);
                } catch (Exception e) {
    
                }
                System.out.println(list);
            }
        }
    }
    

    Phaser 
    Phaser是一个灵活的线程同步工具,它包含了CountDownLatch和CyclicBarrier的相关功能。 
    CountDownLatch的countDown()和await()可以通过Phaser的arrive()和awaitAdvance(int n)代替 
    而CyclicBarrier的await可以使用Phaser的arriveAndAwaitAdvance()方法代替 
    用Phaser代替CountDownLatch:

    package org.github.lujiango;
    
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    
    public class Test19 {
    
        public static void main(String[] args) throws InterruptedException {
            final Phaser latch = new Phaser(10);
            for (int i = 1; i <= 10; i++) {
                final int id = i;
                Thread t = new Thread(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            TimeUnit.SECONDS.sleep((long) (Math.random() * 10));
                            System.out.println("thread: " + id + " is running");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            latch.arrive();
                        }
                    }
                });
                t.start();
            }
            latch.awaitAdvance(latch.getPhase());
            System.out.println("all thread has run");
    
        }
    
    }
    
    package org.github.lujiango;
    
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    
    public class Test19 {
    
        public static void main(String[] args) throws InterruptedException {
            final Phaser latch = new Phaser(10);
            for (int i = 1; i <= 10; i++) {
                final int id = i;
                Thread t = new Thread(new Runnable() {
    
                    @Override
                    public void run() {
                        try {
                            TimeUnit.SECONDS.sleep((long) (Math.random() * 10));
                            latch.arriveAndAwaitAdvance(); // 所有线程都执行到这里,才会继续执行,否则全部阻塞
                            System.out.println("thread: " + id + " is running");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } finally {
                            latch.arrive();
                        }
                    }
                });
                t.start();
            }
        }
    
    }
    

      

  • 相关阅读:
    SQL学习笔记9——SQL中检索数据之分页查询
    SQL学习笔记8——SQL中检索数据之子查询
    SQL学习笔记7——SQL中检索数据之连接查询
    Two Pointer
    LeetCode 1438. Longest Continuous Subarray With Absolute Diff Less Than or Equal to Limit
    leetcode 30 days challenge Check If a String Is a Valid Sequence from Root to Leaves Path in a Binary Tree
    LeetCode First Unique Number
    letcode1143 Longest Common Subsequence
    Leetcode 560 Subarry Sum Equals K
    leetcode Leftmost Column with at Least a One
  • 原文地址:https://www.cnblogs.com/lujiango/p/7581039.html
Copyright © 2011-2022 走看看