zoukankan      html  css  js  c++  java
  • JUC中的阻塞队列

    阻塞队列的应用场景
    阻塞队列这块的应用场景,比较多的仍然是对于生产者消 费者场景的应用,但是由于分布式架构的普及,是的大家 更多的关注在分布式消息队列上。所以其实如果把阻塞队 列比作成分布式消息队列的话,那么所谓的生产者和消费 者其实就是基于阻塞队列的解耦。 另外,阻塞队列是一个 fifo 的队列,所以对于希望在线程 级别需要实现对目标服务的顺序访问的场景中,也可以使 用;

    在Java8中,提供了7个阻塞队列:

    阻塞队列的操作方法
    在阻塞队列中,提供了四种处理方式

    1. 插入操作

    add(e)  :添加元素到队列中,如果队列满了,继续插入 元素会报错,IllegalStateException。

    offer(e)  : 添加元素到队列,同时会返回元素是否插入 成功的状态,如果成功则返回true
    put(e)   :当阻塞队列满了以后,生产者继续通过 put 添加元素,队列会一直阻塞生产者线程,知道队列可用

    offer(e,time,unit) :当阻塞队列满了以后继续添加元素, 生产者线程会被阻塞指定时间,如果超时,则线程直接 退出

    2. 移除操作

    remove():当队列为空时,调用 remove 会返回 false, 如果元素移除成功,则返回true

    poll(): 当队列中存在元素,则从队列中取出一个元素, 如果队列为空,则直接返回null

    take():基于阻塞的方式获取队列中的元素,如果队列为 空,则take方法会一直阻塞,直到队列中有新的数据可 以消费

    poll(time,unit):带超时机制的获取数据,如果队列为空, 则会等待指定的时间再去获取元素返回

     ArrayBlockingQueue 原理分析
    构造方法
    ArrayBlockingQueue提供了三个构造方法,分别如下。 capacity: 表示数组的长度,也就是队列的长度
    fair:表示是否为公平的阻塞队列,默认情况下构造的是非 公平的阻塞队列。 其中第三个构造方法就不解释了,它提供了接收一个几个 作为数据初始化的方法;

    public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
    throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);//重入锁,入队和入队持有同一把锁
    notEmpty = lock.newCondition();//初始化非空等待队列
    notFull = lock.newCondition();//初始化非满等待队列
    }

    Add 方法
    以add方法作为入口,在add方法中会调用父类的add方 法,也就是 AbstractQueue.如果看源码看得比较多的话,
    一般这种写法都是调用父类的模版方法来解决通用性问题

    AbstractQueue.add
    //从父类的add方法可以看到,判断队列是否满了,如果队列满了,则直接抛出一个异常

    public boolean add(E e) {
    if (offer(e))
    return true;
    else
    throw new IllegalStateException("Queue full");
    }
    offer 方法
    add方法最终还是调用offer方法来添加数据,返回一个添加成功或者失败的布尔值反馈 这段代码做了几个事情
    1. 判断添加的数据是否为空
    2. 添加重入锁
    3. 判断队列长度,如果队列长度等于数组长度,表示满了 直接返回false 
    4. 否则,直接调用enqueue将元素添加到队列中

    public boolean offer(E e) {
    checkNotNull(e);//对请求数据做判断,若e为null,则抛出异常
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    if (count == items.length)//判断队列是否满了
    return false;
    else {
    enqueue(e);//加入队列
    return true;
    }
    } finally {
    lock.unlock();
    }
    }

    enqueue :真正的数据添加操作
    private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;//通过putIndex对数据赋值
    if (++putIndex == items.length)//当putIndex等于数组长度时,将putIndex重置为0
    putIndex = 0;
    count++;//记录队列元素的个数
    notEmpty.signal();//唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素

    }
    putIndex为什么会在等于数 组长度的时候重新设置为0?
    因为ArrayBlockingQueue是一个FIFO的队列,队列添加 元素时,是从队尾获取putIndex来存储元素,当putIndex 等于数组长度时,
    下次就需要从数组头部开始添加了。

    下面这个图模拟了添加到不同长度的元素时,putIndex的 变化,当putIndex等于数组长度时,不可能让putIndex继 续累加,
    否则会超出数组初始化的容量大小。同时大家还 需要思考两个问题 :
    1. 当元素满了以后是无法继续添加的,因为会报错
    2. 其次,队列中的元素肯定会有一个消费者线程通过take 或者其他方法来获取数据,而获取数据的同时元素也会 从队列中移除

     put方法

    put 方法和 add 方法功能一样,差异是 put 方法如果队列 满了,会阻塞。

    public void put(E e) throws InterruptedException {
      checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();//这个也是获得锁,但
    //是和 lock 的区别是,这个方法优先允许在等待时由其他线程调
    //用等待线程的 interrupt 方法来中断等待直接返回。而 lock
    //方法是尝试获得锁成功后才响应中断
      try {
        while (count == items.length)
        notFull.await();//队列满了的情况下,当前线程将会被 notFull 条件对象挂起加到等待队列中,reentrantlock的实现原理
        enqueue(e);
      } finally {
        lock.unlock();
      }
    }

     take 方法
    take方法是一种阻塞获取队列中元素的方法 它的实现原理很简单,有就删除没有就阻塞,注意这个阻 塞是可以中断的,如果队列没有数据那么就加入notEmpty 条件队列等待(有数据就直接取走,方法结束),如果有新的 put线程添加了数据,那么put操作将会唤醒take线程, 执行take操作。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); //如果队列为空的情况
    下,直接通过 await 方法阻塞
            return dequeue();
        } finally {
            lock.unlock();
        } }

     如果队列中添加了元素,那么这个时候,会在enqueue中 调用notempty.signal唤醒take线程来获得元素;

     dequeue 方法
    这个是出队列的方法,主要是删除队列头部的元素并发返 回给客户端
    takeIndex,是用来记录拿数据的索引值

    private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];//默认获取0位置的元素
    items[takeIndex] = null;//将该位置的元素设置为空
    if (++takeIndex == items.length)//这里的作用是,如果拿到数组的最大值,那么重置为0,继续从头部位置开始获取数据
    takeIndex = 0;
    count--;//记录元素个数递减
    if (itrs != null)
    itrs.elementDequeued();//同时更新迭代器中的元素个数
    notFull.signal();//触发 因为队列满了以后导致的被阻塞的线程
    return x;
    }

    remove方法
    remove方法是移除一个指定元素:
    public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    if (count > 0) {//如果队列不为空
    final int putIndex = this.putIndex;//获取下一个要添加元素时的索引
    int i = takeIndex;//获取当前要被移除的元素索引
    do {
    if (o.equals(items[i])) {//从takeIndex下标开始,找到要被删除的元素
    removeAt(i);//移除指定元素
    return true;
    }
              //当前删除索引执行加1后判断是否与数组长度相等
              //若为true,说明索引已到数组尽头,将i设置为0

    if (++i == items.length)
    i = 0;
    } while (i != putIndex);//继续查找,直到找到最后一个元素
    }
    return false;
    } finally {
    lock.unlock();
    }
    }
     
     
     
  • 相关阅读:
    一行代码更改博客园皮肤
    fatal: refusing to merge unrelated histories
    使用 netcat 传输大文件
    linux 命令后台运行
    .net core 使用 Nlog 配置文件
    .net core 使用 Nlog 集成 exceptionless 配置文件
    Mysql不同字符串格式的连表查询
    Mongodb between 时间范围
    VS Code 使用 Debugger for Chrome 调试vue
    css权重说明
  • 原文地址:https://www.cnblogs.com/zpp1234/p/13203472.html
Copyright © 2011-2022 走看看