zoukankan      html  css  js  c++  java
  • java多线程-BlockingQueue

    • BlockingQueue简介

      ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

      LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE,每次插入后都将动态地创建链接节点。

      PriorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素,依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

      DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

      SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。

    • BlockingQueue内容

      BlockingQueue主要方法:

      抛出异常 特殊值 阻塞 超时
    插入 add(e) offer(e) put(e) offer(e, time, unit)
    移除 remove() poll() take() poll(time, unit)
    检查 element() peek() 不可用 不可用

      对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。因为使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法却不能达到这样的效果。注意,非阻塞队列中的方法都没有进行同步措施。

    • BlockingQueue实现原理

      以ArrayBlockingQueue为例,查看其源代码,其中主要包含以下对象:

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        private static final long serialVersionUID = -817911632652898426L;
    
        /** 数组对象,用于放置对象 */
        final Object[] items;
    
        /** put, offer, or add方法放入数组的索引 */
        int putIndex;
    
        /**  take, poll, peek or remove方法取出数据的数组索引 */
        int takeIndex;
        /** queue队列的总数 */
        int count;
    
        /**可重入锁,控制并发*/
        final ReentrantLock lock;
        /** 非空信号量,可以取数*/
        private final Condition notEmpty;
        /** 非满信号量,可以放数 */
        private final Condition notFull;
    }

      下面主要介绍下put()和take()方法,来观察其同步的实现:

     1 public void put(E e) throws InterruptedException {
     2         checkNotNull(e);
     3         final ReentrantLock lock = this.lock;
     4         lock.lockInterruptibly();
     5         try {
     6             while (count == items.length)
     7                 notFull.await();
     8             insert(e);
     9         } finally {
    10             lock.unlock();
    11         }
    12 }
     1 public E take() throws InterruptedException {
     2         final ReentrantLock lock = this.lock;
     3         lock.lockInterruptibly();
     4         try {
     5             while (count == 0)
     6                 notEmpty.await();
     7             return extract();
     8         } finally {
     9             lock.unlock();
    10         }
    11     }

      大家应该明白了阻塞队列的实现原理,事实它和我们用Object.wait()、Object.notify()和非阻塞队列实现生产者-消费者的思路类似,只不过它把这些工作一起集成到了阻塞队列中实现。并且在前面Condition中我们也模拟实现了一个阻塞队列,实现与其大同小异。

    • BlockingQueue应用

      1:启动两个线程实现互斥等待:

     1 public class BlockingQueueTest {
     2     public static void main(String[] args) {
     3         final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
     4         for (int i = 0; i < 2; i++) {
     5             new Thread(new Runnable() {
     6                 @Override
     7                 public void run() {
     8                     while (true) {
     9                         System.out.println("Thread "+Thread.currentThread().getName()+"正在准备放入数据");
    10                         try {
    11                             //模拟线程的放数速度
    12                             Thread.sleep(new Random().nextInt(1000));
    13                         } catch (InterruptedException e) {
    14                             // TODO Auto-generated catch block
    15                             e.printStackTrace();
    16                         }
    17                         try {
    18                             queue.put(1);
    19                         } catch (InterruptedException e) {
    20                             // TODO Auto-generated catch block
    21                             e.printStackTrace();
    22                         }
    23                         System.out.println("Thread "+Thread.currentThread().getName()+"放入数据,此时队列中的数据为:"+queue.size());
    24                     }
    25                 }
    26             }).start();
    27             new Thread(new Runnable() {
    28                 @Override
    29                 public void run() {
    30                     while (true) {
    31                         System.out.println("Thread "+Thread.currentThread().getName()+"正在取得数据");
    32                         try {
    33                             //模拟线程的去数速度
    34                             Thread.sleep(100);
    35                         } catch (InterruptedException e) {
    36                             // TODO Auto-generated catch block
    37                             e.printStackTrace();
    38                         }
    39                         try {
    40                             queue.take();
    41                         } catch (InterruptedException e) {
    42                             // TODO Auto-generated catch block
    43                             e.printStackTrace();
    44                         }
    45                         System.out.println("Thread "+Thread.currentThread().getName()+"取得数据,此时队列中的数据为:"+queue.size());
    46                     }
    47                 }
    48             }).start();
    49         }
    50         
    51     }
    52 }

      2:前面介绍传统线程通信中,主线程和子线程交替运行,现在以阻塞队列来实现。

     1 public class BlockingQueueCommunication {
     2     public static void main(String[] args) {
     3         final Business business = new Business();
     4         new Thread(new Runnable() {
     5             
     6             @Override
     7             public void run() {
     8                 // TODO Auto-generated method stub
     9                 for (int i = 0; i < 50; i++) {
    10                     try {
    11                         business.sub(i);
    12                     } catch (InterruptedException e) {
    13                         // TODO Auto-generated catch block
    14                         e.printStackTrace();
    15                     }
    16                 }
    17             }
    18         }).start();
    19         for (int i = 0; i < 50; i++) {
    20             try {
    21                 business.main(i);
    22             } catch (InterruptedException e) {
    23                 // TODO Auto-generated catch block
    24                 e.printStackTrace();
    25             }
    26         }
    27     }
    28     static class Business{
    29         BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);
    30         BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);
    31         {
    32             try {
    33                 queue2.put(1);//保证queue2阻塞
    34             } catch (InterruptedException e) {
    35                 // TODO Auto-generated catch block
    36                 e.printStackTrace();
    37             }
    38         }
    39         
    40         public void main(int i) throws InterruptedException{
    41             queue1.put(1);//阻塞queue1
    42             for (int j = 0; j < 100; j++) {
    43                 System.out.println("main thread is looping of "+j +" in " + i);
    44             }
    45             queue2.take();//唤醒queue2
    46         }
    47         public void sub(int i) throws InterruptedException{
    48             queue2.put(1);//阻塞queue2
    49             for (int j = 0; j < 10; j++) {
    50                 System.out.println("sub thread is looping of "+j +" in " + i);
    51             }
    52             queue1.take();//唤醒queue1
    53         }
    54     }
    55 }
      BlockingQueue实现了线程同步,不可在方法中再次加入同步限制,否则会出现死锁。

      3:在API中有一个阻塞对象实现生产者和消费者的例子

     1 class Producer implements Runnable {
     2    private final BlockingQueue queue;
     3    Producer(BlockingQueue q) { queue = q; }
     4    public void run() {
     5      try {
     6        while(true) { queue.put(produce()); }
     7      } catch (InterruptedException ex) { ... handle ...}
     8    }
     9    Object produce() { ... }
    10  }
    11 
    12  class Consumer implements Runnable {
    13    private final BlockingQueue queue;
    14    Consumer(BlockingQueue q) { queue = q; }
    15    public void run() {
    16      try {
    17        while(true) { consume(queue.take()); }
    18      } catch (InterruptedException ex) { ... handle ...}
    19    }
    20    void consume(Object x) { ... }
    21  }
    22 
    23  class Setup {
    24    void main() {
    25      BlockingQueue q = new SomeQueueImplementation();
    26      Producer p = new Producer(q);
    27      Consumer c1 = new Consumer(q);
    28      Consumer c2 = new Consumer(q);
    29      new Thread(p).start();
    30      new Thread(c1).start();
    31      new Thread(c2).start();
    32    }
    33  }

      使用阻塞队列代码要简单得多,不需要再单独考虑同步和线程间通信的问题。

      在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。

      阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。

      参考资料:http://www.cnblogs.com/dolphin0520/p/3932906.html

           javaAPI

  • 相关阅读:
    数据结构小总结(成都磨子桥技工学校数据结构前12题)
    Scrum 冲刺博客第二篇
    Scrum 冲刺博客第一篇
    centos部署keepalived服务
    第四周作业
    Svelte 中怎样做双向数据绑定
    Svelte 中多层组件事件转发
    Svelte 中的事件修饰符
    怎样在 Svelte 中设置自定义事件
    怎样使用 Svelte 中的异步块
  • 原文地址:https://www.cnblogs.com/lcngu/p/5224476.html
Copyright © 2011-2022 走看看