zoukankan      html  css  js  c++  java
  • 生产者-消费者中的缓冲区:BlockingQueue接口

    BlockingQueue接口使用场景
    相信大家对生产者-消费者模式不陌生,这个经典的多线程协作模式,最简单的描述就是生产者线程往内存缓冲区中提交任务,消费者线程从内存缓冲区里获取任务执行。在生产者-消费者模式中最重要的就是这个内存缓冲区,可能你会疑问,为什么不让生产者直接把任务提交给消费者来执行,而是要通过一个中间媒介,也就是一个缓冲区来交换任务?

    通过缓冲区,可以缓解生产者和消费者之间的速度差。假设生产者的速度大于消费者,生产者不断向缓冲区内提交任务,但是缓冲区大小有限,当内存缓冲区满时,生产者不得不被阻塞,此时消费者仍不断从缓冲区内获取任务执行,直到缓冲区不为空,生产者才能继续执行。
    通过缓冲区,生产者不需要知道消费者是谁,生产者只需把任务提交到缓冲区即可;同样消费者也不需要直到生产者是谁,获取任务通过缓冲区。这样做的好处在于,对于代码的维护和升级,如果我们要改动消费者,我们不需要修改生产者和缓冲区。生产者和消费者之间的通信通过缓冲区。
    在生产者-消费者模式中,充当这个缓冲区使用的是BlockingQueue接口,BlockingQueue继承自Queue接口,在实例化时,可以使用ArrayBlockingQueue和LinkedBlockingQueue两种队列,前者是基于数组实现的,而后者是基于链表实现,从名字我们就可以看出。看到这两个队列大家应该有点印象,在线程池中也有这么一个参数BlockingQueue:

    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
    /*
    *corePoolSize:线程池中的线程数量.
    *maximumPoolSize:线程池中的最大线程数.
    *keepAliveTime:当线程池中线程数量超过corePoolSize时,多余的空闲线程的存活时间.
    *unit:keepAliveTime的单位.
    *workQueue:任务队列,被提交进线程池,但没被执行的任务.
    *threadFactory:线程工厂,用于创建线程,可自定义线程.
    *handler:拒绝服务,当线程池中没有空闲线程为新任务服务时,且等待队列中也已经满时,执行的策略.
    */
    线程池中的workQueue任务等待队列用来保存被提交进线程池,但因为没有空闲线程,所以尚未被执行的任务。使用ArrayBlockingQueue做为有界队列,LinkedBlockingQueue做为无界队列,无界队列因为基于链表实现,所以不会出现任务入队列失败的情况,直到内存耗尽为止。

    为什么使用BlockingQueue做为内存缓冲区
    用回生产者-消费者模式举例说,在多线程环境下,当生产者线程向内存缓冲区提交了一个任务后,消费者线程怎么知道此时内存缓冲区内有新的任务提交?如果我们让消费者线程不断查询缓冲区内的任务提交情况,是可以,不过这样不是一个效率高的方法。

    在线程池中也是,使用BlockingQueue队列,关键是Blocking,假设我们使用的是ArrayBlockingQueue,基于数组实现的有界队列,生产者线程不断向任务队列(也就是缓冲区)内提交任务时,当任务队列已经放满待执行任务后,生产者线程就会被阻塞,直到缓冲区内有空闲位置后,才会唤醒生产者线程。当消费者线程不断从缓冲区内获取任务执行时,假设所有任务都被获取后,消费者线程也会被阻塞,直到缓冲区内有新的任务被提交,消费者线程被重新唤醒。这是怎么做到的?使用BlockingQueue队列的线程是怎么如何在队列满时,让提交任务线程阻塞,而在队列为空,如何让获取任务线程阻塞?来看看BlockingQueue的内部实现。

    BlockingQueue内部实现
    为了实现上面所说的情况,用生产者-消费者模式为例,即:

    当缓冲队列满时,生产者线程被阻塞,无法继续向缓冲区内提交任务;消费者线程正常执行,如果消费者线程被阻塞,则将其唤醒。
    当缓冲队列为空时,消费者线程被阻塞,无法继续从缓冲区中获取任务;生产者线程正常执行,如果生产者线程被阻塞,则将其唤醒。
    BlockingQueue队列中,维护着两个Condition字段,一个为notEmpty,一个为notFull,和一把重入锁lock:

    ArrayBlockingQueue内部实现:

    final Object[] items;     
    private final AtomicInteger count = new AtomicInteger(); //当前队列中元素个数
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull; 

    public ArrayBlockingQueue(int capacity, boolean fair) {
    if(capacity <= 0) {
    throw new IllegalArgumentException();
    }
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
    }
    在ArrayBlockingQueue队列中,使用数组实现存储,所以实例化一个Object对象数组存放元素,AtomicInteger类型的count变量是使用了无锁CAS操作的线程安全类,用来保存当前队列中的元素个数。
    LinkedBlockingQueue内部实现:

    private final int capacity; //链表的容量
    private final AtomicInteger count = new AtomicInteger(); //当前队列中元素个数
    transient Node<E> head; //链表头节点
    private transient Node<E> last; //链表尾节点
    private final ReentrantLock takeLock = new ReentrantLock(); //出队列锁
    private final ReentrantLock putLock = new ReentrantLock(); //入队列锁
    private final Condition notEmpty = takeLock.newCondition();
    private final Condition notFull = putLock.newCondition();

    public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
    }
    看到源码,大家应该猜到了,就是使用Condition条件配合重入锁,让线程在某一时刻等待。而且使用加锁操作,表明BlockingQueue队列也是线程安全的。对于队列来说,两个最基本的操作:入队和出队,用ArrayBlockingQueue源码来看,ArrayBlockingQueue有offer()和put()两个方法实现往队列中插入元素,两者不同之处在于,使用offer()方法,如果此时队列中已满,那么offer()方法会插入失败,并立刻返回false;如果使用put()方法,当队列满时,使用put()方法的线程会等待,直到队列中有空闲的位置后,继续执行如对操作,这是如何做到的?来看看put()方法的实现:

    public void put(E e) throws InterruptedException {
    checkNotNull(e); //检查入队元素是否为空
    final ReentrantLock lock = this.lock; //抓住当前BlockingQueue实例的lock重入锁
    lock.lockInterruptibly(); //加锁,可以响应中断
    try {
    while(count == item.length) {
    notFull.await(); //在notFull的Condition对象上等待
    }
    insert(e); //队列入队操作
    } finally {
    lock.unlock;
    }
    }
    入队操作中,首先获得该队列的锁,然后特殊情况判断,while死循环不断判断,如果count == item.length,也就是当前队列已经满了,那么就让线程在notFull上等待,表示当前队列满,这就做到了,当内存缓冲区满时,生产者线程等待。当队列中有空闲位置了,则跳出跳出while循环,执行insert()插入操作。

    private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
    }
    在执行插入操作的实现中,会把等待在Condition实例notEmpty的线程唤醒,等于是告诉正在等待的消费者线程,当前有新任务进入缓冲区了。
    上面是入队操作,接着看出队,和入队相似,在队列中出队一个元素也有两个方法,poll()和take(),使用poll()方法出队,如果队列为空,则返回null。take()方法则会等待在这个队列上。与put()方法对比,可以直到,当队列为空时,调用take()方法的线程会等待在notEmpty上,实际上就是这样的,来看看take()方法的实现:
    public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    while(count == 0) {
    notEmpty.await();
    }
    return extract();
    } finally {
    lock.unlock();
    }
    }
    假设当前队列为空,也就是count == 0;那么就让当前线程在notEmpty上等待,直到有新的任务提交进队列,就执行入队操作extract()。
    private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeInddex]);
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
    }
    同理往队列中入队一个元素后,会让等待在notFull上的线程唤醒,意思是告诉它们,当前队列不空了,你们可以提交新的任务进来了。
    来具体看个例子,在生产者-消费者模式中怎么用这个BlockingQueue队列:
    package producerconsumer;

    public final class work {
    private final int data;

    //构造函数初始化
    public work(int data) {
    this.data = data;
    }

    public work(String s) {
    this.data = Integer.valueOf(s);
    }

    public int getData() {
    return this.data;
    }

    @Override
    public String toString() {
    return "data = "+this.data;
    }
    }
    自定义一个work类,模拟生产者和消费者处理的任务,里面就一个int型的data变量。
    package producerconsumer;

    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ProducerDemo implements Runnable {
    private BlockingQueue<work> workQueue; //内存缓冲区队列
    private static AtomicInteger count = new AtomicInteger(); //队列中的总任务数
    private volatile boolean isRunning = true; //标识当前线程的状态

    //构造函数初始化
    public ProducerDemo(BlockingQueue<work> workQueue) {
    this.workQueue = workQueue;
    }

    public void stopProducer() {
    this.isRunning = false;
    }

    @Override
    public void run() {
    work newWork = new work(count.incrementAndGet());
    Random r = new Random();

    System.out.println("生产者线程: "+Thread.currentThread().getId()+"开始执行.");
    try {
    while(isRunning) {
    if(!workQueue.offer(newWork)) {
    System.out.println("生产者线程: "+Thread.currentThread().getId()+": 缓冲区满,任务-"+newWork+"放入失败.");
    } else {
    System.out.println("生产者线程: "+Thread.currentThread().getId()+"将任务-"+newWork+"放入缓冲区.");
    }
    Thread.sleep(r.nextInt(1000));
    }
    } catch(InterruptedException e) {
    e.printStackTrace();
    Thread.currentThread().interrupt();
    }
    }

    }
    简单地模拟,在生产者线程的构造函数中获得与消费者通信的缓冲区,然后往里面添加用随机数标记的任务。
    package producerconsumer;

    import java.util.Random;
    import java.util.concurrent.BlockingQueue;

    public class ConsumerDemo implements Runnable {
    private BlockingQueue<work> workQueue; //内存缓冲区队列
    private volatile boolean isRunning = true; //标识当前线程的状态

    //构造函数初始化
    public ConsumerDemo(BlockingQueue<work> workQueue) {
    this.workQueue = workQueue;
    }

    public void stopConsumer() {
    this.isRunning = false;
    }

    @Override
    public void run() {
    work takeWork;
    Random r = new Random();

    System.out.println("消费者线程: "+Thread.currentThread().getId()+"开始执行.");
    while(isRunning) {
    try {
    takeWork = workQueue.take(); //从缓冲区中获取任务
    if(takeWork != null) {
    System.out.println("消费者线程: "+Thread.currentThread().getId()+"获取任务:"+takeWork.getData());
    } else {
    System.out.println("缓冲区空.");
    }
    Thread.sleep(r.nextInt(1000));
    } catch(InterruptedException e) {
    e.printStackTrace();
    Thread.currentThread().interrupt();
    }
    }
    }

    }
    消费者线程同样构造函数中获得与生产者通信的队列后,调用take()方法获取缓冲区里面的任务,并把任务id打印出来。
    package producerconsumer;

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class MainDemo {

    public static void main(String[] args) throws InterruptedException {
    //创建缓冲区
    BlockingQueue<work> workQueue = new ArrayBlockingQueue<work>(10); //基于数组实现
    //创建线程池
    ExecutorService pcThreadPool = Executors.newCachedThreadPool(); //根据实际情况调整线程数量的线程池
    //创建生产者线程
    ProducerDemo producer1 = new ProducerDemo(workQueue);
    ProducerDemo producer2 = new ProducerDemo(workQueue);
    //创建消费者线程
    ConsumerDemo consumer1 = new ConsumerDemo(workQueue);
    ConsumerDemo consumer2 = new ConsumerDemo(workQueue);
    //将线程提交到线程池
    pcThreadPool.execute(producer1);
    pcThreadPool.execute(producer2);
    pcThreadPool.execute(consumer1);
    pcThreadPool.execute(consumer2);

    Thread.sleep(3*1000);
    producer1.stopProducer();
    producer2.stopProducer();
    consumer1.stopConsumer();
    consumer2.stopConsumer();

    Thread.sleep(5*1000);
    pcThreadPool.shutdown(); //关闭线程池
    }

    }
    主函数中,创建一个ArrayBlockingQueue队列,也就是基于数组实现的BlockingQueue,然后实例化两个生产者线程和消费者线程,并将它们提交到线程池中执行,线程池使用的是newCachedThreadPool(),是一个可根据实际情况调整线程池内线程数量的线程池。
    运行结果:

     
    完整实现代码已上传GitHub:
    https://github.com/justinzengtm/Java-Multithreading/tree/master/BasicProducerConsumer
    https://gitee.com/justinzeng/multithreading/tree/master/BasicProducerConsumer
    --------------------- 

  • 相关阅读:
    udacity android 实践笔记: lesson 4 part b
    iosiPhone屏幕尺寸、分辨率及适配
    【基础练习】【区间DP】codevs2102 石子归并2(环形)题解
    Spring Boot MyBatis 连接数据库
    知乎日报 API 分析
    Dijkstra算法
    win7远程桌面连接不上,解决办法
    ECharts图表中级入门之formatter:夜谈关于ECharts图表内的数据格式化方法
    myeclipse 连接svn服务器验证位置时发生错误 404 forbidden
    极路由系列 刷机方法
  • 原文地址:https://www.cnblogs.com/hyhy904/p/10947581.html
Copyright © 2011-2022 走看看