zoukankan      html  css  js  c++  java
  • 04.Java多线程并发库API使用3

    1.java5的Semaphere同步工具

    Semaphore可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。

    Semaphore实现的功能就类似银行有6个窗口,12个人有业务要操作,那么同时只能有6个人占用窗口,当有的人业务操作完毕之后,让开位置,其它等待的人群中,有一人可以占用当前窗口,操作自己的业务。

    另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。

    单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

    package com.chunjiangchao.thread;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    /**
     * 同步信号量的使用
     * @author chunjaingchao
     * 随时可以调整Semaphore中可并发的数量
     */
    public class SemaphoreDemo {
    
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(1);//允许的并发数
            for(int i=0;i<10;i++){
                threadPool.execute(new Runnable(){
    
                    @Override
                    public void run() {
                        try {
                            semaphore.acquire();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName()+"正在执行,当前已经有"+(1-semaphore.availablePermits())+"个并发");
                        try {
                            Thread.sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        semaphore.release();
                        
                    }
                    
                });
            }
        }
    
    }

    2.CyclicBarrier同步工具

    表示大家彼此等待,大家集合好后才开始出发,分散活动后又在指定地点集合碰面,这就好比整个公司的人员利用周末时间集体郊游一样,先各自从家出发到公司集合后,再同时出发到公园游玩,在指定地点集合后再同时开始就餐,…。

    一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

    package com.chunjiangchao.thread;
    
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * CyclicBarrier的使用
     * @author chunjiangchao
     *
     */
    public class CyclicBarrierDemo {
    
        public static void main(String[] args) {
            final int threadNum = 5;
            ExecutorService threadPool = Executors.newCachedThreadPool();
            final CyclicBarrier cb = new CyclicBarrier(threadNum);
            for(int i=0;i<threadNum;i++){
                threadPool.execute(new Runnable(){
    
                    @Override
                    public void run() {
                        try {
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("线程"
                                    + Thread.currentThread().getName()
                                    + "运行到Barrier1,已有"+ (cb.getNumberWaiting() + 1)+ "个已经到达,"
                                    + (cb.getNumberWaiting() == threadNum-1 ? "都到齐了,继续走啊"
                                            : "正在等候"));
                            cb.await();//障碍点1:当前线程在await这个障碍地点停顿,等着其它线程运行到这
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("线程"
                                    + Thread.currentThread().getName()
                                    + "运行到Barrier2,已有"+ (cb.getNumberWaiting() + 1)+ "个已经到达,"
                                    + (cb.getNumberWaiting() == threadNum-1 ? "都到齐了,继续走啊"
                                            : "正在等候"));
                            cb.await();//障碍点2:
                            Thread.sleep((long) (Math.random() * 10000));
                            System.out.println("线程"
                                    + Thread.currentThread().getName()
                                    + "运行到Barrier3,已有"+ (cb.getNumberWaiting() + 1)+ "个已经到达,"
                                    + (cb.getNumberWaiting() == threadNum-1 ? "都到齐了,继续走啊"
                                            : "正在等候"));
                            cb.await();//障碍点3:
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
    
                    }
                    
                });
            }
            //正在执行的任务接着执行,后续不允许添加任务
            threadPool.shutdown();
        }
    
    }

    3.java5的CountDownLatch同步工具

    一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

    犹如倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当计数到达0时,则所有等待者或单个等待者开始执行。

    可以实现一个人(也可以是多个人)等待其他所有人都来通知他,这犹如一个计划需要多个领导都签字后才能继续向下实施。还可以实现一个人通知多个人的效果,类似裁判一声口令,运动员同时开始奔跑。用这个功能做百米赛跑的游戏程序不错哦!

    package com.chunjiangchao.thread;
    
    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 创建这个CountDownLatch对象的时候,会传入计数器个数,当前线程调用await方法进行等待其它线程的操作,
     * 当其他线程操作计数器的值直到0的时候,才会继续执行后续操作
     * @author chuangjiangchao
     *
     */
    public class CountDownLatchDemo {
    
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch countDownLatch = new CountDownLatch(3);
            for(int i=0;i<3;i++){
                new WorkThread(countDownLatch).start();
            }
            System.out.println("老大在这儿等着");
            countDownLatch.await();
            System.out.println("你们都跑完了,该我走人了");
            
        }
        private static class WorkThread extends Thread{
            private CountDownLatch countDownLatch;
            public WorkThread(CountDownLatch countDownLatch){
                this.countDownLatch = countDownLatch;
            }
            @Override
            public void run() {
                System.out.println("执行耗时的操作");
                try {
                    Thread.sleep(100*new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
                System.out.println(Thread.currentThread().getName()+"跑完任务,计数器值改变");
            }
        }
    
    }

    4.java5的Exchanger同步工具

    用于实现两个线程之间的数据交换,每个线程在完成一定的事务后想与对方交换数据,第一个先拿出数据的线程将一直等待第二个线程拿着数据到来时,才能彼此交换数据。

    谁先到达,谁就等待另外一个线程到达,然后开始交换数据。最后执行各自的动作。

    package com.chunjiangchao.thread;
    
    import java.util.Date;
    import java.util.Random;
    import java.util.concurrent.Exchanger;
    
    /**
     * 两个线程之间的数据交换
     * @author chunjiangchao
     *
     */
    public class ExchangerDemo {
    
        public static void main(String[] args) {
            Exchanger<String> exchanger = new Exchanger<String>();//通过这玩意儿来交换数据
    //        ExecutorService threadPool = Executors.newCachedThreadPool();
    //        threadPool.execute(command);
            new ThreadA(exchanger).start();
            new ThreadB(exchanger).start();
        }
        private static class ThreadA extends Thread{
            private Exchanger<String> exchanger;
            public ThreadA(Exchanger<String> exchanger){
                this.exchanger = exchanger;
            }
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName()+"执行之前"+new Date().toLocaleString());
                    Thread.sleep(new Random().nextInt(100)*100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println(Thread.currentThread().getName()+"开始交换数据"+new Date().toLocaleString());
                    String exchange = exchanger.exchange("A的数据");//给我等着,直到需要交换数据的线程B到来
                    System.out.println(Thread.currentThread().getName()+"交换后得到的数据:"+exchange);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
            }
        }
        private static class ThreadB extends Thread{
            private Exchanger<String> exchanger;
            public ThreadB(Exchanger<String> exchanger){
                this.exchanger = exchanger;
            }
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName()+"执行之前"+new Date().toLocaleString());
                    Thread.sleep(new Random().nextInt(100)*100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    System.out.println(Thread.currentThread().getName()+"开始交换数据"+new Date().toLocaleString());
                    String exchange = exchanger.exchange("B的数据");
                    System.out.println(Thread.currentThread().getName()+"交换后得到的数据:"+exchange);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
            }
        }
    
    }

    5.java5阻塞队列的应用(ArrayBlockingQueue)

    阻塞队列与Semaphore有些相似,但也不同,阻塞队列是一方存放数据,另一方释放数据,Semaphore通常则是由同一方设置和释放信号量(主要控制访问资源的线程数)。

    ArrayBlockingQueue :只有put方法和take方法才具有阻塞功能。

    package com.chunjiangchao.thread;
    
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    
    /**
     * 两个生产者,一个消费者
     * @author chunjiangchao
     *
     */
    public class ArrayBlockingQueueDemo {
    
        public static void main(String[] args) {
            final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
            for(int i=0;i<2;i++){
                new Thread(new Runnable() {
                    
                    @Override
                    public void run() {
                        while(true){
                            try {
                                Thread.sleep(3000);
                                int nextInt = new Random().nextInt(100);
                                System.out.println(Thread.currentThread().getName()+"添加数据为:"+nextInt);
                                queue.put(nextInt);//当前操作与take操作是阻塞的
                                System.out.println("当前数据个数"+queue.size());
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
            }
    
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    while(true){
                        try {
                            Thread.sleep(1000);
                            System.out.println(Thread.currentThread().getName()+"获取数据");
                            Integer take = queue.take();//取数据,如果queue里面没有数据,就会一直等,等queue里面存放数据了,才会执行后续的代码
                            System.out.println("获取到的数据为"+take+" ,当前数据个数"+queue.size());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
        
        }
    
    }

    问题:用两个具有1个空间的队列来实现同步通知的功能

    package com.chunjiangchao.thread;
    
    import java.util.concurrent.ArrayBlockingQueue;
    
    /**
     * 使用BlockingQueue实现同步通知的功能
     * @author zzb
     *
     */
    public class BlockingQueueCommunicationDemo {
    
        public static void main(String[] args) {
            final Business business = new Business();
            Thread threadMain = new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true){
                        try {
                            business.main();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                
            });
            threadMain.setName("threadMain");
            threadMain.start();
            Thread threadSub = new Thread(new Runnable(){
                
                @Override
                public void run() {
                    while(true){
                        try {
                            business.sub();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                
            });
            threadSub.setName("threadSub");
            threadSub.start();
            /*
             * 你一下我一下,你一下,我一下
                 threadMain执行耗时操作
                threadSub执行耗时操作
                threadMain执行耗时操作
                threadSub执行耗时操作
                threadMain执行耗时操作
                threadSub执行耗时操作
     
             */
        }
        private static class Business{
            private static ArrayBlockingQueue<Integer> mainQueue = new ArrayBlockingQueue<>(1);
            private static ArrayBlockingQueue<Integer> subQueue = new ArrayBlockingQueue<>(1);
            static{
                try {
                    mainQueue.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            public void sub() throws InterruptedException{
                subQueue.take();//sub在subQueue取出数据,取不到数据,就等着
                System.out.println(Thread.currentThread().getName()+"执行耗时操作");
                Thread.sleep(3000);
                mainQueue.put(1);//存放数据到subQueue,方便
            }
            public void main() throws InterruptedException{
                mainQueue.take();//main在mainQueue取出数据,取不到数据,就等着
                System.out.println(Thread.currentThread().getName()+"执行耗时操作");
                Thread.sleep(3000);
                subQueue.put(1);//存放数据到subQueue,方便
            }
        }
    
    }

    未完待续……

  • 相关阅读:
    【Java线程】Java线程池ExecutorService
    MappedByteBuffer高速缓存文件、RandomAccessFile随机访问
    RandomAccessFile和memory-mapped files
    花1K内存实现高效I/O的RandomAccessFile类
    家庭局域网的组建(2台或2台以上)
    设置IE浏览器代理上网
    局域网Internet的共享
    三层设备---路由器
    二层设备---网桥和交换机
    底层设备---中继器和集线器
  • 原文地址:https://www.cnblogs.com/chun-jiang-chao-de-gu-shi/p/5409251.html
Copyright © 2011-2022 走看看