zoukankan      html  css  js  c++  java
  • 13.6 线程通信

    一、传统的线程通信——synchronized同步的线程

    假设系统中有两个线程,这两个线程分别代表存款者和取钱者——先假设有一种特殊的要求,系统要求存款者和取款者不断存钱、取钱的动作,而且要求存款者将钱存入指定账户后,取钱者就立即取出钱。不允许存款者两次存钱,也不允许取款者两次取钱。
    为了实现这种功能,可以借助于Object类提供的wait()、notify()和notifyAll()三个方法,这三个方法并不属于Thread类,而是属于Object类。但这三个方法必须同步监视器对象调用,这可以分为以下两种情况:
    1、对于使用synchronized修饰的同步方法,因为该类的默认实例(this)就是同步监视器,所以可以在同步方法中直接调用这三个方法。
    2、对于synchronized修饰的同步代码块,同步监视器是synchronized后括号里的对象,所以必须使用该对象调用这三个方法
    关于这三个方法的解释如下:
    1、wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程。该wait()方法有三种形式:无时间参数的wait(一直等待,直到其他线程通知),带毫秒参数的wait和带毫秒、微秒参数的wait(这两种方法都是等待指定时间后自动苏醒)。调用wait()方法的当前线程会释放对该同步监视器的锁定。
    2、notify():唤醒在此同步监视器上等待的单个线程。如果所有线程都在此同步监视器上等待,则会选择唤醒其中一个线程。选择是任意性的。只有当前线程放弃对该同步监视器的锁定后(使用wait()方法),才可以执行被唤醒的线程。
    3、notifyAll():唤醒在此同步监视器上等待的所有线程。只有当前线程放弃对该同步监视器的锁定后,才可以执行被唤醒的线程。
    程序通过一个旗帜来标识账户中是否已存款,当旗帜为false时,表示账户中没有存款,存款线程可以向下执行,当存款者把钱存入账户后,将旗帜设为true,并调用notify()或notifyAll()方法来唤醒其他线程;当存款者线程进入线程体后,如果旗帜为true就调用wait()方法让线程等待。
    当旗标为true时,表示账户已经存入存款,则取钱者线程可以向下执行,当取钱者把钱从账户中取出来后,将旗帜设为false,并调用notify()或nitiftAll()方法来唤醒其他线程;当取钱者进入线程体后,如果旗帜为false就调用wait()方法让该线程等待。
    本程序为Account类提供了draw()和deposit()两个方法,分别对应该账户的取钱、存款等操作,因为这两个方法可能需要并发修改Account类的balance成员的值,所以这两种方法都使用synchronized修饰成同步方法。除此之外还使用了wait()、notifyAll()来控制线程的协作:

    package section6;
    
    public class Account
    {
        //封装账户编号、账户余额的两个成员变量
        private String accountNo;
        private double balance;
        //标识账户中是否已有存款
        private boolean flag=false;
        public Account(){}
        //有参数构造器
        public Account(String accountNo,double balance)
        {
            this.accountNo=accountNo;
            this.balance=balance;
        }
    
        public String getAccountNo() {
            return accountNo;
        }
        public void setAccountNo(String accountNo) {
            this.accountNo = accountNo;
        }
    
        //因为账户余额不允许随便修改,所以只为balance提供getter方法
        public double getBalance()
        {
            return this.balance;
        }
        public synchronized void draw(double drawAmount)
        {
            try{
                //如果flag为假,表明账户中还没有人存钱进去,取钱方法阻塞
                if(!flag)
                {
                    wait();
                }
                else
                {
                    //执行取钱操作
                    System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount);
                    this.balance=this.balance-drawAmount;
                    System.out.println("账户余额为:"+this.balance);
                    //将旗帜标识为false,表明取钱完成,等待存钱
                    this.flag=false;
                    //唤醒其他线程
                    notifyAll();
                }
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public synchronized void deposit(double depositAmount)
        {
            try{
                //如果flag为真,表明账户中已有人存钱进去,存钱方法阻塞
                if(flag)//①
                {
                    wait();
                }
                else
                {
                    //执行存钱行为
                    System.out.println(Thread.currentThread().getName()+"存款:"+depositAmount);
                    this.balance=this.balance+depositAmount;
                    System.out.println("账户余额为"+this.balance);
                    //将表明账户已有存款旗帜标为true
                    flag=true;
                    //唤醒其他线程
                    notifyAll();
                }
            }
            catch (InterruptedException ex)
            {
                ex.printStackTrace();
            }
        }
    }
    

    上面程序使用wait()和notifyAll()进行控制,对存款者线程而言,当程序进入deposit()方法,如果flag为true,表明账户已有存款,程序调用wait()方法阻塞;否则程序向下执行存款操作,当存款操作执行完成后,系统将flag设为true,然后调用notifyAll()来唤醒其他被阻塞的线程——如果系统中由存款者线程,存款者线程也会被唤醒,但是该存款者线程执行到①好代码处时再次进入阻塞状态,只有执行draw()方法的取钱线程才可以向下执行。同理,取钱者线程也是如此。
    程序中存款者线程循环100次重复存款,而取钱者线程循环100次重复取钱,存款者线程和取钱者线程分别调用Account对象的额deposit()、draw()方法来实现:

    package section6;
    
    public class DrawThread extends Thread
    {
        //模拟用户账户
        private Account account;
        //当前线程所希望的取钱钱数
        private double drawAmount;
        public DrawThread(String name,Account account,double drawAmount)
        {
            super(name);
            this.account=account;
            this.drawAmount=drawAmount;
        }
        
        //重复100次取钱操作
        public void run()
        {
            for(var i=0;i<100;i++)
            {
                account.draw(drawAmount);
            }
        }
    }
    

    存钱线程类程序

    package section6;
    
    public class DepositThread extends Thread
    {
        //模拟用户账户
        private Account account;
        //当前存款线程所需要存的钱数
        private double depositAmount;
        public DepositThread(String name,Account account,double depositAmount)
        {
            super(name);
            this.account=account;
            this.depositAmount=depositAmount;
        }
        //重复100次执行存款操作
        public void run()
        {
            for(var i=0;i<100;i++)
            {
                account.deposit(depositAmount);
            }
        }
    }
    

    主程序可以启动任意多个存款和取款线程,可以看到所有的取款过程必须等到存款线程存钱后才能向下执行,而存款线程也必须等到取钱线程取钱后才可以向下执行。主程序代码:

    package section6;
    
    public class DrawTest
    {
        public static void main(String[] args)
        {
            //创建一个账户
            var account=new Account("1234567",1);
            new DrawThread("取钱者线程",account,800).start();
            new DepositThread("存钱者甲",account,1000).start();
            new DepositThread("存钱者乙",account,1000).start();
            new DepositThread("存钱者丙",account,1000).start();
    
        }
    }
    


    可以看到存款者线程和取款者线程交替执行,每当有存款者想账户中存入1000元,取钱者线程马上从账户中取出800。因为3个存款者线程尝试存款300次操作,但只有一个存款者线程执行取款100次,所以最后程序将堵塞。

    二、使用Condition控制线程通信——Lock对象同步线程

    如果程序不是使用synchronized关键字来保证同步,而是直接使用Lock对象来保证同步。则系统中不存在隐式的同步监视器,也就不能使用wait()、notify()、notifyAll()方法来进行线程通知了。
    当使用Lock对象来保证同步时,Java提供了一个Condition类保持协调,使用Condition可以让那些得到已经得到Lock对象却无法继续执行的线程释放Lock对象,Condition对象也可以唤醒其他处于等待的线程。
    Condition 将同步监视锁方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与Lock对象组合使用,为每个对象提供多个等待集(wait-set)。在这种情况下,Lock 替代了同步方法或同步代码块,Condition替代了同步监视锁的功能。
    Condition实例实质上被绑定在一个Lock对象上。要获得特定Lock实例的Condition实例,调用Lock对象newCondition()方法即可。Condtion类提供了如下三个方法:
    1、await():类似于隐式同步监视器上的wait()方法,导致当前线程等待,直到其他线程调用该Condtion的signal ()方法或signalAll ()方法来唤醒该线程。该await方法有更多变体:long awaitNanos(long nanosTimeout)、void awaitUninterruptibly()、awaitUntil(Date deadline)等,可以完成更丰富的等待操作。
    2、signal ():唤醒在此Lock对象上等待的单个线程。如果所有线程都在该Lock对象上等待,则会选择唤醒其中一个线程。选择是任意性的。只有当前线程放弃对该Lock对象的锁定后(使用await()方法),才可以执行被唤醒的线程。
    3、signalAll():唤醒在此Lock对象上等待的所有线程。只有当前线程放弃对该该Lock对象的锁定后,才可以执行被唤醒的线程。
    下面程序Acount使用了Lock对象来控制同步,时使用Condition对象来控制线程的协作运行。

    package section6.condition;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class Account
    {
        //显示定义Lock对象
        private final Lock lock=new ReentrantLock();
        //获取指定Lock对象的Condition
        private final Condition cond=lock.newCondition();
        //标识账户中是否已有存款的旗帜
        private boolean flag=false;
        //封装账户编号、账户余额的两个成员变量
        private String accountNo;
        private double balance;
        public Account(){}
        public Account(String accountNo,double balance)
        {
            this.accountNo=accountNo;
            this.balance=balance;
        }
    
        public String getAccountNo() {
            return accountNo;
        }
        public void setAccountNo(String accountNo) {
            this.accountNo = accountNo;
        }
        //因为账户余额不能随意修改,所以只为balance提供getter方法
    
    
        public double getBalance() {
            return balance;
        }
        public void draw(double drawAmount) {
            lock.lock();
            try {
                //如果flag为假,表明账户中还没有人存钱进去
                if (!flag)
                {
                    cond.await();
                }
                else
                {
                    //执行取钱操作
                    System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount);
                    this.balance=this.balance-drawAmount;
                    System.out.println("账户余额为:"+this.balance);
                    //将旗帜设为false
                    this.flag=false;
                    //唤醒其他线程
                    cond.signalAll();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    对比上一小节中的Account.java,不难发现者两个程序的逻辑基本相似,只是现在显式地调用Lock对象来充当同步监视器,则需要使用Condition对象来暂停、唤醒其他线程。

    三、使用阻塞队列(BlockingQueue)控制线程通信

    java 5提供了一个BlockingQueue接口(Queue的子接口),但它的主要作用不是作为容器,而是作为线程同步的工具。BlockingQueue有一个特征:当生产者试图向BlockingQueue中放入元素时,如果该队列已满,则该线程阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。

    3.1 BlockingQueue的方法介绍

    程序中两个线程通过交替向BlockingQueue中放入元素、取出元素,既可以很好地控制线程通信。
    BlockingQueue提供了如下两个支持阻塞的方法:
    1、put(E e):尝试把E元素放入BlockingQueue中,如果该队列元素已满,则阻塞该线程。
    2、take():尝试从BlockingQueue的头部元素取出元素,如果该队列已空,则阻塞该线程。
    BlockingQueue继承Queue接口,当然也可以使用Queue接口中的方法,将这些方法归纳起来分为3组:
    1、在队列尾部插入元素。包括add(E e)、offer(E e)和put(E e)方法,当队列已满时,这三个方法分别会抛出异常、返回false、阻塞队列。
    2、在队列头部删除并返回删除的元素。包括remove()、poll()、take()方法。当队列已空时,这三个方法分别会抛出异常、返回false、阻塞队列。
    3、在队列头部取出元素但不删除。包括element()和peek()方法,当队列已空时,这两个方法分别抛出异常、返回false。如下表所示:
    BlockingQueue包含的方法之间的对应关系

    抛出异常 不同返回值 阻塞线程 指定超时时长
    队尾插入元素 add(e) offer(e) put(e) offer(e,time,unit)
    在对头删除元素 remove() poll() take() poll(time,unit)
    获取但不删除对头元素 element() peek()

    3.2 BlockingQueue与其实现类之间的类图


    从上图可以看出,BlockingQueue包含了如下5个实现类
    1、ArrayBlockingQueue:基于数组实现的BlockingQueue队列。
    2、LinkedBlockingQueue:基于链表实现的BlockingQueue队列。
    3、PriorityBlockingQueue:它并不是标准的阻塞队列。与前面介绍的Priority类似,该队列调用remove()、poll()、take()等方法取出元素时,并不是取出队列中存在时间最长的元素,而是队列中最小的元素。PriorityBlockingQueue判断元素的大小即可根据元素(实现Comparable接口)的本身大小来自然排序,也可以使用Comparator进行定制排序。
    4、SynchronousQueue:同步队列/对该队列的存、取操作必须交替进行。
    5、DelayQueue:它是一个特殊的BlockingQueue,,底层基于PriorityBlockingQueue实现。不过,DelayQueue要求集合元素都实现Delay接口(该接口里只有一个long getDelay()方法),DelayDeque根据集合元素的getDelay()方法的返回值进行排序。

    3.3 BlockingQueue应用举例

    下面以ArrayBlockingQueue为例介绍阻塞队列的功能和用法。下面介绍一个最简单的程序来测试BlockingQueue的put()用法:

    package section6;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class BlockQueueTest
    {
        public static void main(String[] args) throws InterruptedException {
            //定义一个长度为2的阻塞队列
            BlockingQueue<String> bq=new ArrayBlockingQueue<>(2);
            bq.put("java");//与bp.add("java")、bq.offer("java")相同
            bq.put("java");
            bq.put("java");//①阻塞线程
        }
    }
    

    上面程序先定义了一个大小为2的BlockingQueue,程序先向该队列中放入两个元素,此时队列还没有满,两个元素都可以放入,因此使用bp.add("java")、bq.offer("java")相同。当程序使用put()尝试放入第三个元素时将会阻塞线程,如上面代码①处所示;如果使用add()将触发异常;使用offer()将返回false。
    与此类似的是,BlockingQueue已空的情况下,程序使用take()方法尝试取出元素将会阻塞线程;使用remove()方法将会触发异常;使用poll()方法将会返回false,元素不会被删除。
    掌握了BlockingQueue阻塞队列的特性后,下面程序就可以利用BlockingQueue实现线程通信了。
    生成者类Producer:

    package section6;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class BlockingQueueTest1
    {
        public static void main(String[] args)
        {
            //创建一个容量为1的BlockingQueue
            BlockingQueue<String> bq=new ArrayBlockingQueue<>(1);
            //启动三个生产者线程
            new Producer(bq).start();
            new Producer(bq).start();
            new Producer(bq).start();
            //启动一个消费者线程
            new Consumer(bq).start();
    
        }
    }![](https://img2020.cnblogs.com/blog/1764014/202005/1764014-20200512214824736-92709312.png)
    
    

    定义一个消费者线程类Comsumer

    package section6;
    
    import java.util.concurrent.BlockingQueue;
    
    public class Consumer extends Thread
    {
        private BlockingQueue<String> bq;
        public Consumer(BlockingQueue<String> bq)
        {
            this.bq=bq;
        }
        public void run()
        {
            while(true)
            {
                System.out.println(getName()+"消费者准备消费集合元素!");
                try
                {
                    Thread.sleep(2);
                    //尝试取出元素,如果队列已空,则线程被阻塞
                    bq.take();
                }
                catch (InterruptedException ex)
                {
                    ex.printStackTrace();
                }
                System.out.println(getName()+"消费完成:"+bq);
                
            }
        }
    }
    

    主程序入口

    package section6;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class BlockingQueueTest1
    {
        public static void main(String[] args)
        {
            //创建一个容量为1的BlockingQueue
            BlockingQueue<String> bq=new ArrayBlockingQueue<>(1);
            //启动三个生产者线程
            new Producer(bq).start();
            new Producer(bq).start();
            new Producer(bq).start();
            //启动一个消费者线程
            new Consumer(bq).start();
    
        }
    }
    

    上面启动了3个生成者线程向BlockingQueue集合中放入元素,启动一个消费者线程从BlockingQueue中取出元素。本程序中BlockingQueue的集合容量为1,因此3个生产者线程无法连续放入元素,必须等待消费者从线程中取出一个元素后,3个生产者线程的其一才能放入一个元素,运行该程序可以看到如下输出结果:

  • 相关阅读:
    杂记5
    杂记4
    杂记3
    杂记2
    杂记1
    也来个网页版本的五子棋
    验证码识别
    npm publish命令
    window nginx php ci框架环境搭建
    也来个网页版本的五子棋
  • 原文地址:https://www.cnblogs.com/weststar/p/12875280.html
Copyright © 2011-2022 走看看