zoukankan      html  css  js  c++  java
  • 操作系统中的经典问题——生产者消费者问题(两种方式实现)

    操作系统中的经典问题——生产者消费者问题(两种方式实现)

    1、问题引入:什么是生产者消费者问题?

    生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
    .

    要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用进程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

    2、问题分析

    该问题需要注意的几点:

    1. 在缓冲区为空时,消费者不能再进行消费
    2. 在缓冲区为满时,生产者不能再进行生产
    3. 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即保持线程间的同步
    4. 注意条件变量与互斥锁的顺序

    由于前两点原因,因此需要保持线程间的同步,即一个线程消费(或生产)完,其他线程才能进行竞争CPU,获得消费(或生产)的机会。对于这一点,可以使用条件变量进行线程间的同步:生产者线程在product之前,需要wait直至获取自己所需的信号量之后,才会进行product的操作;同样,对于消费者线程,在consume之前需要wait直到没有线程在访问共享区(缓冲区),再进行consume的操作,之后再解锁并唤醒其他可用阻塞线程。

    在访问共享区资源时,为避免多个线程同时访问资源造成混乱,需要对共享资源加锁,从而保证某一时刻只有一个线程在访问共享资源。

    3、第一类:使用synchronized关键字来进行实现

    3.1、生产者、消费者问题,归根到底也都是线程间的通信问题。一个处于活动状态,一个处于等待唤醒状态。

    3.2、这里设共享的资源为一个int数,调用方法进行加一、减一的操作模拟操作系统对共享资源的分配。

    3.3、实现对操作的资源进行共享,主要有三步,判断等待、业务、通知,因此可以将资源类的代码实现如下:(代码如果看不懂,详情请看注释,注释详尽)

    /**资源类**/
    class Data {
        private int number = 0;
    
        /**
         * 判断等待、业务、通知
         */
    
        //+1
        public synchronized void increment() throws InterruptedException {
            if (number != 0) {
                // 等待
                this.wait();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            // 通知其他线程,我+1完毕了
            this.notifyAll();
        }
        //-1
        public synchronized void decrement() throws InterruptedException {
            if (number == 0) {
                //等待
                this.wait();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            // 通知其他线程,我-1完毕了
            this.notifyAll();
        }
    }
    

    3.4、编写两个线程,进行测试,看是否能有序的进行资源的共享

    package com.xgp.pc;
    
    /**
     * 线程之间的通信问题:生产者和消费者问题! 等待唤醒 通知
     * 线程交替问题 A  B 操作同一个变量  num = 0
     * A num+1
     * B num-1
     * @author 薛国鹏
     */
    @SuppressWarnings("all")
    public class A {
        public static void main(String[] args) {
            Data data = new Data();
    
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            },"A").start();
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            },"B").start();
        }
    }
    

    运行结果:

    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    
    进程完成,退出码 0
    

    3.5、有运行结果可以看出,当系统中只有两个线程时,能够有序的进行进行对临界资源的共享。此时,将两个线程改为4个线程(两个增加操作,两个减少操作)再进行测试

    package com.xgp.pc;
    
    /**
     * 线程之间的通信问题:生产者和消费者问题! 等待唤醒 通知
     * 线程交替问题 A  B 操作同一个变量  num = 0
     * A num+1
     * B num-1
     * @author 薛国鹏
     */
    @SuppressWarnings("all")
    public class A {
        public static void main(String[] args) {
            Data data = new Data();
    
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            },"A").start();
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            },"B").start();
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            },"C").start();
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            },"D").start();
        }
    }
    

    运行结果为:

    A=>1
    B=>0
    A=>1
    B=>0
    C=>1
    A=>2
    D=>1
    D=>0
    A=>1
    C=>2
    B=>1
    B=>0
    C=>1
    A=>2
    D=>1
    D=>0
    A=>1
    C=>2
    B=>1
    B=>0
    C=>1
    A=>2
    D=>1
    D=>0
    A=>1
    C=>2
    B=>1
    B=>0
    C=>1
    A=>2
    D=>1
    D=>0
    A=>1
    C=>2
    B=>1
    B=>0
    C=>1
    D=>0
    C=>1
    D=>0
    
    进程完成,退出码 0
    

    由运行的结果可以看出,当有四个线程时,此时的资源分配共享出现了问题。

    3.6、分析:

    假如:当一个减法操作结束时,此时会通知唤醒其他三个线程我减一完毕了,此时共享资源的值为0,另一个减一线程判断的number=0,自然会进行等待,改线程肯定不是问题所在。两个加一线程,判断的结果是number=1,于是都被唤醒了,当第一个加一线程完成加一操作后,第二个加一线程随即跟随其后完成加一操作。问题的关键就是,当第一个加一线程完成加一操作后,第二个线程因为之前的判断被唤醒了,而且后面并没有其他判断使他沉睡了,因此在值发生改变后,仍然进行了加一操作。所以,我们得限制各个进行随时随刻得检测共享资源得变化,在发生变化时立即判断是否唤醒还是等待,因此代码中不应该使用if判断,而应该使用while进行循环判断,虽然,这在一定程度上会占用一定得系统性能。改进后的代码如下:

    /**资源类**/
    class Data {
        private int number = 0;
    
        /**
         * 判断等待、业务、通知
         */
    
        //+1
        public synchronized void increment() throws InterruptedException {
            while (number != 0) {
                // 等待
                this.wait();
            }
            number++;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            // 通知其他线程,我+1完毕了
            this.notifyAll();
        }
        //-1
        public synchronized void decrement() throws InterruptedException {
            while (number == 0) {
                //等待
                this.wait();
            }
            number--;
            System.out.println(Thread.currentThread().getName() + "=>" + number);
            // 通知其他线程,我-1完毕了
            this.notifyAll();
        }
    }
    

    此时,开启四个线程进行测试的结果正常

    A=>1
    B=>0
    A=>1
    B=>0
    C=>1
    B=>0
    A=>1
    D=>0
    C=>1
    B=>0
    A=>1
    D=>0
    C=>1
    B=>0
    A=>1
    D=>0
    C=>1
    B=>0
    A=>1
    D=>0
    C=>1
    B=>0
    A=>1
    D=>0
    C=>1
    B=>0
    A=>1
    D=>0
    C=>1
    B=>0
    A=>1
    D=>0
    C=>1
    B=>0
    A=>1
    D=>0
    C=>1
    D=>0
    C=>1
    D=>0
    
    进程完成,退出码 0
    

    4、使用java的JUC来实现

    package com.xgp.pc;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    @SuppressWarnings("all")
    public class B {
        public static void main(String[] args) {
            Data2 data = new Data2();
    
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            },"A").start();
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            },"B").start();
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            },"C").start();
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            },"D").start();
        }
    }
    
    /**资源类**/
    class Data2 {
        private int number = 0;
    
        /**
         * 判断等待、业务、通知
         */
        Lock lock = new ReentrantLock();
        //锁监视器(取代了对象监视器的使用)
        Condition condition = lock.newCondition();
    
        //+1
        public void increment() throws InterruptedException {
            lock.lock();
            try {
                while (number != 0) {
                    // 等待
                    condition.await();      //等待
                }
                number++;
                System.out.println(Thread.currentThread().getName() + "=>" + number);
                // 通知其他线程,我+1完毕了
                condition.signalAll();
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
        //-1
        public void decrement() throws InterruptedException {
            lock.lock();
    
            try {
                while (number == 0) {
                    //等待
                    condition.await();
                }
                number--;
                System.out.println(Thread.currentThread().getName() + "=>" + number);
                // 通知其他线程,我-1完毕了
                condition.signalAll();
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
    
        }
    }
    

    运行结果:

    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    A=>1
    B=>0
    C=>1
    D=>0
    C=>1
    D=>0
    C=>1
    D=>0
    C=>1
    D=>0
    C=>1
    D=>0
    C=>1
    D=>0
    C=>1
    D=>0
    C=>1
    D=>0
    C=>1
    D=>0
    C=>1
    D=>0
    
    进程完成,退出码 0
    

    通过该方式也同样可以实现资源的共享,并且此方式还有许多优点,能过将控制权更大程度的交给代码的编写者。相当于汽车中的手动挡,虽然缺少自动,但是会的话开的更快。

    5、使用java的JUC来实现点对点唤醒

    package com.xgp.pc;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    @SuppressWarnings("all")
    public class C {
        public static void main(String[] args) {
    
            Data3 data = new Data3();
    
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.printA();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            },"A").start();
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.printB();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            },"B").start();
            new Thread(() -> {for(int i = 0;i < 10;i++) {
                try {
                    data.printC();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            },"C").start();
        }
    }
    
    
    class Data3 {
    
        private int number = 1;
    
        private Lock lock = new ReentrantLock();
        //锁监视器(取代了对象监视器的使用)
        private Condition condition1 = lock.newCondition();
        private Condition condition2 = lock.newCondition();
        private Condition condition3 = lock.newCondition();
    
        public void printA() {
            lock.lock();
    
            try {
                while (number != 1) {
                    condition1.await();
                }
                System.out.println(Thread.currentThread().getName());
                //唤醒指定的B
                number = 2;
                condition2.signal();
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
        public void printB() {
            lock.lock();
    
            try {
                while (number != 2) {
                    condition2.await();
                }
                System.out.println(Thread.currentThread().getName());
                //唤醒指定的B
                number = 3;
                condition3.signal();
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
        public void printC() {
            lock.lock();
    
            try {
                while (number != 3) {
                    condition3.await();
                }
                System.out.println(Thread.currentThread().getName());
                //唤醒指定的B
                number = 1;
                condition1.signal();
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
    

    运行结果:

    A
    B
    C
    A
    B
    C
    A
    B
    C
    A
    B
    C
    A
    B
    C
    A
    B
    C
    A
    B
    C
    A
    B
    C
    A
    B
    C
    A
    B
    C
    
    进程完成,退出码 0
  • 相关阅读:
    Jungle Roads POJ 1251
    Light OJ 1234 Harmonic Number
    同余定理
    HDU---1052---田忌赛马
    田忌赛马---空手道俱乐部
    poj---1182---食物链
    Convenient Location(最短路之弗洛伊德)
    js动画实现透明度动画
    js动画实现侧边栏分享
    AngularJS 指令(使浏览器认识自己定义的标签)
  • 原文地址:https://www.cnblogs.com/xgp123/p/12339830.html
Copyright © 2011-2022 走看看