zoukankan      html  css  js  c++  java
  • 线程安全的生产者消费者四种实现方法

    问题描述

    在IT技术面试过程中,我们经常会遇到生产者消费者问题(Producer-consumer problem), 这是多线程并发协作问题的经典案例。场景中包含三个对象,生产者(Producer),消费者(Consumer)以及一个固定大小的缓冲区(Buffer)。生产者的主要作用是不断生成数据放到缓冲区,消费者则从缓冲区不断消耗数据。该问题的关键是如何线程安全的操作共享数据块,保证生产者线程和消费者线程可以正确的更新数据块,主要考虑 1. 生产者不会在缓冲区满时加入数据. 2. 消费者应当停止在缓冲区时消耗数据. 3. 在同一时间应当只允许一个生产者或者消费者访问共享缓冲区(这一点是对于互斥操作访问共享区块的要求)。

    解决方案

    解决问题以上问题通常有信号量,wait & notify, 管道或者阻塞队列等几种思路。本文以Java语言为例一一进行举例讲解。

    信号量

    信号量(Semaphore)也称信号灯,是用来控制资源被同时访问的个数,比如控制访问数据库最大连接数的数量,线程通过acquire()获得连接许可,完成数据操作后,通过release()释放许可。对于生产者消费者问题来说,为了满足线程安全操作的要求,同一时间我们只允许一个线程访问共享数据区,因此需要一个大小为1的信号量mutex来控制互斥操作。注意到我们还定义了notFull 和 notEmpty 信号量,notFull用于标识当前可用区块的空间大小,当notFull size 大于0时表明"not full", producer 可以继续生产,等于0时表示空间已满,无法继续生产;同样,对于notEmpty信号量来说,大于0时表明 "not empty", consumer可以继续消耗,等于0 时表明没有产品,无法继续消耗。notFull初始size 为5 (5个available空间可供生产),notEmpty初始为0(没有产品可供消耗)。

       /*** 
         数据仓储class,所有的producer和consumer共享这个class对象
       **/
        static class DataWareHouse {
           //共享数据区
            private final Queue<String> data = new LinkedList();
            //非满锁
            private final Semaphore notFull;
            //非空锁
            private final Semaphore notEmpty;
            //互斥锁
            private final Semaphore mutex;
    
            public DataWareHouse(int capacity) {
                this.notFull = new Semaphore(capacity);
                this.notEmpty = new Semaphore(0);
                mutex = new Semaphore(1);
            }
            public void offer(String x) throws InterruptedException {
                notFull.acquire(); //producer获取信号,notFull信号量减一
                mutex.acquire(); //当前进程获得信号,mutex信号量减1,其他线程被阻塞操作共享区块data
                data.add(x);
                mutex.release(); //mutex信号量+1, 其他线程可以继续信号操作共享区块data
                notEmpty.release(); //成功生产数据,notEmpty信号量加1
            }
            public String poll() throws InterruptedException {
                notEmpty.acquire(); //notEmpty信号减一
                mutex.acquire();
                String result = data.poll();
                mutex.release();
                notFull.release(); //成功消耗数据, notFull信号量加1
                return result;
            }
        }
       /**Producer线程**/
        static class Producer implements Runnable {
            private final DataWareHouse dataWareHouse;
    
            public Producer(final DataWareHouse dataWareHouse) {
                this.dataWareHouse = dataWareHouse;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.sleep(100); //生产的速度慢于消耗的速率
                        String s = UUID.randomUUID().toString();
                        System.out.println("put  data " + s);
                        dataWareHouse.offer(s);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
       /**Consumer线程**/
        static class Consumer implements Runnable {
            private final DataWareHouse dataWareHouse;
    
            public Consumer(final DataWareHouse dataWareHouse) {
                this.dataWareHouse = dataWareHouse;
            }
    
            @Override
            public void run() {
                while (true) {
                    while (true) {
                        try {
                            System.out.println("get data " + dataWareHouse.poll());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
        //测试代码
        public static void main(String[] args) {
            final DataWareHouse dataWareHouse = new DataWareHouse(5);
            //三个producer 持续生产
            for (int i = 0; i < 3; i++) {
                Thread t = new Thread(new Producer(dataWareHouse));
                t.start();
            }
            //三个consumer 持续消耗
            for (int i = 0; i < 3; i++) {
                Thread t = new Thread(new Consumer(dataWareHouse));
                t.start();
            }
        }
    

    Wait 和 Notify 机制

    Java Object对象类中包含三个final methods来允许线程之间进行通信,告知资源的状态。它们分别是wait(), notify(), 和notifyAll()。

    wait(): 顾名思义告诉当前线程释放锁,陷入休眠状态(waiting状态),等待资源。wait 方法本身是一个native method,它在Java中的使用语法如下所示:

    synchronized(lockObject )
    { 
        while( ! condition )
        { 
            lockObject.wait();
        }
        //take the action here;
    }
    

    notify(): 用于唤醒waiting状态的线程, 同时释放锁,被唤醒的线程可以重新获得锁访问资源。它的基本语法 如下

    synchronized(lockObject) 
    {
        //establish_the_condition;
        lockObject.notify();
        //any additional code if needed
    }
    

    notifyAll(): 不同于notify(),它用于唤醒所有处于waiting状态的线程。语法如下:

    synchronized(lockObject) 
    {
        establish_the_condition;
        lockObject.notifyAll();
    }
    

    说完了这三个方法,来看下如何使用wait & notify(All) 来解决我们的问题。新的DataWareHouse 类如下所示:

        //producer类和consumer共享对象
        static class DataWareHouse {
            //共享数据区
            private final Queue<String> data = new LinkedList();
            private int capacity;
            private int size = 0;
    
            public DataWareHouse(int capacity) {
                this.capacity = capacity;
            }
    
            public synchronized void offer(String x) throws InterruptedException {
                while (size == capacity) { //当buffer满时,producer进入waiting 状态
                    this.wait(); //使用this对象来加锁
                }
                data.add(x);
                size++;
                notifyAll(); //当buffer 有数据时,唤醒所有等待的consumer线程
            }
    
            public synchronized String poll() throws InterruptedException {
                while (size == 0) {//当buffer为空时,consumer 进入等待状态
                    this.wait();
                }
                String result = data.poll();
                size--;   
                notifyAll(); //当数据被消耗,空间被释放,通知所有等待的producer。
                return result;
            }
        }
    

    Note: 在方法上使用synchronized 等价于在方法体内使用synchronized(this),两者都是使用this对象作为锁。

    生产者和消费者类,以及测试代码和 信号量 section 相同,不做重复列举了。

    管道

    管道Pipe是实现进程或者线程(线程之间通常通过共享内存实现通讯,而进程则通过scoket,管道,消息队列等技术)之间通信常用方式,它连接输入流和输出流,基于生产者- 消费者模式构建的一种技术。具体实现可以通过创建一个管道输入流对象和管道输出流对象,然后将输入流和输出流就行链接,生产者通过往管道中写入数据,而消费者在管道数据流中读取数据,通过这种方式就实现了线程之间的互相通讯。

    具体实现代码如下所示

    public class PipeSolution {
        static class DataWareHouse implements Closeable {
            private final PipedInputStream pis;
            private final PipedOutputStream pos;
    
            public DataWareHouse() throws IOException {
                pis = new PipedInputStream();
                pos = new PipedOutputStream();
                pis.connect(pos); //连接管道
            }
            //向管道中写入数据
            public void offer(int val) throws IOException {
                pos.write(val);
                pos.flush();
            }
            //从管道中取数据.
            public int poll() throws IOException {
                 //当管道中没有数据,方法阻塞
                return pis.read();
            }
            //关闭管道
            @Override
            public void close() throws IOException {
                if (pis != null) {   
                    pis.close();
                }
                if (pos != null) {
                    pos.close();
                }
            }
        }
        //consumer类
        static class Consumer implements Runnable {
            private final DataWareHouse dataWareHouse;
    
            Consumer(DataWareHouse dataWareHouse) {
                this.dataWareHouse = dataWareHouse;
            }
    
            @Override
            public void run() {
                try {
                    //消费者不断从管道中读取数据
                    while (true) {
                        int num = dataWareHouse.poll();
                        System.out.println("get data +" + num);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        static class Producer implements Runnable {
            private final DataWareHouse dataWareHouse;
            private final Random random = new Random();
    
            Producer(DataWareHouse dataWareHouse) {
                this.dataWareHouse = dataWareHouse;
            }
    
            @Override
            public void run() {
                try {
                    //生产者不断向管道中写入数据
                    while (true) {
                        int num = random.nextInt(256);
                        dataWareHouse.offer(num);
                        System.out.println("put data +" + num);
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
    
            public static void main(String[] args) throws IOException {
                DataWareHouse dataWareHouse = new DataWareHouse();
                new Thread(new Producer(dataWareHouse)).start();
                new Thread(new Consumer(dataWareHouse)).start();
            }
        }
    

    阻塞队列

    阻塞队列(BlockingQueue),具有1. 当队列满了的时候阻塞入队列操作 2. 当队列空了的时候阻塞出队列操作 3. 线程安全 的特性,因而阻塞队列通常被视为实现生产消费者模式最便捷的工具,其中DataWareHouse类实现代码如下:

      static class DataWareHouse {
            //共享数据区
            private final BlockingQueue<String> blockingQueue;
            
            public DataWareHouse(int capacity) {
                this.blockingQueue = new ArrayBlockingQueue<>(capacity);
            }
    
            public void offer(String x) {
                blockingQueue.put(x);
            }
            public String poll() {
                return blockingQueue.take();
            }
        }
    

    生产者和消费者类,以及测试代码和 信号量 section 相同,在此不做重复列举了。

    总结

    生产者消费者问题是面试中经常会遇到的题目,本文总结了几种常见的实现方式,面试过程中通常不必要向面试官描述过多实现细节,说出每种实现方式的特点即可。希望能给大家带来帮助。

    Reference

    1. https://howtodoinjava.com/java/multi-threading/wait-notify-and-notifyall-methods/
  • 相关阅读:
    DataAnnotations
    使用BizTalk实现RosettaNet B2B So Easy
    biztalk rosettanet 自定义 pip code
    Debatching(Splitting) XML Message in Orchestration using DefaultPipeline
    Modifying namespace in XML document programmatically
    IIS各个版本中你需要知道的那些事儿
    关于IHttpModule的相关知识总结
    开发设计的一些思想总结
    《ASP.NET SignalR系列》第五课 在MVC中使用SignalR
    《ASP.NET SignalR系列》第四课 SignalR自托管(不用IIS)
  • 原文地址:https://www.cnblogs.com/jun-ma/p/11843394.html
Copyright © 2011-2022 走看看