zoukankan      html  css  js  c++  java
  • 【杂谈】再看生产-消费模式

    生产者和消费者之间为什么隔着一个队列?

    首先,生产者与消费者由于速度的不一致,所以需要一个空间用于缓冲。这可以将生产者与消费者解耦,生产者产出数据的时候,不需要把数据交到消费者手上才行,只要把数据丢入缓冲区就好。这样就可以各做各的。

    为什么缓冲区是一个队列?

    通常情况下,这个缓冲区的数据结构是一个有序的队列。实际上如果对处理顺序没啥要求,其实也不一定要用队列。插空都可以。

    为什么访问的缓冲区的时候要获得锁?

    缓冲区这个数据结构会被多线程并发访问(生产者、消费者线程),所以需要加锁,一方面保护它的结构不被破坏,另一方面保证代码的正确性。

    这样是不是就可以用了?

    是,用是可以用了,但是性能可能会比较差。

    为什么性能会比较差?

    考虑这样一个场景:缓冲区已满。生产者会一直尝试往里面丢东西,所以就一直"获得锁-释放锁-获得锁-释放锁"。一方面,生产者空转,浪费CPU时间片,就会影响其他线程的调度。这时候如果有一个消费者处理完手头的数据,想再拿一个出来处理,那这时候生产者和消费者就会进行不必要的竞争,因为这个时候生产者抢到了锁也没用。

    这,这可如何是好啊?

    简单,分两种情况,一种是当缓冲区满的时候,如果生产者再尝试往里面丢东西,就把它挂起。同理,当缓冲区为空的时候,如果消费者再尝试往里面,也把它挂起。

    那什么时候唤醒呢?

    不是一样的吗,当缓冲区来数据的时候(从无到有),就唤醒消费者线程。当缓冲区有空闲空间的时候(从满到不满),就唤醒生产者线程。

    那代码该怎么写呢?

    首先我们先简单实现一个"锁",就是下面这样。

    public class Lock {
        /**
         * 等待锁的线程队列
         */
        private List<Thread> waitThreads = new ArrayList<>();
        /**
         * 守卫
         */
        private AtomicInteger guard = new AtomicInteger(0);
        /**
         * 锁标志
         */
        private AtomicInteger lockFlag = new AtomicInteger(0);
        /**
         * 当前线程的拥有者
         */
        private Thread holder;
    
        public void lock() {
            if(Objects.equals(holder, Thread.currentThread())) //如果线程已经获得锁,则直接返回
                return;
            while(!guard.compareAndSet(0, 1)) //尝试获得守卫允许
                ;
            if(lockFlag.intValue() == 0) {
                lockFlag.set(1); //将锁标记为"已被占用"
                holder = Thread.currentThread(); //将锁的拥有者设为当前线程
                guard.set(0); //释放守卫
            } else {
                waitThreads.add(Thread.currentThread()); //加入到等待队列
                guard.set(0); //释放守卫
                LockSupport.park(); //将当前线程挂起
                holder = Thread.currentThread(); //当线程从上一行恢复执行的时候,就说明此线程获得了锁
            }
        }
    
        public void unlock() {
            if(!Objects.equals(holder, Thread.currentThread())) //如果不是锁的拥有者就没资格释放锁
                return;
            while(!guard.compareAndSet(0, 1))
                ;
            if(waitThreads.size() == 0) { //判断是否有线程正在等待
                lockFlag.set(0); //如果没有,就将锁标记为"空闲"
                holder = null;
                guard.set(0); //释放守卫
            } else {
                LockSupport.unpark(waitThreads.remove(0)); //如果有线程在等待,则唤醒队列中的第一个
                guard.set(0); //释放守卫
            }
        }
    }

    然后,我们再来实现一下缓冲区的类。

    public class BufferCache {
        /**
         * 缓冲数组,用于保存数据
         */
        private Object[] data;
        /**
         * 读索引 => 下一个要消费的数据从哪里拿
         */
        private int readIndex;
        /**
         * 写索引 => 下一个进来的数据要放哪里
         */
        private int writeIndex;
        /**
         * 当前缓冲区内数据的数量
         */
        private int count;
        /**
         * 生产者线程等待队列
         */
        private List<Thread> waitProducers = new ArrayList<>();
        /**
         * 消费者线程等待队列
         */
        private List<Thread>  waitConsumers = new ArrayList<>();
        /**
         * 前面实现的锁
         */
        private Lock lock = new Lock();
    
        public BufferCache(int initial) {
            this.data = new Object[initial];
        }
    
        public void put(Object e) {
            lock.lock(); //获得锁
            while(count == data.length) { //如果已满,就将生产线程挂起
                waitProducers.add(Thread.currentThread()); //放入等待队列,这样要唤醒的时候可以找得到
                lock.unlock(); //释放锁
                LockSupport.park(); //挂起当前线程
                lock.lock(); //醒来的时候,要再获得锁
    
            }
    
            data[writeIndex] = e;  //存入数据
            count++;
            if(++writeIndex == data.length) { //循环利用存储空间
                writeIndex = 0;
            }
            while(waitConsumers.size() != 0) { //唤醒消费线程
                LockSupport.unpark(waitConsumers.remove(0));
            }
            lock.unlock(); //释放锁
        }
    
        public Object take() { //同理
            lock.lock();
            Object e = null;
            while (count == 0) {
                waitConsumers.add(Thread.currentThread());
                lock.unlock();
                LockSupport.park();
                lock.lock();
            }
    
            e = data[readIndex];
            count--;
            if(++readIndex == data.length) {
                readIndex = 0;
            }
            while(waitProducers.size() != 0) {
                LockSupport.unpark(waitProducers.remove(0));
            }
            lock.unlock();
            return e;
        }
    
        private static class Task1 implements Runnable { //生产任务
            private int num;
            private BufferCache cache;
            private String name;
    
            public Task1(BufferCache cache, String index) {
                this.name = "producer-" + index;
                this.num = 0;
                this.cache = cache;
            }
    
            @Override
            public void run() {
                String data;
                while(true) { //每隔一秒往队列丢一个数据
                    data = num + " from " + name;
                    cache.put(data);
                    System.out.println(name + "放入:" + data);
                    num++;
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    
        private static class Task2 implements Runnable {
            private BufferCache cache;
    
            public Task2( BufferCache cache) {
                this.cache = cache;
            }
    
            @Override
            public void run() {
                while(true) { //不断从队列中抓取数据
                    Object e = cache.take();
                    System.out.println("消费到:" + e);
                }
            }
        }
        public static void main(String[] args) { //跑个案例
            BufferCache cache = new BufferCache(20);
            Thread producer;
            Thread consumer;
            for(int i = 0; i < 5; i++) { //开5个生产者
                producer = new Thread(new Task1(cache, i + ""));
                producer.start();
            }
    
            for(int i = 0; i < 3; i++) { //开3个消费者
                consumer = new Thread(new Task2(cache));
                consumer.start();
            }
        }
    }

    为什么条件判断要用while循环,if不行吗?

    假如唤醒的是生产者线程A,可能它恢复执行的时候,缓冲区已经被生产者线程B再次填满了,所以它需要再判断一次。

    为什么线程恢复的时候还要再获得锁?

    获得锁是为了在判断和执行期间,条件不会发生变化。这样代码执行起来才是正确的。再详细点就是,当生产者线程A获得锁的时候,其他生产者线程不能改变缓冲区的状态(因为其他生产者线程如果要改变的话,也要先获得锁),在线程A获得锁到释放锁期间,它看到的状态是不会发生变化的。

    这两个等待队列好像跟条件变量很像,这跟条件变量有什么关系?

    其实这就是条件变量,条件变量的本质就是一个队列,当条件不满足的时候,就把线程放入这个队列;当条件满足的时候,可以唤醒一个或多个线程,让他们继续执行。

    你可以参考JDK中,BlockingQueue的一个实现类ArrayBlockingQueue,看看是不是跟上面的代码很像。

    锁和条件变量的关系

    一方面,由于条件变量是一个队列,当多线程访问的时候,肯定要保证它的线程安全,所以它一般都会跟一个锁对象有关联。要访问这个队列,必须先获得锁。

    另一方面,进行条件判断的时候也离不开锁(保证在判断和执行期间,条件不会发生变化)

    所以,条件变量和锁是绑在一块的,或者说条件变量离不开锁。这样看来,JDK中,Condition对象由Lock对象生成就很容易理解了。

    Condition x = lock.newCondition();
  • 相关阅读:
    编译pypcap
    python输出重复字符串的简单办法
    Python天天美味(1) 交换变量(转)
    Python天天美味(4) isinstance判断对象类型(转)
    Python天天美味(2) 字符遍历的艺术(转)
    Python天天美味(3) 字符转换(转)
    Python天天美味(5) ljust rjust center(转)
    Python天天美味(6) strip lstrip rstrip(转)
    Python天天美味(10) 除法小技巧(转)
    Python标准库12 数学与随机数 (math包,random包)(转)
  • 原文地址:https://www.cnblogs.com/longfurcat/p/11494441.html
Copyright © 2011-2022 走看看