zoukankan      html  css  js  c++  java
  • Java并发编程之java.util.concurrent包下常见类的使用

    一,Condition

    一个场景,两个线程数数,同时启动两个线程,线程A数1、2、3,然后线程B数4、5、6,最后线程A数7、8、9,程序结束,这涉及到线程之间的通信。

    public class ConditionTest {
        static class NumberWrapper {
            public int value = 1;
        }
    
        public static void main(String[] args) {
            //初始化可重入锁
            final Lock lock = new ReentrantLock();
            
            //第一个条件当屏幕上输出到3 
            final Condition reachThreeCondition = lock.newCondition();
            //第二个条件当屏幕上输出到6
            final Condition reachSixCondition = lock.newCondition();
            
            //NumberWrapper只是为了封装一个数字,一边可以将数字对象共享,并可以设置为final
            //注意这里不要用Integer, Integer 是不可变对象
            final NumberWrapper num = new NumberWrapper();
            //初始化A线程
            Thread threadA = new Thread(new Runnable() {
                @Override
                public void run() {
                    //需要先获得锁
                    lock.lock();
                    System.out.println("ThreadA获得lock");
                    try {
                        System.out.println("threadA start write");
                        //A线程先输出前3个数
                        while (num.value <= 3) {
                            System.out.println(num.value);
                            num.value++;
                        }
                        //输出到3时要signal,告诉B线程可以开始了
                        reachThreeCondition.signal();
                    } finally {
                        lock.unlock();
                        System.out.println("ThreadA释放lock");
                    }
                    lock.lock();
                    try {
                        //等待输出6的条件
                        System.out.println("ThreadA获得lock");
                        reachSixCondition.await();
                        System.out.println("threadA start write");
                        //输出剩余数字
                        while (num.value <= 9) {
                            System.out.println(num.value);
                            num.value++;
                        }
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                        System.out.println("ThreadA释放lock");
                    }
                }
    
            });
            Thread threadB = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        lock.lock();
                        System.out.println("ThreadB获得lock");
                        Thread.sleep(5000);//是await方法释放了锁
                        while (num.value <= 3) {
                            //等待3输出完毕的信号
                            reachThreeCondition.await();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                        System.out.println("ThreadB释放lock");
                    }
                    try {
                        lock.lock();
                        System.out.println("ThreadB获得lock");
                        //已经收到信号,开始输出4,5,6
                        System.out.println("threadB start write");
                        while (num.value <= 6) {
                            System.out.println(num.value);
                            num.value++;
                        }
                        //4,5,6输出完毕,告诉A线程6输出完了
                        reachSixCondition.signal();
                    } finally {
                        lock.unlock();
                        System.out.println("ThreadB释放lock");
                    }
                }
            });
            //启动两个线程
            threadB.start();
            threadA.start();
        }
    }
    View Code

    创建方式:通过Lock创建,Lock.newCondition();

    常用方法:

    await():阻塞,直到相同的Condition调用了signal方法。
    signal():通知。

    总结:Condition必须与Lock一起使用(wait()、notify()必须与synchronized一起使用,否则运行会报错java.lang.IllegalMonitorStateException),相比于wait与notify更加的灵活,可以设置各种情形,如上例中的到达3和到达6两个条件。

    执行结果:

    二,CountDownLatch

    看代码:

    public class CountDownLatchTest {
        public static void main(String[] args) {
            final CountDownLatch c = new CountDownLatch(3);//总数3
            Thread t1 = new Thread(new Runnable(){
                @Override
                public void run() {
                    try {
                        System.out.println("开始等");
                        c.await();//阻塞,等待countDown,当countDown到0就执行后面的完事了
                        System.out.println("完事");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                
            });
            Thread t2 = new Thread(new Runnable(){
                @Override
                public void run() {
                    for(int i=3;i>0;i--){
                        c.countDown();//减1
                    }
                }
                
            });
            t1.start();
            t2.start();
        }
    }
    View Code

    创建方式:直接创建,new CountDownLatch(int num);

    常用方法:

    await():阻塞,直到countDown方法被执行了num次。
    countDown():减

    总结:适用于一个线程等待其他线程的情景。

    执行结果:

    三,CyclicBarrier

    与CountDownLatch有什么区别?

    CyclicBarrier强调的是n个线程,大家相互等待,只要有一个没完成,所有人都得等着。正如上例,只有5个人全部跑到终点,大家才能开喝,否则只能全等着。

    CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成。

    另外:

    1.CountDownLatch减计数,CyclicBarrier加计数。 
    2.CountDownLatch是一次性的,CyclicBarrier可以重用。 

    public class MainMission {
        private CyclicBarrier barrier;
        private final static int threadCounts = 5;
        public void runMission() {
            ExecutorService exec=Executors.newFixedThreadPool(threadCounts);
            //new 的时候要传入数字,我发现,这个类似semaphore,如果位置不足,线程会抢位置。数字要是threadCounts+1为主线程留一个位子,但实际测试中发现,只要等于threadCount就可以
             barrier=new CyclicBarrier(threadCounts+1); 
            for(int i=0;i<5;i++){
                exec.execute(new Mission(barrier));
            }
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("所有任务都执行完了");
            exec.shutdown();//如果不关闭,程序一直处于运行状态
        }
        public static void main(String[] args) {
            MainMission m = new MainMission();
            m.runMission();
        }
    }
    class Mission implements Runnable{
        private CyclicBarrier barrier;
        public Mission(CyclicBarrier barrier){
            this.barrier = barrier;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"开始执行任务");
            try {
                int sleepSecond = new Random().nextInt(10)*1000;
                System.out.println(Thread.currentThread().getName()+"要执行"+sleepSecond+"秒任务");
                Thread.sleep(sleepSecond);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"执行完毕");
        }
    }
    View Code

    创建方式:直接创建,new CyclicBarrier(int num);

    常用方法:

    await():阻塞,直到阻塞的线程数量达到num个。

    总结:想想一下百米跑,所有运动员都就位之后才会发令起跑,线程调用await意味着说,我准备好了。

    执行结果:

    ,Semaphore

    下面是一个上厕所的例子,厕所位置有限,想用得排队了。实现使用的就是信号量,可以看出信号量可以用来做限流。

    public class MySemaphore implements Runnable{
        Semaphore position;    
        private int id;
        public MySemaphore(int i,Semaphore s){
            this.id=i;
            this.position=s;
        }
    
        @Override
        public void run() {
            try{
                  if(position.availablePermits()>0){
                   System.out.println("顾客["+this.id+"]进入厕所,有空位");
                  }
                  else{
                   System.out.println("顾客["+this.id+"]进入厕所,没空位,排队");
                  }
                  position.acquire();//只有在acquire之后才能真正的获得了position
                  System.out.println("#########顾客["+this.id+"]获得坑位");
                  Thread.sleep((int)(Math.random()*100000));
                  System.out.println("@@@@@@@@@顾客["+this.id+"]使用完毕");
                  position.release();
             }catch(Exception e){
                 e.printStackTrace();
             }
        }
        
        public static void main(String args[]){
            ExecutorService list=Executors.newCachedThreadPool();
            Semaphore position=new Semaphore(2);
            for(int i=0;i<10;i++){
             list.submit(new MySemaphore(i+1,position));
            }
            list.shutdown();
            position.acquireUninterruptibly(2);
            System.out.println("使用完毕,需要清扫了");
            position.release(2);
        }
    
    
    }
    View Code

    创建方式:直接创建,new Semaphore(int num);

    常用方法:

    availablePermits():看现在可用的信号量。

    acquire():尝试获取一个位置,如果获取不到则阻塞。

    release():释放位置。

    acquireUninterruptibly(int num):尝试获取num个许可,如果没有足够的许可则阻塞,一直阻塞到有足够的许可释放出来。调用这个方法的线程具有优先获取许可的权利。如果调用线程被interrupted,该线程并不会被打断,它会继续阻塞等待许可。

    总结:抢位置。

    执行结果:

    ,ReentrantLock

    创建方式:

    new ReentrantLock(); 此种创建方式会创建出一个非公平锁。

    new ReentrantLock(true); 此种方式会创建出一个公平锁。

    非公平锁:当锁处于无线程占有的状态,此时其他线程和在队列中等待的线程都可以抢占该锁。 
    公平锁:当锁处于无线程占有的状态,在其他线程抢占该锁的时候,都需要先进入队列中等待。

    tryLock()方法:尝试去获取锁,如果没有获取到直接返回,不等待。

    细节看这个吧,https://blog.csdn.net/jiangjiajian2008/article/details/52226189,写的挺好。

    六,ReentrantReadWriteLock

    创建方式:new ReentrantReadWriteLock();

    常用方法:

    readLock().lock();写锁

    writeLock().lock();读锁

    readLock().unlock();解锁

    writeLock().unlock();解锁

    总结:

     * 如果目前是读锁,其他读锁也可以进请求,写锁不能进。
     * 如果目前是写锁,那么其他所有的锁都不可以进。

     * 适用于读多写少的情况,如果是写多读少用ReentrantLock。

    七,Callable接口

    *Callable接口支持返回执行结果,此时需要调用FutureTask.get()方法实现,此方法会阻塞主线程直到获取结果;当不调用此方法时,主线程不会阻塞!

    与Runnable对比:

    1.Callable可以有返回值,Runnable没有

    2.Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛;

    八,线程池

    提供的线程池有几种:

    //有数量限制的线程池
    ExecutorService service=Executors.newFixedThreadPool(4);
    //没有数量限制的线程池
    ExecutorService service=Executors.newCachedThreadPool();
    //单线程池
    ExecutorService service=Executors.newSingleThreadExecutor();
    他们都是通过下面这个线程池实现的
    有数量线程池的实现方式

    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads/*核心线程数*/, nThreads/*最高线程数*/,
                                          0L/*高出核心线程数的线程最高存活时间*/, TimeUnit.MILLISECONDS/*高出核心线程数的线程最高存活时间单位*/,
                                          new LinkedBlockingQueue<Runnable>()/*任务队列*/);

    }
  • 相关阅读:
    记长连接压测总结
    PHP装扩展
    LMbench安装&使用
    Scala学习笔记-2-(if、while、for、try、match)
    Gatling学习笔记-Scenario(场景)
    Java之路---Day05
    Java之路---Day04
    Java之路---Day03
    Java之路---Day02
    Java之路---Day01
  • 原文地址:https://www.cnblogs.com/miketwais/p/java_util_concurrent.html
Copyright © 2011-2022 走看看