zoukankan      html  css  js  c++  java
  • 多线程-BlockingQueue,Array[Linked]BlockingQueue,DelayQueue,PriorityBlockingQueue,SynchronousQueue

    阻塞场景 
    BlockingQueue阻塞队列,阻塞的情况主要有如下2种: 
    1. 当队列满了,进行入队操作阻塞 
    2. 当队列空了,进行出队操作阻塞 
    阻塞队列主要用在生产者/消费者模式中,下图展示了一个线程生产,一个线程消费的场景: 

    BlockingQueue接口 

    操作抛异常特殊值阻塞超时
    插入 add(o) offer(o) put(o) offer(o,timeout,unit)
    删除 remove(o) poll() take() poll(timeout,unit)

    1. 抛出异常:如果操作不能马上进行,则抛出异常。 
    2. 特殊值:如果操作不能马上进行,将会返回一个特殊的值,一般是true/false。 
    3. 阻塞:如果操作不能马上进行,操作会阻塞。 
    4. 超时:如果操作不能马上进行,操作会阻塞指定时间,如果指定时间没执行,则返回一个特殊值,一般为true/false。 
    不能向BlockingQueue中插入null.否则会报NullPointerException异常。 
    BlockingQueue子类 
    由以上的图片可以知道BlockingQueue具有如下的几个子类: 
    1. ArrayBlockingQueue 
    2. DelayQueue 
    3. LinkedBlockingQueue 
    4. PriorityBlockingQueue 
    5. SynchronousQueue

    ArrayBlockingQueue 
    一个有边界的(容量是有限的)阻塞队列,它的内部实现是一个数组。必须在初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移除的对象是头部。

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        /** The queued items */
        final Object[] items;
        public ArrayBlockingQueue(int capacity)
        public ArrayBlockingQueue(int capacity, boolean fair)
        public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)
    }
    

    LinkedBlockingQueue 
    队列大小的配置是可选的,如果我们初始时指定一个大小,它就是有边界的,如果不指定它就是无边界采用默认值Integer.MAX_VALUE的容量,它的内部是一个链表。

    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
        public LinkedBlockingQueue();
        public LinkedBlockingQueue(int capacity);
        public LinkedBlockingQueue(Collection<? extends E> c);
    }
    

    PriorityBlockingQueue 
    是一个没有边界的队列,它的排序规则和ProrityQueue一样,需要注意,PriorityBlockingQueue中允许插入null对象。 
    所有插入PriorityBlockingQueue队列的对象必须实现Comparable接口,队列优先级的排序规则就是按照我们队这个接口的实现来定义的。 
    另外PriorityBlockingQueue可以获得一个Iterator,但是这个迭代器并不保证按照优先级顺序进行迭代。

    package org.github.lujiango;
    
    import java.util.Iterator;
    import java.util.Random;
    import java.util.concurrent.PriorityBlockingQueue;
    
    class PriorityElement implements Comparable<PriorityElement> {
        private int priority;
    
        public PriorityElement(int priority) {
            this.priority = priority;
        }
    
        @Override
        public int compareTo(PriorityElement o) {
    
            return priority >= o.priority ? 1 : -1;
        }
    
        @Override
        public String toString() {
            return "PriorityElement[priority= " + priority + "]";
        }
    
    }
    
    public class Test13 {
    
        public static void main(String[] args) throws InterruptedException {
            PriorityBlockingQueue<PriorityElement> queue = new PriorityBlockingQueue<PriorityElement>();
            for (int i = 0; i < 5; i++) {
                Random rand = new Random();
                PriorityElement ele = new PriorityElement(rand.nextInt(10));
                queue.put(ele);
            }
            System.out.println("Iterator----------------");
            Iterator<PriorityElement> it = queue.iterator();
            while (it.hasNext()) {
                System.out.println(it.next());
            }
            System.out.println("PriorityBlockingQueue.tak()-----------");
            while (!queue.isEmpty()) {
                System.out.println(queue.take());
            }
        }
    }
    

    SynchronousQueue 
    内部仅仅容纳一个元素,当一个线程插入一个元素之后就被阻塞(放入元素的线程立刻被阻塞),除非这个元素被另一个线程消费。

    package org.github.lujiango;
    
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.TimeUnit;
    
    class MyThread1 implements Runnable {
        private SynchronousQueue<String> queue;
    
        public MyThread1(SynchronousQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
    
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println("take a from queue...");
                queue.take();
            } catch (InterruptedException e) {
            }
        }
    }
    
    public class Test14 {
    
        public static void main(String[] args) throws InterruptedException {
            SynchronousQueue<String> queue = new SynchronousQueue<String>();
            Thread t = new Thread(new MyThread1(queue));
            t.start();
            System.out.println("put a  into queue...");
            queue.put("a");
    
        }
    
    }
    

    DelayQueue 
    DelayQueue阻塞的是其内部元素,DelayQueue中的元素必须实现Delayed接口。

    public interface Delayed extends Comparable<Delayed> {
    
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }
    

    getDelay()返回值就是队列元素被释放前的存活时间,如果返回<=0,就意味着该元素已经到期需要被释放,此时DelayedQueue会通过其take()方法释放此对象,如无可释放(超期元素)元素,则take方法会阻塞。

    package org.github.lujiango;
    
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    class DelayedElement implements Delayed {
        private long expired;
        private long delay;
        private String name;
    
        public DelayedElement(String name, long delay) {
            this.name = name;
            this.delay = delay;
            this.expired = (delay + System.currentTimeMillis());
        }
    
        @Override
        public int compareTo(Delayed o) {
            DelayedElement cache = (DelayedElement) o;
            return cache.expired > expired ? 1 : -1;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return (expired - System.currentTimeMillis());
        }
    
        @Override
        public String toString() {
            return "DelayedElement[delay=" + delay + ",name=" + name + "]";
        }
    
    }
    
    public class Test15 {
    
        public static void main(String[] args) throws InterruptedException {
            DelayQueue<Delayed> queue = new DelayQueue<Delayed>();
            DelayedElement ele = new DelayedElement("3s", 3000);
            queue.put(ele);
            System.out.println(queue.take());
        }
    
    }
    

    DelayQueue的应用场景很多,比如定时关闭连接,缓存对象,超时处理等各种场景。

  • 相关阅读:
    POJ 3660 Cow Contest (floyd求联通关系)
    POJ 3660 Cow Contest (最短路dijkstra)
    POJ 1860 Currency Exchange (bellman-ford判负环)
    POJ 3268 Silver Cow Party (最短路dijkstra)
    POJ 1679 The Unique MST (最小生成树)
    POJ 3026 Borg Maze (最小生成树)
    HDU 4891 The Great Pan (模拟)
    HDU 4950 Monster (水题)
    URAL 2040 Palindromes and Super Abilities 2 (回文自动机)
    URAL 2037 Richness of binary words (回文子串,找规律)
  • 原文地址:https://www.cnblogs.com/lujiango/p/7581031.html
Copyright © 2011-2022 走看看