zoukankan      html  css  js  c++  java
  • 线程安全的生产者消费者四种实现方法

    问题描述

    在IT技术面试过程中,我们经常会遇到生产者消费者问题(Producer-consumer problem), 这是多线程并发协作问题的经典案例。场景中包含三个对象,生产者(Producer),消费者(Consumer)以及一个固定大小的缓冲区(Buffer)。生产者的主要作用是不断生成数据放到缓冲区,消费者则从缓冲区不断消耗数据。该问题的关键是如何线程安全的操作共享数据块,保证生产者线程和消费者线程可以正确的更新数据块,主要考虑 1. 生产者不会在缓冲区满时加入数据. 2. 消费者应当停止在缓冲区时消耗数据. 3. 在同一时间应当只允许一个生产者或者消费者访问共享缓冲区(这一点是对于互斥操作访问共享区块的要求)。

    解决方案

    解决问题以上问题通常有信号量,wait & notify, 管道或者阻塞队列等几种思路。本文以Java语言为例一一进行举例讲解。

    信号量

    信号量(Semaphore)也称信号灯,是用来控制资源被同时访问的个数,比如控制访问数据库最大连接数的数量,线程通过acquire()获得连接许可,完成数据操作后,通过release()释放许可。对于生产者消费者问题来说,为了满足线程安全操作的要求,同一时间我们只允许一个线程访问共享数据区,因此需要一个大小为1的信号量mutex来控制互斥操作。注意到我们还定义了notFull 和 notEmpty 信号量,notFull用于标识当前可用区块的空间大小,当notFull size 大于0时表明"not full", producer 可以继续生产,等于0时表示空间已满,无法继续生产;同样,对于notEmpty信号量来说,大于0时表明 "not empty", consumer可以继续消耗,等于0 时表明没有产品,无法继续消耗。notFull初始size 为5 (5个available空间可供生产),notEmpty初始为0(没有产品可供消耗)。

       /*** 
         数据仓储class,所有的producer和consumer共享这个class对象
       **/
        static class DataWareHouse {
           //共享数据区
            private final Queue<String> data = new LinkedList();
            //非满锁
            private final Semaphore notFull;
            //非空锁
            private final Semaphore notEmpty;
            //互斥锁
            private final Semaphore mutex;
    
            public DataWareHouse(int capacity) {
                this.notFull = new Semaphore(capacity);
                this.notEmpty = new Semaphore(0);
                mutex = new Semaphore(1);
            }
            public void offer(String x) throws InterruptedException {
                notFull.acquire(); //producer获取信号,notFull信号量减一
                mutex.acquire(); //当前进程获得信号,mutex信号量减1,其他线程被阻塞操作共享区块data
                data.add(x);
                mutex.release(); //mutex信号量+1, 其他线程可以继续信号操作共享区块data
                notEmpty.release(); //成功生产数据,notEmpty信号量加1
            }
            public String poll() throws InterruptedException {
                notEmpty.acquire(); //notEmpty信号减一
                mutex.acquire();
                String result = data.poll();
                mutex.release();
                notFull.release(); //成功消耗数据, notFull信号量加1
                return result;
            }
        }
       /**Producer线程**/
        static class Producer implements Runnable {
            private final DataWareHouse dataWareHouse;
    
            public Producer(final DataWareHouse dataWareHouse) {
                this.dataWareHouse = dataWareHouse;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(100); //生产的速度慢于消耗的速率
                        String s = UUID.randomUUID().toString();
                        System.out.println("put  data " + s);
                        dataWareHouse.offer(s);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
       /**Consumer线程**/
        static class Consumer implements Runnable {
            private final DataWareHouse dataWareHouse;
    
            public Consumer(final DataWareHouse dataWareHouse) {
                this.dataWareHouse = dataWareHouse;
            }
    
            @Override
            public void run() {
                while (true) {
                    while (true) {
                        try {
                            System.out.println("get data " + dataWareHouse.poll());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
        //测试代码
        public static void main(String[] args) {
            final DataWareHouse dataWareHouse = new DataWareHouse(5);
            //三个producer 持续生产
            for (int i = 0; i < 3; i++) {
                Thread t = new Thread(new Producer(dataWareHouse));
                t.start();
            }
            //三个consumer 持续消耗
            for (int i = 0; i < 3; i++) {
                Thread t = new Thread(new Consumer(dataWareHouse));
                t.start();
            }
        }
    

    Wait 和 Notify 机制

    Java Object对象类中包含三个final methods来允许线程之间进行通信,告知资源的状态。它们分别是wait(), notify(), 和notifyAll()。

    wait(): 顾名思义告诉当前线程释放锁,陷入休眠状态(waiting状态),等待资源。wait 方法本身是一个native method,它在Java中的使用语法如下所示:

    synchronized(lockObject )
    { 
        while( ! condition )
        { 
            lockObject.wait();
        }
        //take the action here;
    }
    

    notify(): 用于唤醒waiting状态的线程, 同时释放锁,被唤醒的线程可以重新获得锁访问资源。它的基本语法 如下

    synchronized(lockObject) 
    {
        //establish_the_condition;
        lockObject.notify();
        //any additional code if needed
    }
    

    notifyAll(): 不同于notify(),它用于唤醒所有处于waiting状态的线程。语法如下:

    synchronized(lockObject) 
    {
        establish_the_condition;
        lockObject.notifyAll();
    }
    

    说完了这三个方法,来看下如何使用wait & notify(All) 来解决我们的问题。新的DataWareHouse 类如下所示:

        //producer类和consumer共享对象
        static class DataWareHouse {
            //共享数据区
            private final Queue<String> data = new LinkedList();
            private int capacity;
            private int size = 0;
    
            public DataWareHouse(int capacity) {
                this.capacity = capacity;
            }
    
            public synchronized void offer(String x) throws InterruptedException {
                while (size == capacity) { //当buffer满时,producer进入waiting 状态
                    this.wait(); //使用this对象来加锁
                }
                data.add(x);
                size++;
                notifyAll(); //当buffer 有数据时,唤醒所有等待的consumer线程
            }
    
            public synchronized String poll() throws InterruptedException {
                while (size == 0) {//当buffer为空时,consumer 进入等待状态
                    this.wait();
                }
                String result = data.poll();
                size--;   
                notifyAll(); //当数据被消耗,空间被释放,通知所有等待的producer。
                return result;
            }
        }
    

    Note: 在方法上使用synchronized 等价于在方法体内使用synchronized(this),两者都是使用this对象作为锁。

    生产者和消费者类,以及测试代码和 信号量 section 相同,不做重复列举了。

    管道

    管道Pipe是实现进程或者线程(线程之间通常通过共享内存实现通讯,而进程则通过scoket,管道,消息队列等技术)之间通信常用方式,它连接输入流和输出流,基于生产者- 消费者模式构建的一种技术。具体实现可以通过创建一个管道输入流对象和管道输出流对象,然后将输入流和输出流就行链接,生产者通过往管道中写入数据,而消费者在管道数据流中读取数据,通过这种方式就实现了线程之间的互相通讯。

    具体实现代码如下所示

    public class PipeSolution {
        static class DataWareHouse implements Closeable {
            private final PipedInputStream pis;
            private final PipedOutputStream pos;
    
            public DataWareHouse() throws IOException {
                pis = new PipedInputStream();
                pos = new PipedOutputStream();
                pis.connect(pos); //连接管道
            }
            //向管道中写入数据
            public void offer(int val) throws IOException {
                pos.write(val);
                pos.flush();
            }
            //从管道中取数据.
            public int poll() throws IOException {
                 //当管道中没有数据,方法阻塞
                return pis.read();
            }
            //关闭管道
            @Override
            public void close() throws IOException {
                if (pis != null) {   
                    pis.close();
                }
                if (pos != null) {
                    pos.close();
                }
            }
        }
        //consumer类
        static class Consumer implements Runnable {
            private final DataWareHouse dataWareHouse;
    
            Consumer(DataWareHouse dataWareHouse) {
                this.dataWareHouse = dataWareHouse;
            }
    
            @Override
            public void run() {
                try {
                    //消费者不断从管道中读取数据
                    while (true) {
                        int num = dataWareHouse.poll();
                        System.out.println("get data +" + num);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        static class Producer implements Runnable {
            private final DataWareHouse dataWareHouse;
            private final Random random = new Random();
    
            Producer(DataWareHouse dataWareHouse) {
                this.dataWareHouse = dataWareHouse;
            }
    
            @Override
            public void run() {
                try {
                    //生产者不断向管道中写入数据
                    while (true) {
                        int num = random.nextInt(256);
                        dataWareHouse.offer(num);
                        System.out.println("put data +" + num);
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
    
            public static void main(String[] args) throws IOException {
                DataWareHouse dataWareHouse = new DataWareHouse();
                new Thread(new Producer(dataWareHouse)).start();
                new Thread(new Consumer(dataWareHouse)).start();
            }
        }
    

    阻塞队列

    阻塞队列(BlockingQueue),具有1. 当队列满了的时候阻塞入队列操作 2. 当队列空了的时候阻塞出队列操作 3. 线程安全 的特性,因而阻塞队列通常被视为实现生产消费者模式最便捷的工具,其中DataWareHouse类实现代码如下:

      static class DataWareHouse {
            //共享数据区
            private final BlockingQueue<String> blockingQueue;
            
            public DataWareHouse(int capacity) {
                this.blockingQueue = new ArrayBlockingQueue<>(capacity);
            }
    
            public void offer(String x) {
                blockingQueue.put(x);
            }
            public String poll() {
                return blockingQueue.take();
            }
        }
    

    生产者和消费者类,以及测试代码和 信号量 section 相同,在此不做重复列举了。

    总结

    生产者消费者问题是面试中经常会遇到的题目,本文总结了几种常见的实现方式,面试过程中通常不必要向面试官描述过多实现细节,说出每种实现方式的特点即可。希望能给大家带来帮助。

    Reference

    1. https://howtodoinjava.com/java/multi-threading/wait-notify-and-notifyall-methods/
  • 相关阅读:
    delphi point数据类型
    Sql Server 2008 R2链接服务器Oracle数据库
    ORA-28000 账号被锁定的解决办法
    [Oracle] sqlplus / as sysdba ora-01031 insufficient privileges
    Oracle的操作系统认证(/ as sydba 登录方式)
    Delphi使用线程TThread查询数据库
    oracle
    统计字符串中字符出现的次数-Python
    Jmeter保存下载的文件
    如何在Microsoft Store上免费获得 HEIF、HEVC 编码支持
  • 原文地址:https://www.cnblogs.com/jun-ma/p/11843394.html
Copyright © 2011-2022 走看看