zoukankan      html  css  js  c++  java
  • java并发系列(二)-----线程之间的协作(wait、notify、join、CountDownLatch、CyclicBarrier、Semaphore)

    在java中,线程之间的切换是由操作系统说了算的,操作系统会给每个线程分配一个时间片,在时间片到期之后,线程让出cpu资源,由其他线程一起抢夺,那么如果开发想自己去在一定程度上(因为没办法100%控制它)让线程之间互相协作、通信,有哪些方式呢?

    wait、notify、notifyAll

    1、void wait( ) 
    导致当前的线程等待,直到其他线程调用此对象的notify( ) 方法或 notifyAll( ) 方法
    2、void wait(long timeout) 
    导致当前的线程等待,直到其他线程调用此对象的notify() 方法或 notifyAll() 方法,或者指定的时间过完。
    3、void notify() 
    唤醒在此对象监视器上等待的单个线程
    4、void notifyAll() 
    唤醒在此对象监视器上等待的所有线程

    举例说明:

    public class WaitTest1 {
    
        public static void main(String[] args) {
    
            ThreadA ta = new ThreadA("ta");
    
            synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
                try {
                    System.out.println(Thread.currentThread().getName()+" start ta");
                    ta.start();
    
                    System.out.println(Thread.currentThread().getName()+" block");
                    ta.wait();    // 等待
    
                    System.out.println(Thread.currentThread().getName()+" continue");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        static class ThreadA extends Thread{
    
            public ThreadA(String name) {
                super(name);
            }
    
            public void run() {
                synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”。这个this代表是内部类ThreadA的对象。
                    System.out.println(Thread.currentThread().getName()+" wakup others");
                    notify();    // 唤醒“当前对象上的等待线程”
                }
            }
        }
    }

    运行结果

    main start ta
    main block
    ta wakup others
    main continue 

    注:notify()与notifyAll()区别:notify()会使等待获取某对象锁的一个线程到Runnable状态,但是notifyAll()会使所有的线程到Runnable状态,虽然notifyAll()方法性能开销略大,但是不存在信号丢失问题,因此优先推荐使用notifyAll()。 

    Thread.join()

    1、作用

    当一个线程希望等待另外一个或多个线程运行结束时可以使用。

    2、方法签名

    public final void join()throws InterruptedException  
    //等待多少秒
    public void join(long millis)throwsInterruptedException  
    //看了下源码,也不知道这方法是干啥的。。。
    public final void join(long millis, int nanos)throws InterruptedException

    3、示例

    package com.ty.thread;
    
    public class JoinDemo {
    
        class Boss implements Runnable {
    
            @Override
            public void run() {
                System.out.println("当前线程为:" + Thread.currentThread().getName() + "老板给工人发工资");
            }
            
        }
        
        static class Worker implements Runnable {
    
            @Override
            public void run() {
                System.out.println("当前线程为:" + Thread.currentThread().getName() + "工人准备干活喽*****");
                
                try {
                    //模拟工人干活所花费的时间
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                System.out.println("当前线程为:" + Thread.currentThread().getName() + "工人干活结束*****");
            }
            
        }
        
        public static void main(String[] args) throws InterruptedException {
            Thread worker1 = new Thread(new Worker(), "工人1线程");
            Thread worker2 = new Thread(new Worker(), "工人2线程");
            Thread worker3 = new Thread(new Worker(), "工人3线程");
            //worker与boss一个为static,一个不为static,只是为了展示下不同的写法
            Thread boss = new Thread(new JoinDemo().new Boss(), "boss线程");
            worker1.start();
            worker2.start();
            worker3.start();
            //等待工人干完活,老板才能给钱啊,可恶的资本主义。。。
            worker1.join();
            worker2.join();
            worker3.join();
            boss.start();
        }
    }

    运行结果如下:

    当前线程为:工人1线程工人准备干活喽*****
    当前线程为:工人3线程工人准备干活喽*****
    当前线程为:工人2线程工人准备干活喽*****
    当前线程为:工人1线程工人干活结束*****
    当前线程为:工人2线程工人干活结束*****
    当前线程为:工人3线程工人干活结束*****
    当前线程为:boss线程老板给工人发工资

    反正boss线程肯定是等所有工人线程运行结束之后才开始运行

    Condition

    wait()/notify()存在的问题:
    1、过于底层,并且不好控制
    2、存在过早唤醒的情况
    3、wait(long)无法区分是等待超时还是被通知线程唤醒

    过早唤醒:上图中Notify1线程调用了notifyAll()或notify(),唤醒了wait4线程,但是其实这时候wait4不满足唤醒条件,这种情况就叫做过早唤醒。可以这么理解,你在大街上看到一个美女,然后你大喊一声美女,N个女生回头,就是这个意思。

    因此出现了Condition接口,它作为wait/notify的替代品,解决了过早唤醒的情况,并且解决了wait(long)不能区分其返回是否由等待超时而导致的问题。Condition中的await()、signal()以及signalAll()分别替代wait()、notify()以及notifyAll()。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

    主要方法:

    // 造成当前线程在接到信号或被中断之前一直处于等待状态。
    void await()
    // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
    boolean await(long time, TimeUnit unit)
    // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
    long awaitNanos(long nanosTimeout)
    // 造成当前线程在接到信号之前一直处于等待状态。
    void awaitUninterruptibly()
    // 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
    boolean awaitUntil(Date deadline)
    // 唤醒一个等待线程。
    void signal()
    // 唤醒所有等待线程。
    void signalAll()

    示例:

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ConditionTest1 {
            
        private static Lock lock = new ReentrantLock();
        private static Condition condition = lock.newCondition();
    
        public static void main(String[] args) {
    
            ThreadA ta = new ThreadA("ta");
    
            lock.lock(); // 获取锁
            try {
                System.out.println(Thread.currentThread().getName()+" start ta");
                ta.start();
    
                System.out.println(Thread.currentThread().getName()+" block");
                condition.await();    // 等待
    
                System.out.println(Thread.currentThread().getName()+" continue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();    // 释放锁
            }
        }
    
        static class ThreadA extends Thread{
    
            public ThreadA(String name) {
                super(name);
            }
    
            public void run() {
                lock.lock();    // 获取锁
                try {
                    System.out.println(Thread.currentThread().getName()+" wakup others");
                    condition.signal();    // 唤醒“condition所在锁上的其它线程”
                } finally {
                    lock.unlock();    // 释放锁
                }
            }
        }
    }

    运行结果:

    main start ta
    main block
    ta wakup others
    main continue

    那么Condition是如何预防过早唤醒的呢?如下图所示:

    其实本质就是不同的notify线程跟wait线程之间使用不同的condition,与wait/notify相比,粒度更细,因此预防过早唤醒的情况出现。

    并且针对一个锁,可以有多个condition,示例如下:

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    class BoundedBuffer {
        final Lock lock = new ReentrantLock();
        final Condition notFull  = lock.newCondition(); 
        final Condition notEmpty = lock.newCondition(); 
    
        final Object[] items = new Object[5];
        int putptr, takeptr, count;
    
        public void put(Object x) throws InterruptedException {
            lock.lock();    //获取锁
            try {
                // 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
                while (count == items.length)
                    notFull.await();
                // 将x添加到缓冲中
                items[putptr] = x; 
                // 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
                if (++putptr == items.length) putptr = 0;
                // 将“缓冲”数量+1
                ++count;
                // 唤醒take线程,因为take线程通过notEmpty.await()等待
                notEmpty.signal();
    
                // 打印写入的数据
                System.out.println(Thread.currentThread().getName() + " put  "+ (Integer)x);
            } finally {
                lock.unlock();    // 释放锁
            }
        }
    
        public Object take() throws InterruptedException {
            lock.lock();    //获取锁
            try {
                // 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
                while (count == 0) 
                    notEmpty.await();
                // 将x从缓冲中取出
                Object x = items[takeptr]; 
                // 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
                if (++takeptr == items.length) takeptr = 0;
                // 将“缓冲”数量-1
                --count;
                // 唤醒put线程,因为put线程通过notFull.await()等待
                notFull.signal();
    
                // 打印取出的数据
                System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
                return x;
            } finally {
                lock.unlock();    // 释放锁
            }
        } 
    }
    
    public class ConditionTest2 {
        private static BoundedBuffer bb = new BoundedBuffer();
    
        public static void main(String[] args) {
            // 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
            // 启动10个“读线程”,从BoundedBuffer中不断的读数据。
            for (int i=0; i<10; i++) {
                new PutThread("p"+i, i).start();
                new TakeThread("t"+i).start();
            }
        }
    
        static class PutThread extends Thread {
            private int num;
            public PutThread(String name, int num) {
                super(name);
                this.num = num;
            }
            public void run() {
                try {
                    Thread.sleep(1);    // 线程休眠1ms
                    bb.put(num);        // 向BoundedBuffer中写入数据
                } catch (InterruptedException e) {
                }
            }
        }
    
        static class TakeThread extends Thread {
            public TakeThread(String name) {
                super(name);
            }
            public void run() {
                try {
                    Thread.sleep(10);                    // 线程休眠1ms
                    Integer num = (Integer)bb.take();    // 从BoundedBuffer中取出数据
                } catch (InterruptedException e) {
                }
            }
        }
    }

    运行结果如下;

    p1 put  1
    p4 put  4
    p5 put  5
    p0 put  0
    p2 put  2
    t0 take 1
    p3 put  3
    t1 take 4
    p6 put  6
    t2 take 5
    p7 put  7
    t3 take 0
    p8 put  8
    t4 take 2
    p9 put  9
    t5 take 3
    t6 take 6
    t7 take 7
    t8 take 8
    t9 take 9

    先描述下这种场景,有一个缓冲区,另外有一个读线程,一个写线程。读线程与写线程互不干扰的工作,但是当写线程开始工作后,通知读线程去读,并且读线程开始读之后,通知写线程工作。这种场景用wait/notify机制根本无法实现,但是可以通过多个condition可以实现。也就是说比如A、B两个线程,A需要唤醒B,使用一个condition,但同时B也需要唤醒A,就得使用另一个condition。

    CountDownLatch

    countDownLatch与thread.join()功能类似,但是join的粒度太大,必须要等整个线程执行结束才能执行后续相关动作,但是countDownLatch可以实现更精细的粒度。

    1、原理

    其实这种方式也就是所谓的闭锁,一种同步方法,可以延迟线程的进度直到线程到达某个终点状态。通俗的讲就是,一个闭锁相当于一扇大门,在大门打开之前所有线程都被阻断,一旦大门打开所有线程都将通过,但是一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。也就是说闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成。内部维护一个用于表示未完成的先决操作数量的计数器,通过countDown()方法来完成的。每调用一次这个方法,在构造函数中初始化的count值就减1,所以当N个线程都调用了这个方法count的值等于0,然后主线程就能通过await方法,恢复自己的任务。

    理解:其实可以这么理解,一家人去看晚上七点钟开始的周杰伦演唱会,但是你们五点就去了,这时候保安会不让进,这个保安就类似CountDownLatch,到了六点半,保安开始放行。

    2、主要方法

    public CountDownLatch(int count); //指定计数的次数,只能被设置1次
    public void countDown();          //调用此方法则计数减1
    public void await() throws InterruptedException   //调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断。
    public Long getCount();           //得到当前的计数
    public boolean await(long timeout, TimeUnit unit) //调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断或者计数器超时,返回false代表计数器超时。

    3、示例

    public class CountDownLatchDemo {  
        final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");  
        public static void main(String[] args) throws InterruptedException {  
            CountDownLatch latch=new CountDownLatch(2);//两个工人的协作  
            Worker worker1=new Worker("zhang san", 5000, latch);  
            Worker worker2=new Worker("li si", 8000, latch);  
            worker1.start();//  
            worker2.start();//  
            latch.await();//等待所有工人完成工作  
            System.out.println("all work done at "+sdf.format(new Date()));  
        }  
          
          
        static class Worker extends Thread{  
            String workerName;   
            int workTime;  
            CountDownLatch latch;  
            public Worker(String workerName ,int workTime ,CountDownLatch latch){  
                 this.workerName=workerName;  
                 this.workTime=workTime;  
                 this.latch=latch;  
            }  
            public void run(){  
                System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));  
                doWork();//工作了  
                System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));  
                latch.countDown();//工人完成工作,计数器减一  
      
            }  
              
            private void doWork(){  
                try {  
                    Thread.sleep(workTime);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
    }

    CyclicBarrier(栅栏)

    1、概念

    字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

    理解:还是一家人去看晚上七点的周杰伦演唱会,现在已经六点四十五,即将开始,但是有位家人要去上厕所,大家就等着他一起才进去,这时候没有保安这个角色,这也是CyclicBarrier与CountDownLatch的本质区别。

    2、构造器

    public CyclicBarrier(int parties, Runnable barrierAction) {
    }
     
    public CyclicBarrier(int parties) {
    }

    3、主要方法

    public int await() throws InterruptedException, BrokenBarrierException { };
    public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };

    4、示例

    场景:假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情

    public class Test {
        public static void main(String[] args) {
            int N = 4;
            CyclicBarrier barrier  = new CyclicBarrier(N);
            for(int i=0;i<N;i++)
                new Writer(barrier).start();
        }
        static class Writer extends Thread{
            private CyclicBarrier cyclicBarrier;
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
     
            @Override
            public void run() {
                System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
                try {
                    Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                    System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }catch(BrokenBarrierException e){
                    e.printStackTrace();
                }
                System.out.println("所有线程写入完毕,继续处理其他任务...");
            }
        }
    }

    运行结果:

    线程Thread-0正在写入数据...
    线程Thread-3正在写入数据...
    线程Thread-2正在写入数据...
    线程Thread-1正在写入数据...
    线程Thread-2写入数据完毕,等待其他线程写入完毕
    线程Thread-0写入数据完毕,等待其他线程写入完毕
    线程Thread-3写入数据完毕,等待其他线程写入完毕
    线程Thread-1写入数据完毕,等待其他线程写入完毕
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...

    CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。代码如下:

    public class CyclicBarrierTest2 {
    
        static CyclicBarrier c = new CyclicBarrier(2, new A());
    
        public static void main(String[] args) {
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        c.await();
                    } catch (Exception e) {
    
                    }
                    System.out.println(1);
                }
            }).start();
    
            try {
                c.await();
            } catch (Exception e) {
    
            }
            System.out.println(2);
        }
    
        static class A implements Runnable {
    
            @Override
            public void run() {
                System.out.println(3);
            }
    
        }
    
    }

    输出:

    3
    1
    2

    CyclicBarrier和CountDownLatch的区别

    • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
    • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

    Semaphore

    1、概念

    Semaphore也叫信号量,在JDK1.5被引入,用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池,或者对容器施加边界。
    Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。访问资源后,使用release释放许可。
    Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。当初始值为1时,可以用作互斥锁,并具备不可重入的加锁语义。Semaphore将AQS的同步状态用于保存当前可用许可的数量。

    举例说明:

    把它比作是控制流量的红绿灯,比如一条马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入马路,但是如果前一百辆中有五辆车已经离开了马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。

    2、api介绍

    a、构造器

    public Semaphore(int permits) {          //参数permits表示许可数目,即同时可以允许多少线程进行访问
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {    //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
        sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
    }

    b、主要方法

    public void acquire() throws InterruptedException {  }     //获取一个许可
    public void acquire(int permits) throws InterruptedException { }    //获取permits个许可
    public void release() { }          //释放一个许可
    public void release(int permits) { }    //释放permits个许可

    acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
    release()用来释放许可。注意,在释放许可之前,必须先获获得许可。 
    这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:

    public boolean tryAcquire() { };    //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
    public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

    3、代码示例-----实现资源池

    一个固定长度的资源池,当池为空时,请求资源会失败。使用Semaphore可以实现当池为空时,请求会阻塞,非空时解除阻塞。也可以使用Semaphore将任何一种容器变成有界阻塞容器。

    package com.ty.semaphore;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreTest {
        public static void main(String[] args) {
            // 线程池
            ExecutorService exec = Executors.newCachedThreadPool();
            // 只能5个线程同时访问
            final Semaphore semp = new Semaphore(5);
            // 模拟20个客户端访问
            for (int index = 0; index < 20; index++) {
                final int NO = index;
                Runnable run = new Runnable() {
                    public void run() {
                        try {
                            // 获取许可
                            semp.acquire();
                            System.out.println("Accessing: " + NO);
                            Thread.sleep((long) (Math.random() * 6000));
                            // 访问完后,释放
                            semp.release();
                            // availablePermits()指还剩多少个许可
                            System.out.println("-----------------" + semp.availablePermits());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                exec.execute(run);
            }
            // 退出线程池
            exec.shutdown();
        }
    }

    运行结果:

    Accessing: 1
    Accessing: 2
    Accessing: 3
    Accessing: 0
    Accessing: 4
    -----------------0
    Accessing: 6
    -----------------1
    Accessing: 9
    -----------------1
    Accessing: 5
    -----------------1
    Accessing: 10
    -----------------1
    Accessing: 8
    -----------------1
    Accessing: 7
    -----------------1
    Accessing: 11
    -----------------1
    Accessing: 15
    -----------------1
    Accessing: 12
    -----------------1
    Accessing: 16
    -----------------1
    Accessing: 17
    -----------------1
    Accessing: 13
    -----------------1
    Accessing: 14
    -----------------1
    Accessing: 18
    -----------------1
    Accessing: 19
    -----------------1
    -----------------2
    -----------------3
    -----------------4
    -----------------5

    说明:虽然是起了20个线程,但是每次最多只有5个线程能够访问,其他线程阻塞,需等待其中有些线程执行完才能执行。

  • 相关阅读:
    LeetCode: 18. 4Sum
    LeetCode:15. 3Sum
    Leetcode:1. Two Sum
    tensorflow placeholder
    Tensorflow变量
    13.git的简单使用
    13.Django1.11.6文档
    12.python进程协程异步IO
    12.Flask-Restful
    12.Django思维导图
  • 原文地址:https://www.cnblogs.com/alimayun/p/10909074.html
Copyright © 2011-2022 走看看