zoukankan      html  css  js  c++  java
  • 条件阻塞Condition的应用

    1.认识condition

    Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition。

         Condition类能实现synchronized和wait、notify搭配的功能,另外比后者更灵活,Condition可以实现多路通知功能,也就是在一个Lock对象里可以创建多个Condition(即对象监视器)实例,线程对象可以注册在指定的Condition中,从而可以有选择的进行线程通知,在调度线程上更加灵活。而synchronized就相当于整个Lock对象中只有一个单一的Condition对象,所有的线程都注册在这个对象上。线程开始notifyAll时,需要通知所有的WAITING线程,没有选择权,会有相当大的效率问题。

    1、Condition是个接口,基本的方法就是await()和signal()方法。

    2、Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition(),参考下图。 

    3、调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用。

    4、Conditon中的await()对应Object的wait(),Condition中的signal()对应Object的notify(),Condition中的signalAll()对应Object的notifyAll()。

     2.有两个线程,子线程先执行10次,然后主线程执行5次,然后再切换到子线程执行10,再主线程执行5次……如此往返执行50次。

    public class ConditionCommunication {
    
        public static void main(String[] args) {
            Business bussiness = new Business();
    
            new Thread(new Runnable() {// 开启一个子线程
    
                        @Override
                        public void run() {
                            for (int i = 1; i <= 50; i++) {
    
                                bussiness.sub(i);
                            }
    
                        }
                    }).start();
    
            // main方法主线程
            for (int i = 1; i <= 50; i++) {
    
                bussiness.main(i);
            }
        }   
    }
    
    class Business {
    
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition(); //Condition是在具体的lock之上的
    
        private boolean bShouldSub = true;
    
        public void sub(int i) {
            lock.lock();
            try {
                while(!bShouldSub) {
                    try {
                        condition.await(); //用condition来调用await方法
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                for (int j = 1; j <= 10; j++) {
                    System.out.println("sub thread sequence of " + j
                            + ", loop of " + i);
                }
                bShouldSub = false;
                condition.signal(); //用condition来发出唤醒信号,唤醒某一个
            } finally {
                lock.unlock();
            }
        }
    
        public void main(int i) {
            lock.lock();
            try {
                while(bShouldSub) {
                    try {
                        condition.await(); //用condition来调用await方法
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                for (int j = 1; j <= 10; j++) {
                    System.out.println("main thread sequence of " + j
                            + ", loop of " + i);
                }
                bShouldSub = true;
                condition.signal(); //用condition来发出唤醒信号么,唤醒某一个
            } finally {
                lock.unlock();
            }
        }
    }



    3 缓冲区的阻塞队列

      缓冲区即一个数组,我们可以向数组中写入数据,也可以从数组中把数据取走,我要做的两件事就是开启两个线程,一个存数据,一个取数据。但是问题来了,如果缓冲区满了,说明接收的消息太多了,即发送过来的消息太快了,我另一个线程还来不及发完,导致现在缓冲区没地方放了,那么此时就得阻塞存数据这个线程,让其等待;相反,如果我转发的太快,现在缓冲区所有内容都被我发完了,还没有用户发新的消息来,那么此时就得阻塞取数据这个线程。 

    class Buffer {
    
        final Lock lock = new ReentrantLock(); //定义一个锁
        final Condition notFull = lock.newCondition(); //定义阻塞队列满了的Condition
        final Condition notEmpty = lock.newCondition();//定义阻塞队列空了的Condition
    
        final Object[] items = new Object[10]; //为了下面模拟,设置阻塞队列的大小为10,不要设太大
    
        int putptr, takeptr, count; //数组下标,用来标定位置的
    
        //往队列中存数据
        public void put(Object x) throws InterruptedException {
            lock.lock(); //上锁
            try {
                while (count == items.length) {
                    System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法存数据!");
                    notFull.await();    //如果队列满了,那么阻塞存数据这个线程,等待被唤醒
                }
                //如果没满,按顺序往数组中存
                items[putptr] = x;
                if (++putptr == items.length) //这是到达数组末端的判断,如果到了,再回到始端
                    putptr = 0;
                ++count;    //消息数量
                System.out.println(Thread.currentThread().getName() + " 存好了值: " + x);
                notEmpty.signal(); //好了,现在队列中有数据了,唤醒队列空的那个线程,可以取数据啦
            } finally {
                lock.unlock(); //放锁
            }
        }
    
        //从队列中取数据
        public Object take() throws InterruptedException {
            lock.lock(); //上锁
            try {
                while (count == 0) {
                    System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法取数据!");
                    notEmpty.await();  //如果队列是空,那么阻塞取数据这个线程,等待被唤醒
                }
                //如果没空,按顺序从数组中取
                Object x = items[takeptr];
                if (++takeptr == items.length) //判断是否到达末端,如果到了,再回到始端
                    takeptr = 0;
                --count; //消息数量
                System.out.println(Thread.currentThread().getName() + " 取出了值: " + x);
                notFull.signal(); //好了,现在队列中有位置了,唤醒队列满的那个线程,可以存数据啦
                return x;
            } finally {
                lock.unlock(); //放锁
            }
        }
    }

    这个程序很经典,我从官方JDK文档中拿出来的,然后加了注释。程序中定义了两个Condition,分别针对两个线程,等待和唤醒分别用不同的Condition来执行,思路很清晰,程序也很健壮。可以考虑一个问题,为啥要用两个Codition呢?之所以这么设计肯定是有原因的,如果用一个Condition,现在假设队列满了,但是有2个线程A和B同时存数据,那么都进入了睡眠,好,现在另一个线程取走一个了,然后唤醒了其中一个线程A,那么A可以存了,存完后,A又唤醒一个线程,如果B被唤醒了,那就出问题了,因为此时队列是满的,B不能存的,B存的话就会覆盖原来还没被取走的值,就因为使用了一个Condition,存和取都用这个Condition来睡眠和唤醒,就乱了套。到这里,就能体会到这个Condition的用武之地了,现在来测试一下上面的阻塞队列的效果:

    4 两个以上线程之间的唤醒

      还是原来那个题目,现在让三个线程来执行,看一下题目:

    有三个线程,子线程1先执行10次,然后子线程2执行10次,然后主线程执行5次,然后再切换到子线程1执行10次,子线程2执行10次,主线程执行5次……如此往返执行50次。

      如过不用Condition,还真不好弄,但是用Condition来做的话,就非常方便了,原理很简单,定义三个Condition,子线程1执行完唤醒子线程2,子线程2执行完唤醒主线程,主线程执行完唤醒子线程1。唤醒机制和上面那个缓冲区道理差不多,下面看看代码吧,很容易理解。

    public class ThreeConditionCommunication {
    
        public static void main(String[] args) {
            Business bussiness = new Business();
    
            new Thread(new Runnable() {// 开启一个子线程
    
                        @Override
                        public void run() {
                            for (int i = 1; i <= 50; i++) {
    
                                bussiness.sub1(i);
                            }
    
                        }
                    }).start();
    
            new Thread(new Runnable() {// 开启另一个子线程
    
                @Override
                public void run() {
                    for (int i = 1; i <= 50; i++) {
    
                        bussiness.sub2(i);
                    }
    
                }
            }).start();
    
            // main方法主线程
            for (int i = 1; i <= 50; i++) {
    
                bussiness.main(i);
            }
        }
    
        static class Business {
    
            Lock lock = new ReentrantLock();
            Condition condition1 = lock.newCondition(); //Condition是在具体的lock之上的
            Condition condition2 = lock.newCondition();
            Condition conditionMain = lock.newCondition();
    
            private int bShouldSub = 0;
    
            public void sub1(int i) {
                lock.lock();
                try {
                    while (bShouldSub != 0) {
                        try {
                            condition1.await(); //用condition来调用await方法
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    for (int j = 1; j <= 10; j++) {
                        System.out.println("sub1 thread sequence of " + j
                                + ", loop of " + i);
                    }
                    bShouldSub = 1;
                    condition2.signal(); //让线程2执行
                } finally {
                    lock.unlock();
                }
            }
    
            public void sub2(int i) {
                lock.lock();
                try {
                    while (bShouldSub != 1) {
                        try {
                            condition2.await(); //用condition来调用await方法
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    for (int j = 1; j <= 10; j++) {
                        System.out.println("sub2 thread sequence of " + j
                                + ", loop of " + i);
                    }
                    bShouldSub = 2;
                    conditionMain.signal(); //让主线程执行
                } finally {
                    lock.unlock();
                }
            }
    
            public void main(int i) {
                lock.lock();
                try {
                    while (bShouldSub != 2) {
                        try {
                            conditionMain.await(); //用condition来调用await方法
                        } catch (Exception e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    for (int j = 1; j <= 5; j++) {
                        System.out.println("main thread sequence of " + j
                                + ", loop of " + i);
                    }
                    bShouldSub = 0;
                    condition1.signal(); //让线程1执行
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    5. 三个线程按照顺序依次执行

    package com.demo.test;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class ConditionService {
    
        // 通过nextThread控制下一个执行的线程
        private static int nextThread = 1;
        private ReentrantLock lock = new ReentrantLock();
        // 有三个线程,所以注册三个Condition
        Condition conditionA = lock.newCondition();
        Condition conditionB = lock.newCondition();
        Condition conditionC = lock.newCondition();
    
        public void excuteA() {
            try {
                lock.lock();
                while (nextThread != 1) {
                    conditionA.await();
                }
                System.out.println(Thread.currentThread().getName() + " 工作");
                nextThread = 2;
                conditionB.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void excuteB() {
            try {
                lock.lock();
                while (nextThread != 2) {
                    conditionB.await();
                }
                System.out.println(Thread.currentThread().getName() + " 工作");
                nextThread = 3;
                conditionC.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void excuteC() {
            try {
                lock.lock();
                while (nextThread != 3) {
                    conditionC.await();
                }
                System.out.println(Thread.currentThread().getName() + " 工作");
                nextThread = 1;
                conditionA.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
  • 相关阅读:
    PAT A1147 Heaps (30 分)——完全二叉树,层序遍历,后序遍历
    # 数字签名&数字证书
    # Doing homework again(贪心)
    # Tallest Cows(差分)
    # ACM奇淫技巧
    # 二维前缀和+差分
    # 费解的开关(二进制+递推+思维)
    # log对数Hash映射优化
    # 起床困难综合症(二进制枚举+按位求贡献)
    # 最短Hamilton路径(二进制状态压缩)
  • 原文地址:https://www.cnblogs.com/wzj4858/p/8350607.html
Copyright © 2011-2022 走看看