zoukankan      html  css  js  c++  java
  • 0038 Java学习笔记-多线程-传统线程间通信、Condition、阻塞队列、《疯狂Java讲义 第三版》进程间通信示例代码存在的一个问题

    调用同步锁的wait()、notify()、notifyAll()进行线程通信

    • 看这个经典的存取款问题,要求两个线程存款,两个线程取款,账户里有余额的时候只能取款,没余额的时候只能存款,存取款金额相同。相当于存取款交替进行,金额相同。
    • 线程间通信,需要通过同一个同步监视器(也就是this或者显式的Object对象)调用通信方法,
    • Object有三个方法,可以用于线程间通信
      • wait()
        • 当前线程等待,并释放同步锁
        • wait():无限期等待
        • wait(long timeout):等待timeout毫秒,
        • wait(long timeout,int nanos):等待timeout毫秒+nanos纳秒,nanos的范围[0,999999]
      • notify()
        • 唤醒该同步监视器上的任意一个线程
        • 只有当前线程调用了wait()方法后,被notify()唤醒的线程才会唤醒
      • notifyAll()
        • 唤醒该同步监视器上的所有线程
        • 只有当前线程调用了wait()方法后,被notify()唤醒的线程才会唤醒
    • 看示例代码:
    package testpack;
    public class Test1  { 
        public static void main(String[] args){ 
        	Account ac=new Account("A123",0.0);
        	new Deposit("存款者A",ac,325.0).start();       //这里开启两个存款线程
        	new Withdraw("取款者甲",ac,325.0).start();     //开启两个取款线程
        	new Deposit("存款者B",ac,325.0).start();
        	new Withdraw("取款者乙",ac,325.0).start();
        }
    }
    class Withdraw extends Thread{                          //取款任务
    	private Account account;
    	private double withdrawAmount;
    	public Withdraw (String threadName,Account account,double withdrawAmount){
    		super(threadName);
    		this.account=account;
    		this.withdrawAmount=withdrawAmount;
    	}
    	public void run(){
    		for (int i=1;i<=2;i++){                         //每个线程循环取款2次
    			account.withdraw(withdrawAmount);
    		}
    	}
    }
    class Deposit extends Thread{                           //存款任务
    	private Account account;
    	private double depositAmount;
    	public Deposit (String threadName,Account account,double depositAmount){
    		super(threadName);
    		this.account=account;
    		this.depositAmount=depositAmount;
    	}
    	public  void run(){
    		for (int i=1;i<=2;i++){                         //每个线程循环存款2次
    			account.deposit(depositAmount);
    		}
    	}
    }
    class Account {
    	private String accountNO;
    	private double balance;                              //账户余额
    	private boolean flag=false;                          //用于判断该账户是否可以进行存款或取款
    	public Account(){}
    	public Account(String no,double balance){
    		accountNO=no;
    		this.balance=balance;
    	}
    	public double getBalance(){
    		return balance;
    	}
    	public synchronized void withdraw(double amount){    //同步方法,取款
    		try {
    			while (!flag){                 //标记㈠。特别注意,这里用while进行循环判断,而不是用if-else判断
    				this.wait();                             //flag为false,则不可取款,线程等待,并释放同步锁
    			}
    			System.out.println(Thread.currentThread().getName()+"取款:"+amount);
    			balance-=amount;
    			System.out.println("取款后,余额为: "+balance);
    			flag=false;                                   //取款完毕后,将flag切换为false,下一个线程如果是取款线程,则不能取款
    			System.out.println("---------------上面取款完毕-------------------");
    			this.notifyAll();                             //标记㈢。取款完毕,唤醒其他所有线程
    		}catch(InterruptedException ex){
    			ex.printStackTrace();
    		}
    	}
    	public synchronized void deposit(double amount){     //同步方法,存款
    		try{
    			while (flag){                 //标记㈡。特别注意,这里用while进行循环判断,而不是用if-else判断
    				this.wait();                             //如果flag为true,则不能存款,线程等待并释放同步锁
    			}
    			System.out.println(Thread.currentThread().getName()+"存款"+amount);
    			balance+=amount;
    			System.out.println("存款后,账户余额为: "+balance);
    			flag=true;                                    //存款完毕后,将flag切换为true,下一个线程如果是存款线程,则不能存款
    			System.out.println("---------------上面存款完毕-------------------");
    			this.notifyAll();                             //标记㈣存款完毕后,唤醒其他所有线程
    			
    		}catch(InterruptedException ex){
    			ex.printStackTrace();
    		}
    	}
    }
    

    输出:

    存款者A存款325.0
    存款后,账户余额为: 325.0
    ---------------上面存款完毕-------------------
    取款者乙取款:325.0
    取款后,余额为: 0.0
    ---------------上面取款完毕-------------------
    存款者B存款325.0
    存款后,账户余额为: 325.0
    ---------------上面存款完毕-------------------
    取款者甲取款:325.0
    取款后,余额为: 0.0
    ---------------上面取款完毕-------------------
    存款者B存款325.0
    存款后,账户余额为: 325.0
    ---------------上面存款完毕-------------------
    取款者乙取款:325.0
    取款后,余额为: 0.0
    ---------------上面取款完毕-------------------
    存款者A存款325.0
    存款后,账户余额为: 325.0
    ---------------上面存款完毕-------------------
    取款者甲取款:325.0
    取款后,余额为: 0.0
    ---------------上面取款完毕-------------------

    • 看上面的输出:存款者A和B,取款者甲和乙分别各进行了2次存款或取款操作,并且交替执行
    • 看上面的标记㈢和㈣
      • 这里只能使用notifyAll(),而不能使用notify()方法,因为可能导致程序阻塞,比如:
      • 存款A线程第一次存款完毕,唤醒一个线程(当然第一次没有线程可供唤醒)并再次执行,wait()。状态:A阻塞+B甲乙就绪
      • 存款B线程试图存款,失败,wait()。状态:AB+甲乙
      • 取款甲线程第一次取款完毕,唤醒存款A线程,并再次执行,wait()。状态:B甲+A乙
      • 取款乙线程试图取款,失败,wait()。状态:B甲乙+A
      • 存款A线程第二次存款完毕,唤醒存款B线程,并再次执行,wait()。状态:甲乙A+B
      • 存款B线程试图存款,失败,wait()。状态:AB甲乙均处于wait()状态
      • 此时,四个线程都处于阻塞状态
    • 再看上面的标记㈠和㈡
      • 上面这段代码主要来源于《疯狂Java讲义 第三版》的“codes1616.6synchronized”目录
      • 原代码用的if-else对flag进行判断,这里存在问题,直接导致不论存款(或取款)成功或失败(即wait),run()方法的循环计数器都会自增1,导致存款(或取款)次数比预计的少,进而导致存款(取款线程已执行完,而存款线程仍在执行)或取款(存款线程已执行完,而取款线程仍在执行)线程阻塞
      • 应当采用while进行循环判断,线程被唤醒之后,应再次进行判断,而不是直接将循环计数器自增,可以保证在每个循环中都成功进行了一次存款

    调用Condition对象的的await()、signal()、signalAll()方法实现线程间通信

    • 上面Object的wait()、notify()、notifyAll()方法只能适用于this、显式的Object对象
    • 对于用Lock进行加锁的同步方法,上面的三个方法则不适用,这时候得靠Condition对象的另外三个方法
    • 通过Lock锁的newCondition()方法返回一个Condition对象,然后调用该对象的下面三个方法进行通信
      • await()
        • 类似于wait()方法
        • await(long timeout,int nanos)
        • awaitnanos(long nanosTimeout)
        • awaitUninterruptibly()
        • awaitUntil(Date deadline)
      • signal()
        • 类似于notify()
      • signalAll()
        • 类似于notifyAll()
    • Lock锁的newCondition()方法返回的是ConditionObject对象,这是AbstractQueuedSynchronizer抽象类的一个内部类,该内部类实现了Condition接口
    • 下面用Lock及这三个新方法改写上面的Account类
    class Account {
    	private String accountNO;
    	private double balance;
    	private boolean flag=false;
    	private final ReentrantLock lock=new ReentrantLock();   //创建一把Lock锁
    	private final Condition cond=lock.newCondition();       //返回Condition对象
    	public Account(){}
    	public Account(String no,double balance){
    		accountNO=no;
    		this.balance=balance;
    	}
    	public double getBalance(){
    		return balance;
    	}
    	public void withdraw(double amount){
    		lock.lock();                                         //获取锁并加锁
    		try {
    			while (!flag){
    				cond.await();                                //调用Condition对象的await()方法
    			}
    			System.out.println(Thread.currentThread().getName()+"取款:"+amount);
    			balance-=amount;
    			System.out.println("取款后,余额为: "+balance);
    			flag=false;
    			System.out.println("---------------上面取款完毕-------------------");
    			cond.signalAll();
    		}catch(InterruptedException ex){
    			ex.printStackTrace();
    		}finally{
    			lock.unlock();                                    //释放锁
    		}
    	}
    	public void deposit(double amount){
    		lock.lock();
    		try{
    			while (flag){
    				cond.await();
    			}
    			System.out.println(Thread.currentThread().getName()+"存款"+amount);
    			balance+=amount;
    			System.out.println("存款后,账户余额为: "+balance);
    			flag=true;
    			System.out.println("---------------上面存款完毕-------------------");
    			cond.signalAll();
    			
    		}catch(InterruptedException ex){
    			ex.printStackTrace();
    		}finally{
    			lock.unlock();
    		}
    	}
    }
    

    如果调用了Lock对象的wait()、notify()、notifyAll()方法会怎样?

    • Lock对象也是Object的子类的实例,也拥有这三个方法,按理说调用Lock对象这个同步监视器的该三个方法,也应该能达到通信的目的
    • 改写后,程序输出如下:

    存款者A存款325.0Exception in thread "存款者A" Exception in thread "取款者甲" //
    存款后,账户余额为: 325.0
    ---------------上面存款完毕-------------------
    取款者甲取款:325.0
    取款后,余额为: 0.0
    ---------------上面取款完毕-------------------
    存款者B存款325.0
    存款后,账户余额为: 325.0
    ---------------上面存款完毕-------------------
    Exception in thread "存款者B" 取款者乙取款:325.0
    取款后,余额为: 0.0
    java.lang.IllegalMonitorStateException
    ---------------上面取款完毕-------------------
    at java.lang.Object.notifyAll(Native Method)
    at testpack.Account.deposit(Test1.java:86)
    at testpack.Deposit.run(Test1.java:39)
    Exception in thread "取款者乙" java.lang.IllegalMonitorStateException
    at java.lang.Object.notifyAll(Native Method)
    at testpack.Account.withdraw(Test1.java:68)
    at testpack.Withdraw.run(Test1.java:25)
    java.lang.IllegalMonitorStateException
    at java.lang.Object.notifyAll(Native Method)
    at testpack.Account.withdraw(Test1.java:68)
    at testpack.Withdraw.run(Test1.java:25)
    java.lang.IllegalMonitorStateException
    at java.lang.Object.notifyAll(Native Method)
    at testpack.Account.deposit(Test1.java:86)
    at testpack.Deposit.run(Test1.java:39)

    • 上面出现了大量的“IllegalMonitorStateException”异常,暂时还分析不了出错的原因

    通过阻塞队列实现线程间通信

    • 上面的Account的取款、存款问题,抽象一下:一个Account,两个任务(一个存款、一个取款),每个任务两条线程(但两条线程完成的并不是同一项任务)

    • BlockingQueue是一个阻塞队列接口,它有很多实现类,见下图:来源于《Java疯狂讲义 第三版》
      BlockingQueue及实现类图

    • 实现类:

      • ArrayBlockingQueue:基于数组实现
      • LinkedBlockingQueue:基于链表实现
      • PriorityBlockingQueue:内部元素按照排序器排序,并非先进先出
      • SynchronousQueue:同步队列,存取交替进行
      • DelayQueue:内部元素实现Delay接口,内部元素按照getDelay()的返回值排序
    • 该接口是Queue的子接口,但并不是作为容器使用,而是作为线程同步工具使用。

    • 当一个线程要往里面put()一个元素时,若队列已满,则线程阻塞

    • 当一个线程从里面take()一个元素时,若队列为空,则线程阻塞

    • 三类方法

      • 在队列尾部插入元素:若队列已满,分别会:
        • add(E e):抛出异常
        • offer(E e):返回false
        • put(E e):阻塞队列
      • 在队列头部取出元素,并删除元素:若队列为空,分别会:
        • remove():抛出异常
        • poll():返回false
        • take():阻塞队列
      • 在队列头部取出元素,但不删除元素:若队列为空,分别会:
        • element():抛出异常
        • peek():返回false
    • 见示例:

    package testpack;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class Test2  { 
        public static void main(String[] args){ 
        	BlockingQueue<String> bq=new ArrayBlockingQueue<>(1);
        	new Producer(bq,"生产者A").start();
        	new Producer(bq,"生产者B").start();
        	new Consumer(bq,"消费者X").start();   //两个生产者,一个消费者,会产生阻塞
        }
    }
    class Producer extends Thread{
    	private BlockingQueue<String> bq;
    	Producer(BlockingQueue bq,String name){
    		super(name);
    		this.bq=bq;
    	}
    	public void run(){                         //run()方法没有被同步,for循环中的代码可能被分开执行
    		String[] str={"A","B","C"};
    		for (int i=0;i<3;i++){
    			System.out.println(getName()+" 准备向阻塞队列中添加元素");
    			try{
    				bq.put(str[i%3]);
    			}catch(InterruptedException ex){
    				ex.printStackTrace();
    			}
    			System.out.println(getName()+"添加元素完成: "+bq);
    		}
    	}
    }
    class Consumer extends Thread{
    	private BlockingQueue<String> bq;
    	Consumer(BlockingQueue bq,String name){
    		super(name);
    		this.bq=bq;
    	}
    	public void run(){
    		for (int i=0;i<3;i++){
    			System.out.println(getName()+" 准备从阻塞队列中取出元素");
    			try{
    				System.out.println(getName()+"取出元素成功: "+bq.take());
    			}catch(InterruptedException ex){
    				ex.printStackTrace();
    			}
    		}
    	} 
    }
    
    • 输出结果如下:

    生产者A 准备向阻塞队列中添加元素 //线程A被中断,可能在添加成功前或后
    生产者B 准备向阻塞队列中添加元素 //线程B可能被中断,可能被阻塞
    生产者A添加元素完成: [M] //线程A添加成功
    生产者A 准备向阻塞队列中添加元素 //线程A阻塞
    消费者X 准备从阻塞队列中取出元素
    消费者X取出元素成功: M //线程X取出成功
    消费者X 准备从阻塞队列中取出元素 //线程X被阻塞
    生产者B添加元素完成: [M] //线程B添加成功
    生产者A添加元素完成: [N] //这里之所以连续添加2次,因为X已将元素取出,但没有输出
    消费者X取出元素成功: M //X将取出的元素输出
    生产者A 准备向阻塞队列中添加元素 //线程A被阻塞或中断
    生产者B 准备向阻塞队列中添加元素 //线程B被阻塞或中断
    消费者X 准备从阻塞队列中取出元素
    消费者X取出元素成功: N //X将取出的元素输出
    生产者A添加元素完成: [K] //三次消费已执行结束,生产者线程还在执行,程序阻塞

    • ArrayBlockingQueue内部定义了一把private的ReentrantLock锁,在创建对象时创建锁对象(false策略)
    • 在put()/take()阻塞的时候,会释放ReentrantLock锁对象
    • 该示例存在的问题:生产和消费的run()方法没有被同步,导致输出的信息错乱;如果在run()中设置同步代码块,用bq做锁,则在生产方阻塞的时候导致死锁,暂时还不会解决。
  • 相关阅读:
    121. Best Time to Buy and Sell Stock
    70. Climbing Stairs
    647. Palindromic Substrings
    609. Find Duplicate File in System
    583. Delete Operation for Two Strings
    556 Next Greater Element III
    553. Optimal Division
    539. Minimum Time Difference
    537. Complex Number Multiplication
    227. Basic Calculator II
  • 原文地址:https://www.cnblogs.com/sonng/p/6138398.html
Copyright © 2011-2022 走看看