zoukankan      html  css  js  c++  java
  • (五)新类库的构件

    CountDownLatch

    public class CountDownLatch
    extends Object

    A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

    A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

    A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown(). A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.

    A useful property of a CountDownLatch is that it doesn't require that threads calling countDown wait for the count to reach zero before proceeding, it simply prevents any thread from proceeding past an await until all threads could pass.

    用来同步一个或多个任务,强制它们等待其他任务执行的一组操作完成。可以这么理解,有两组任务A,B。A的多个任务等待B组的所有任务结束才能执行。

    CountDownLatch对象设置一个初始的值。A中的任务执行前先调用CountDownLatch.await()方法将当前任务阻塞,当CountDownLatch的值为0时才能进行下去。B中的每个任务执行完都调用CountDownLatch.countDown()来减小计数值。这样就可以保证B中的任务可以同时进行,当B的任务全部结束,A的任务才可以开始。

    CountDownLatch对象的计数值不能被再次重置,只能使用一次。想要重置,使用CyclicBarrier

    public class CountDownLatchDemo {
        static final int SIZE = 10;
        public static void main(String[] args) {
            ExecutorService exec = Executors.newCachedThreadPool();
            CountDownLatch latch = new CountDownLatch(SIZE);
            
            for(int i=0;i<10;i++){
                exec.execute(new A(latch));
            }
            
            for(int i=0;i<10;i++){
                exec.execute(new B(latch));
            }
            
            System.out.println("Launched all tasks");
            exec.shutdown();
        }
        
    }
    
    class B implements Runnable{
        private static int counter =0;
        private final int id = counter++;
        private static Random random = new Random(47);
        private final CountDownLatch latch;
        public B(CountDownLatch latch){
            this.latch=latch;    
        }
        
        public void run(){
            try{
                doWork();
                latch.countDown();//B中完成一次任务,计数值减1
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
    
        private void doWork() throws InterruptedException {
            TimeUnit.MILLISECONDS.sleep(random.nextInt(10));
            System.out.println(this+" completed");
        }
        public String toString(){
            return String.format("B %1$-3d", id);
        }
    }
    
    class A implements Runnable{
        private static int counter =0;
        private final int id = counter++;
        private static Random random = new Random(57);
        private final CountDownLatch latch;
        public A(CountDownLatch latch){
            this.latch=latch;
        }
        @Override
        public void run() {
            try{
                latch.await();//等待计数值为0,在这之前都处于阻塞状态
                TimeUnit.MILLISECONDS.sleep(random.nextInt(1));
                System.out.println("Latch barrier passedd for "+this);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }    
        public String toString(){
            return String.format("A %1$-3d", id);
        }
    }
    
    输出:
    Launched all tasks
    B 3   completed
    B 2   completed
    B 6   completed
    B 5   completed
    B 0   completed
    B 7   completed
    B 1   completed
    B 4   completed
    B 9   completed
    B 8   completed
    Latch barrier passedd for A 0  
    Latch barrier passedd for A 3  
    Latch barrier passedd for A 2  
    Latch barrier passedd for A 1  
    Latch barrier passedd for A 4  
    Latch barrier passedd for A 5  
    Latch barrier passedd for A 6  
    Latch barrier passedd for A 7  
    Latch barrier passedd for A 8  
    Latch barrier passedd for A 9 

    CyclicBarrier

    试想有一个任务,把它分割成多个子任务交给不同的线程去做,等它们都完成后,再执行下一个步骤。这时可以用CyclicBarrier对象。

    初始化CyclicBarrier对象时需要指定任务的数目n,当有n个线程对同一个CyclicBarrier对象的await方法调用并进入阻塞的话,才算达到栅栏点。

    以下程序演示对矩阵的每个元素求平方,我们用5个线程分别处理5行数据。等到5个线程都完成工作后,将处理好的矩阵打印输出。

    public class CyclicBarrierDemo {
        public static void main(String[] args) {
            int[][] matrix={{1,1,1,1,1},{2,2,2,2,2},{3,3,3,3,3},{4,4,4,4,4},{5,5,5,5,5}};
            List<int[]> list = new ArrayList<int[]>();//把矩阵每一行放在list里
            for(int i=0;i<matrix.length;i++){list.add(matrix[i]);}
            ExecutorService exec = Executors.newCachedThreadPool();
            CyclicBarrier barrier = new CyclicBarrier(5,new Runnable(){
                public void run() {
                    System.out.println("Solver are all completed");
                    //打印处理后的矩阵
                    for(int i=0;i<matrix.length;i++){
                        System.out.println(Arrays.toString(list.get(i)));
                    }
                }
            });
            for(int i=0;i<5;i++){
                exec.execute(new Solver(barrier,list.get(i)));
            }
        }
    }
    class Solver implements Runnable{
        private int[] row;
        private static int count=0;
        public Random random = new Random(47);
        private final int id = count++;
        private CyclicBarrier barrier;
        public Solver(CyclicBarrier barrier,int[] row){
            this.barrier=barrier;
            this.row=row;
        }
        public void run(){
            try {
                //任务开始
                int length = row.length;
                for(int i=0;i<length;i++){
                    row[i]=row[i]*row[i];
                }
                TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
                System.out.println("Solver "+id+" completed");
                //任务结束,到达栅栏点
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
    输出:
    Solver 1 completed
    Solver 2 completed
    Solver 0 completed
    Solver 3 completed
    Solver 4 completed
    Solver are all completed
    [1, 1, 1, 1, 1]
    [4, 4, 4, 4, 4]
    [9, 9, 9, 9, 9]
    [16, 16, 16, 16, 16]
    [25, 25, 25, 25, 25]

    DelayQueue

    这是一个无界的BlockingQueue,里面放置Delayed对象,其中的对象只能在其到期后才能被取走。如果队列中无到期的对象,则等待。该队列是有序队列,队首是最先过期的那个对象。因此该队列内部的对象需要比较各自的过期时间,故对象必须实现Delayed接口,即实现两个方法compareTo()和getDelay()。一个是用于比较对象之间的时间先后,一个用于获取对象的过期时间。

    public class DelayQueueDemo {
        public static void main(String[] args) {
            DelayQueue<DelayedTask> queue = new DelayQueue<DelayedTask>();
            ExecutorService exec = Executors.newCachedThreadPool();
            Random random = new Random(48);
            for(int i=0;i<20;i++)
                queue.put(new DelayedTask(random.nextInt(1000)));//把所有具有延迟到期功能的对象放在DelayQueue对列里
            exec.execute(new DelayedTaskConsumer(queue));
            queue.put(new DelayedTask(500));
        }
    }
    //具有延迟到期功能的任务
    class DelayedTask implements Runnable,Delayed{
        private static int count=0;
        private final int id = count++;
        private final int delta;
        private final long trigger;
        public DelayedTask(int delayMilliseconds){
            delta = delayMilliseconds;
            trigger = System.nanoTime()+TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS);
        }
        @Override
        public int compareTo(Delayed o) {
            DelayedTask that = (DelayedTask) o;
            if(trigger<that.trigger) return -1;
            if(trigger>that.trigger) return 1;
            return 0;
        }
        @Override
        public long getDelay(TimeUnit unit) {
            //返回剩余时间
            return TimeUnit.NANOSECONDS.convert(System.nanoTime()-trigger,TimeUnit.NANOSECONDS);
        }
        @Override
        public void run() {
            System.out.println("DelayedTask delayTime ["+delta+"] "+"is running");
        }    
    }
    class DelayedTaskConsumer implements Runnable{
        private DelayQueue<DelayedTask> queue ;
        public DelayedTaskConsumer(DelayQueue<DelayedTask> queue){
            this.queue = queue;
        }
        @Override
        public void run() {
            try{
                while(!Thread.interrupted()){queue.take().run();}//取出最先过期的对象,并操作该对象,这里执行了对象的run方法
            }catch(InterruptedException e){e.printStackTrace();}
        }    
    }
    输出:
    DelayedTask delayTime [100] is running
    DelayedTask delayTime [140] is running
    DelayedTask delayTime [183] is running
    DelayedTask delayTime [244] is running
    DelayedTask delayTime [316] is running
    DelayedTask delayTime [368] is running
    DelayedTask delayTime [522] is running
    DelayedTask delayTime [562] is running
    DelayedTask delayTime [569] is running
    DelayedTask delayTime [703] is running
    DelayedTask delayTime [794] is running
    DelayedTask delayTime [804] is running
    DelayedTask delayTime [831] is running
    DelayedTask delayTime [877] is running
    DelayedTask delayTime [911] is running
    DelayedTask delayTime [926] is running
    DelayedTask delayTime [972] is running
    DelayedTask delayTime [982] is running
    DelayedTask delayTime [984] is running
    DelayedTask delayTime [987] is running

    PriorityBlockingQueue

    优先级队列,它具有可阻塞的读取操作。该队列总是取出优先级最高的对象。优先级的比较由队列内对象的compareTo方法比较,故对象应实现Comparable接口。

    public class PriorityBlockingQueueDemo {
    
        public static void main(String[] args) {
            PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
            ExecutorService exec = Executors.newCachedThreadPool();
            Random random = new Random(48);
            for(int i=0;i<20;i++)
                queue.put(new PriorityTask(random.nextInt(1000)));
            exec.execute(new PriorityTaskConsumer(queue));
        }
    
    }
    class PriorityTask implements Runnable,Comparable{
        private static int count=0;
        private final int id = count++;
        private final int priority;
        public PriorityTask(int priority){
            this.priority=priority;
        }
        @Override
        public int compareTo(Object o) {
            PriorityTask that = (PriorityTask) o;
            if(this.priority<that.priority) return -1;
            if(this.priority>that.priority) return 1;
            return 0;
        }
        @Override
        public void run() {
            System.out.println("PriorityTask priority ["+priority+"] is runnig");
        }
        
    }
    class PriorityTaskConsumer implements Runnable{
        private PriorityBlockingQueue<Runnable> queue;
        Random random = new Random(28);
        public PriorityTaskConsumer(PriorityBlockingQueue<Runnable> queue){
            this.queue = queue;
        }
        @Override
        public void run() {
            try{
                while(!Thread.interrupted()){
                    queue.take().run();
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(1000));
                }
            }catch(InterruptedException e){e.printStackTrace();}
        }    
    }
    输出:
    PriorityTask priority [100] is runnig
    PriorityTask priority [140] is runnig
    PriorityTask priority [183] is runnig
    PriorityTask priority [244] is runnig
    PriorityTask priority [316] is runnig
    PriorityTask priority [368] is runnig
    PriorityTask priority [522] is runnig
    PriorityTask priority [562] is runnig
    PriorityTask priority [569] is runnig
    PriorityTask priority [703] is runnig
    PriorityTask priority [794] is runnig
    PriorityTask priority [804] is runnig
    PriorityTask priority [831] is runnig
    PriorityTask priority [877] is runnig
    PriorityTask priority [911] is runnig
    PriorityTask priority [926] is runnig
    PriorityTask priority [972] is runnig
    PriorityTask priority [982] is runnig
    PriorityTask priority [984] is runnig
    PriorityTask priority [987] is runnig

     ScheduledExecutor

    ScheduledThreadPoolExecutor可以用于安排给定延迟后执行或者定期执行的事务。schedule()指定一个任务只运行一次,scheduleAtFixedRate()指定任务定期执行。

    下面模拟一个控制器控制灯光,定期浇水,定期测量温度。

    public class ScheduledThreadPoolExecutorDemo {
        static ScheduledThreadPoolExecutorDemo scheduledDemo = new ScheduledThreadPoolExecutorDemo();
        static ScheduledThreadPoolExecutor schedule = new ScheduledThreadPoolExecutor(10);
        static int start = (int) System.currentTimeMillis();
        static int now;
        public static void main(String[] args) {
            schedule.schedule(scheduledDemo.new Stop(), 5000, TimeUnit.MILLISECONDS);//延迟5000毫秒后,停止所有任务。
            schedule.scheduleAtFixedRate(scheduledDemo.new LightOn(), 0, 500, TimeUnit.MILLISECONDS);
            schedule.scheduleAtFixedRate(scheduledDemo.new LightOff(), 0, 500, TimeUnit.MILLISECONDS);
            schedule.scheduleAtFixedRate(scheduledDemo.new Water(), 0, 1000, TimeUnit.MILLISECONDS);
            schedule.scheduleAtFixedRate(scheduledDemo.new testTemperature(), 0, 100, TimeUnit.MILLISECONDS);
        }
        //获取时间,程序运行的时间
        public synchronized int getTime(){
            now = (int) System.currentTimeMillis();
            return  now-start;
        }
        class LightOn implements Runnable{
            public void run() {
                System.out.println(getTime()+": Light on");
            }
        }
        class LightOff implements Runnable{
            public void run() {
                System.out.println(getTime()+": Light off");
            }
        }
        class Water implements Runnable{
            public void run() {
                System.out.println(getTime()+": Watering");
            }
        }
        class testTemperature implements Runnable{
            public void run() {
                System.out.println(getTime()+": Test temperature");
            }
        }
        class Stop implements Runnable{
            public void run() {
                schedule.shutdownNow();
                System.out.println(getTime()+": Tasks stop");
            }
        }
    }
    输出:
    2: Light on
    4: Light off
    6: Watering
    7: Test temperature
    109: Test temperature
    207: Test temperature
    307: Test temperature
    407: Test temperature
    501: Light on
    503: Light off
    507: Test temperature
    607: Test temperature
    707: Test temperature
    807: Test temperature
    907: Test temperature
    1001: Light on
    1003: Light off
    1006: Watering
    1007: Test temperature
    1107: Test temperature
    1207: Test temperature
    1307: Test temperature
    1407: Test temperature
    1501: Light on
    1503: Light off
    1507: Test temperature
    1607: Test temperature
    1707: Test temperature
    1807: Test temperature
    1916: Test temperature
    2001: Light on
    2003: Light off
    2006: Watering
    2007: Test temperature
    2107: Test temperature
    2207: Test temperature
    2307: Test temperature
    2407: Test temperature
    2501: Light on
    2503: Light off
    2507: Test temperature
    2607: Test temperature
    2707: Test temperature
    2807: Test temperature
    2907: Test temperature
    3001: Light on
    3003: Light off
    3006: Watering
    3007: Test temperature
    3107: Test temperature
    3207: Test temperature
    3307: Test temperature
    3407: Test temperature
    3501: Light on
    3503: Light off
    3507: Test temperature
    3607: Test temperature
    3707: Test temperature
    3807: Test temperature
    3907: Test temperature
    4001: Light on
    4003: Light off
    4006: Watering
    4007: Test temperature
    4107: Test temperature
    4207: Test temperature
    4307: Test temperature
    4407: Test temperature
    4501: Light on
    4503: Light off
    4507: Test temperature
    4607: Test temperature
    4707: Test temperature
    4807: Test temperature
    4907: Test temperature
    5001: Tasks stop

    注:在测试中发现如果一个任务每隔100毫秒执行一次,而该任务执行一次却要120毫秒,则结果是每当该任务执行完毕后会立即执行该任务,相当于间隔120毫秒执行。

    Semaphore(计数信号量

    正常的锁(cocurrent.locks和synchronized)在任何时候只允许一个任务的访问一项资源,而计数信号量运行n个任务同时访问。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。

     

    public class SemaphoreTest {
    
        public static void main(String[] args) {
            // 线程池
            ExecutorService exec = Executors.newCachedThreadPool();
            // 只能5个线程同时访问
            final Semaphore semp = new Semaphore(5);
            // 模拟20个客户端访问
            for (int index = 0; index < 20; index++) {
                final int num = index;
                Runnable run = new Runnable() {
                    public void run() {
                        try {
                            // 获取许可
                            semp.acquire();
                            System.out.println("Accessing: " + num);
                            Thread.sleep((long) (Math.random() * 10000));
                            // 访问完后,释放 ,如果屏蔽下面的语句,则在控制台只能打印5条记录,之后线程一直阻塞
                            semp.release();
                        } catch (InterruptedException e) {
                        }
                    }
                };
                exec.execute(run);
            }
            // 退出线程池
            exec.shutdown();
        }
    }
    输出:
    Accessing: 0
    Accessing: 1
    Accessing: 2
    Accessing: 3
    Accessing: 4
    Accessing: 5
    Accessing: 6
    Accessing: 7
    Accessing: 8
    Accessing: 9
    Accessing: 10
    Accessing: 11
    Accessing: 12
    Accessing: 13
    Accessing: 14
    Accessing: 15
    Accessing: 16
    Accessing: 17
    Accessing: 18
    Accessing: 19

     

    Exchanger

    Exchanger是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时(即调用Exchanger.exchange()方法),它们各自拥有一个对象(我们想要交换的对象),当他们离开时,这两个任务持有的对象互相交换。

    使用场景:一个任务在创建对象,这些对象的创建代价很昂贵,而另一个任务消费对象。通过这种方式,可以有更多的对象在被创建的同时被消费,示意图如下:

     

    public class ExchangerDemo {
    
        public static void main(String[] args) {
            ExecutorService exec = Executors.newCachedThreadPool();
            Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
            exec.execute(new Producer(exchanger));
            exec.execute(new Consumer(exchanger));
            exec.shutdown();
        }
    
    }
    class Producer implements Runnable{
        Exchanger<List<String>> exchanger;
        List<String> list = new ArrayList<String>();
        public Producer(Exchanger<List<String>> exchanger){
            this.exchanger = exchanger;
        }
        public void run() {
            try {
                for(int i=0;i<5;i++){
                    list.add("Producer");
                }
                list = exchanger.exchange(list);
                
                System.out.println("Producer:从消费者获取的对象:");
                for(String s :list)
                    System.out.println(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }
    class Consumer implements Runnable{
        Exchanger<List<String>> exchanger;
        List<String> list = new ArrayList<String>();
        public Consumer(Exchanger<List<String>> exchanger){
            this.exchanger = exchanger;
        }
        public void run() {
            try {
                for(int i=0;i<3;i++){
                    list.add("Consumer");
                }
                list = exchanger.exchange(list);
                
                System.out.println("Consumer:从生产者获取的对象:");
                for(String s :list)
                    System.out.println(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
    }
    输出:
    Consumer:从生产者获取的对象:
    Producer
    Producer
    Producer
    Producer
    Producer
    Producer:从消费者获取的对象:
    Consumer
    Consumer
    Consumer
  • 相关阅读:
    Python ES操作
    SVN总结
    MongoDB问题总结
    MySQL
    PyQt小工具
    Python logging模块
    shell脚本
    cmd命令
    eclipse java 项目打包
    Robot Framework:failed: Data source does not exist.错误
  • 原文地址:https://www.cnblogs.com/wuchaodzxx/p/5991125.html
Copyright © 2011-2022 走看看