zoukankan      html  css  js  c++  java
  • 操作系统实验——PV操作实现生产者消费者模型

    操作系统PV操作之——生产者消费者模型

    个人博客主页
    参考资料:
    Java实现PV操作 | 生产者与消费者

    浙大公开课

    在操作系统的多进程、多线程操作中经常会有因为同步、互斥等等问题引发出的一系列问题,我们的前辈为了解决这些问题,发明出了“信号量(Semaphore)”这么一个令人称奇的变量,就目前来看,很巧妙的解决了这些问题。

    • 信号量是个整形变量
    • 信号量S只允许两个标准操作wait()和signal(),或者他的发明者称呼的P操作和V操作
    • wait()和signal()是原子操作,不可分割的原语

    对PV操作的定义

    对PV操作的定义不是单一的,这里举个比较简单的例子

    /*P操作*/
    wait(S){
        value--;
        if(value < 0){
    	/*value的大小表示了允许同时进入临界区进行操作的
    	  进程数量*/
            /*add this process to waiting queue*/
            block();
        }
    }
    
    /*V操作*/
    signal(S){
        value++;
        if(value <= 0){
    	/*因为P操作是当value<0时休眠一个线程,说明如果有休眠
    	  的线程,则value一定小于0,所以此时当value+1后,如果
    	  还有休眠的线程,value必定小于或等于0 */
            /*remove a process P from the waiting queue*/
            wakeup(P);
        }
    }
    

    信号量的应用

    1. 临界区(互斥)问题,信号量初值需要置为1,表示只能有一个进程进入临界区,从而保护临界区内的数据同时只能被一个进程访问,避免出现多个进程同时操作同一个数据。

      Semaphore S;        //初始值为1
      do{
          wait(S);
              Critical Section;		//临界区
          signal(S);
              remainder section	//剩余部分
      }while(1);
      
    2. 两个进程的同步问题。假如有两个进程 Pi和Pj,Pi有个A语句(输入x的值),Pj有个B语句(输出x+1的值),希望在B语句执行之前A语句已经执行完成。

      //同步,定义信号量flag初值为0,等待方用wait操作,被等待方用signal操作,还要紧贴着放
      Pi进程							Pj进程
          ...							...
         	A						 wait(flag)
       signal(flag)						B
          ...							...
      

    生产者消费者模型

    先定义出PV操作的类

    /**
     * 封装的PV操作类,为了简单起见,没有用一个等待队列,而是直接用
     * Java的Object类方法中的wait方法来模拟
     * @author Vfdxvffd
     * @count 信号量
     * 这里调用wait方法和signal方法的是同一个对象this,所以V操作唤
     * 醒的只能是同一个对象P操作加入等待队列的进程
     */
    class syn{		
    	int count = 0;
    	syn(){}
    	syn(int a){count = a;}	//给信号量赋初值
    	
    	public synchronized void Wait() {
    		count--;
    		if(count < 0) {		//block
                 /*add this process to waiting queue*/
    			try {
    				wait();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    	
    	public synchronized void Signal() {
    		count++;
    		if(count <= 0) {	//wakeup
                /*remove a process P from the waiting queue*/
    			notify();
    		}
    	}
    }
    

    单生产单消费(PV操作解决同步问题)

    1. 先引入全局的信号量,将其封装在一个类中

      class Global{
      	static syn empty = new syn(2);	//成员变量count表示剩余空闲缓冲区的数量, >0则生产者进程可以执行
      	static syn full = new syn(0);	//成员变量count表示当前待消费物品的数量, >0则消费者进程可以执行
      	static int[] buffer = new int[2];	//缓冲区数组,大小代表缓冲区的数量,即放面包的盘子
      }
      
    2. 生产者类

      /**
       * 单个生产者类
       * @author Vfdxvffd
       * @count 生产的物品数量标号
       */
      class Producer implements Runnable{
      	int count = 0;		//数量
      	@Override
      	public void run() {
      		while(count < 20) {			//最多生产20件商品
      			Global.empty.Wait();	/*要生产物品了,给剩余空
       		闲缓冲区数量--,如果减完后变为负数,则说明当前没
       		有空闲缓冲区,则加入等待队列*/
      			//临界区,生产商品
      			int index = count % 2;
      			Global.buffer[index] = count;
      			System.out.println("生产者在缓冲区"+index+"中生产了物品"+count);
      			count++;
                  	/*可以在此处让进程休眠几秒钟,然后就可能在此时
       		   CPU将资源调给消费者,但是可以发现由于下面语句还
       		   未执行,所以消费者拿到CPU执行权也只能消费掉前几
       		   次生产的商品,这次生产的商品依旧无法被消费*/
      			Global.full.Signal();/*发出一个信号,表示缓冲区已
       		经有物品了,可以来消费了,成员变量count的值表示缓冲
       		区的待消费物品的数量,相当于唤醒消费者*/
      		}
      	}
      }
      
    3. 消费者类

      /**
       * 单个消费者类
       * @author Vfdxvffd
       * @count 物品数量标号
       */
      class Consumer implements Runnable{
      	int count = 0;
      	@Override
      	public void run() {
      		while(count < 20) {
      			Global.full.Wait();	/*要消费物品了,给当前待消费
       		物品--,如果减完为负数,则说明当前没有可消费物品,
       		加入等待队列*/
      			//临界区
      			int index = count % 2;
      			int value = Global.buffer[index];
      			System.out.println("消费者在缓冲区"+index+"中消费了物品"+value);
      			count++;
               /*可以在此处让进程休眠几秒钟,然后就可能在此时CPU将
       		资源调给生产者,但是可以发现由于下面语句还未执行,
       		所以生产者拿到CPU执行权也只能生产在前几次消费的商品
       		腾出的缓冲区,这次消费的商品腾出的地方依旧无法被用
       		于生产*/
      			Global.empty.Signal();	/*消费完一个物品后,释放
       		一个缓冲区,给空闲缓冲区数量++,唤醒生产者可以生产
       		商品了*/
      		}
      	}
      }
      
    4. 主类测试

      public class ConsumeAndProduce{
      	public static void main(String[] args) {
      		Producer pro = new Producer();
      		Consumer con = new Consumer();
      		Thread t1 = new Thread(pro);
      		Thread t2 = new Thread(con);
      		t1.start();
      		t2.start();
      	}
      }
      
    5. 总结

      运行结果

    ​ empty和full两个信号量的作用就在于消费者消费之前检查是否有待消费商品,如果有则让他去消费,没有就要将消费者进程放进等待队列,等到生产者生产了商品后又将其从等待队列中取出。生产者在生产之前需要先检查是否有足够的缓冲区(存放商品的地方),如果有则让其去生产,没有的话就要进入等待队列等待消费者消费缓冲区的商品。

    多生产者多消费者(PV操作解决互斥问题)

    因为有多个生产者和消费者来生产和消费商品,我们需要在Global中加入两个变量pCount、cCount,分别用来表示生产的商品的序号和被消费的商品的序号,之前是因为只有单个生产消费着,所以直接将其定义在run方法中即可,但这是有多个生产者和消费者,所以要放到一个公共区,一起去操作它。但是如果我们有多个生产者和多个消费者,会不会出现线程安全问题?答案是肯定的。生产者重复生产了同一商品

    在这里插入图片描述

    ​ 这种情况如何出现的呢,我们先看run方法里的代码

    @Override
    public void run() {
    	while(Global.pCount < 20) {	//最多生产20件商品
    		Global.empty.Wait();	
    		//临界区
    		int index = Global.pCount % 2;
    		Global.buffer[index] = Global.pCount;
    		System.out.println(Thread.currentThread().getName()+"生产者在缓冲区"+index+"中生产了物品"+Global.pCount);
    		Global.pCount++;
    		try {
    			Thread.sleep(10);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		Global.full.Signal();
    	}
    }
    

    假如生产者1号生产了0号商品,但此时他还没做Global.pCount++这一步操作,CPU将执行权切换到生产者2号,这时Global.pCount的值还是刚刚的0,没有加1,所以又会生产出一个0号商品,那消费者也同理,消费完还没加1,就被切换了执行权。

    那就有个问题,如果我们将Global.pCount++这一步提前能不能解决问题呢,当然也是不行的,因为可能++完还没输出就被切换执行权,那下次执行权回来时候就会继续执行输出操作,但此时的Global.pCount的值已经不知道加了多少了。

    解决方法

    解决的办法就是加入新的信号量Mutex,将初始值设为1,引起多个生产者之间的互斥,或者多个消费者之间的互斥,即1号生产者操作pCount这个数据的时候,其他生产者无法对pCount进行操作。就是我们说的信号量的第一个应用,解决互斥问题。

    package OS;
    
    /**
     * 封装的PV操作类
     * @author Vfdxvffd
     * @count 信号量
     */
    class syn{		
    	int count = 0;
    	syn(){}
    	syn(int a){count = a;}
    	
    	public synchronized void Wait() {
    		count--;
    		if(count < 0) {		//block
    			try {
    				wait();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    	
    	public synchronized void Signal() {
    		count++;
    		if(count <= 0) {	//wakeup
    			notify();
    		}
    	}
    }
    
    
    class Global{
    	static syn empty = new syn(2);	//成员变量count表示剩余空闲缓冲区的数量 >0则生产者进程可以执行
    	static syn full = new syn(0);	//成员变量count表示当前待消费物品的数量 >0则消费者进程可以执行
    	static syn pMutex = new syn(1);	//保证生产者之间互斥的信号量
    	static syn cMutex = new syn(1);	//保证消费者之间互斥的信号量
    	static int[] buffer = new int[2];//缓冲区,就像放面包的盘子
    	static int pCount = 0;		//生产者生产的商品编号
    	static int cCount = 0;		//消费者消费的商品编号
    }
    
    /**
     * 生产者类
     * @author Vfdxvffd
     * @count 生产的物品数量标号
     * Global.empty.Wait();和Global.pMutex.Wait();的顺序无所谓,
     * 只要和下面对应即可,要么都包裹在里面,要么都露在外面
     */
    class Producer implements Runnable{
    	@Override
    	public void run() {
    		while(Global.pCount < 20) {			//最多生产20件商品
    			Global.empty.Wait();	/*要生产物品了,给剩余空
    			闲缓冲区数量--,如果减完后变为负数,则说明当前没有
    			空闲缓冲区,则加入等待队列*/
    			Global.pMutex.Wait();	/*保证生产者之间的互斥,
    			就像是加了一个锁一样this.lock()*/
    			//临界区
    			int index = Global.pCount % 2;
    			Global.buffer[index] = Global.pCount;
    			System.out.println(Thread.currentThread().getName()+"生产者在缓冲区"+index+"中生产了物品"+Global.pCount);
    			Global.pCount++;
    			try {
    				Thread.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			Global.pMutex.Signal();//相当于释放锁this.unlock()
    			Global.full.Signal();/*发出一个信号,表示缓冲区已
    			经有物品了,可以来消费了,成员变量count的值表示缓冲
    			区的待消费物品的数量,相当于唤醒消费者*/
    		}
    	}
    }
    
    /**
     * 消费者类
     * @author Vfdxvffd
     * @count 物品数量标号
     * Global.full.Wait();和Global.cMutex.Wait();的顺序无所谓,
     * 只要和下面对应即可,要么都包裹在里面,要么都露在外面
     */
    class Consumer implements Runnable{
    	@Override
    	public void run() {
    		while(Global.cCount < 20) {
    			Global.full.Wait();	/*要消费物品了,给当前待消费
    			物品--,如果减完为负数,则说明当前没有可消费物品,
    			加入等待队列*/
    			Global.cMutex.Wait();//保证消费者之间的互斥
    			//临界区
    			int index = Global.cCount % 2;
    			int value = Global.buffer[index];
    			System.out.println(Thread.currentThread().getName()+"消费者在缓冲区"+index+"中消费了物品"+value);
    			Global.cCount++;
    			try {
    				Thread.sleep(10);
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			Global.cMutex.Signal();
    			Global.empty.Signal();	/*消费完一个物品后,释放
    			一个缓冲区,给空闲缓冲区数量++*/
    		}	
    	}
    }
    
    public class ConsumeAndProduce{
    	public static void main(String[] args) {
    		Producer pro = new Producer();
    		Consumer con = new Consumer();
    		Thread t1 = new Thread(pro);
    		Thread t2 = new Thread(con);
    		Thread t3 = new Thread(pro);
    		Thread t4 = new Thread(con);
    		t1.start();
    		t2.start();
    		t3.start();
    		t4.start();
    	}
    }
    
  • 相关阅读:
    跨域
    redis安装
    iframe操作
    element-ui 合并相邻的相同行 span-method
    函数实现 a?.b?.c?.d
    git 使用流程 命令
    svg用作背景图
    js中的位运算符 ,按位操作符
    二十三种设计模式[23]
    二十三种设计模式[22]
  • 原文地址:https://www.cnblogs.com/vfdxvffd/p/12855526.html
Copyright © 2011-2022 走看看