zoukankan      html  css  js  c++  java
  • Java多线程系列十——BlockingQueue

    参考资料:http://ifeve.com/java-synchronousqueue/
    http://www.cnblogs.com/jackyuj/archive/2010/11/24/1886553.html
    http://ifeve.com/java-blocking-queue/

    BlockingQueue的几个API认识

    方法 说明
    boolean add(E e) 添加元素,返回true或者超出队列size上限后抛异常,若队列有大小限制时,官方更建议使用offer方法
    boolean offer(E e) 添加元素,返回true或者false(超出队列size上限后)
    void put(E e) throws InterruptedException 添加元素,无返回值,若空间不足则进入waiting状态直到有空间
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException 添加元素,返回true或者false(等待时间已到但仍无可添加空间),若空间不足则等待一定时间直到成功或者放弃
    E take() throws InterruptedException 获取队列头部元素,若没有可取元素则进入waiting状态
    E poll(long timeout, TimeUnit unit) throws InterruptedException 获取队列头部元素,若没有可取元素则等待一定时间直到成功或者放弃
    boolean remove(Object o) 删除元素

    BlockingQueue派生出几个常用的类ArrayBlockingQueue/LinkedBlockingDeque/DelayQueue/PriorityBlockingQueue/SynchronousQueue,类图如下所示:

    它们的一些特性:

    • ArrayBlockingQueue:以数组保存元素,初始化时必须指定队列的容量capacity,添加元素时若达到上限进入阻塞
    • LinkedBlockingDeque:以双向链表保存元素,初始化时可指定队列的容量,若不指定,capacity默认为Integer.MAX_VALUE,添加元素时若达到上限进入阻塞
    • DelayQueue:以PriorityQueue保存元表,只能获取已到过期时间的元素,否则得到null,无容量上限,理论上可无限添加元素
    • PriorityBlockingQueue:以数组保存元素,整个队列为一棵平衡二叉树,添加元素成功后对队列内元素重排序,无容量上限,理论上可无限添加元素
    • SynchronousQueue:无缓存队列,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样

    ArrayBlockingQueue的使用案例可参考Java多线程系列三——实现线程同步的方法,本文测试DelayQueue的使用,代码如下:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Description 利用DelayQueue清除超时请求<br/>
     *              1. 主线程从工作队列取出任务处理完成后,把任务从超时队列移除<br/>
     *              2. 超时检查线程找到超时请求后,把任务从工作队列中移除
     */
    public class DelayQueueTest {
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            int size = 36;
            DelayQueue<MyRequest> queue = new DelayQueue<>();// 用于记录是否超时的队列
            BlockingQueue<MyRequest> workQueue = new ArrayBlockingQueue<>(size);// 请求的队表
            Map<Integer, MyRequest> cache = new HashMap<>();// 请求与id的对照表
            for (int i = 0; i < size; i++) {// 初始化
                MyRequest impl = new MyRequest(i, System.nanoTime(), 120);
                queue.put(impl);
                workQueue.put(impl);
                cache.put(i, impl);
            }
            /**
             * 建立超时检查任务
             */
            Executors.newSingleThreadExecutor().submit(new Runnable() {
                @Override
                public void run() {
                    while (queue.size() > 0) {
                        try {
                            MyRequest impl = queue.take();
                            workQueue.remove(impl);// 若请求超时则把请求从队列中移除
                            System.out.println(String.format("%s is timeout", impl));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            /**
             * 建2个线程消费请求
             */
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            while (workQueue.size() > 0) {
                List<MyRequest> tasks = Arrays.asList(new MyRequest[] { workQueue.take(), workQueue.take() });
                List<Future<Integer>> futures = executorService.invokeAll(tasks);
                for (Future<Integer> future : futures) {
                    queue.remove(cache.get(future.get()));// 若请求成功,则不需要再检查是否超时
                }
            }
            executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
            executorService.shutdown();
        }
    }
    
    class MyRequest implements Delayed, Callable<Integer> {
        private int threadId;
        private long startTime;
        private long expiredTime;
    
        public MyRequest(int threadId, long startTime, long timeout) {
            this.threadId = threadId;
            this.startTime = startTime;
            this.expiredTime = TimeUnit.SECONDS.toNanos(timeout) + System.nanoTime();
        }
    
        @Override
        public Integer call() {
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(String.format("%s is ok", this));
            return threadId;
        }
    
        @Override
        public int compareTo(Delayed arg0) {
            int rtn;
            if (arg0 == null || !(arg0 instanceof MyRequest)) {
                rtn = 1;
            } else {
                MyRequest impl = (MyRequest) arg0;
                rtn = startTime > impl.getStartTime() ? 1 : (startTime == impl.getStartTime() ? 0 : -1);
            }
            return rtn;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return expiredTime - System.nanoTime();
        }
    
        public long getStartTime() {
            return startTime;
        }
    
        @Override
        public String toString() {
            return String.format("MyRequest [threadId=%s, startTime=%s, expiredTime=%s]", threadId, startTime, expiredTime);
        }
    }
  • 相关阅读:
    Windows的本地时间(LocalTime)、系统时间(SystemTime)、格林威治时间(UTC-Time)、文件时间(FileTime)之间的转换
    python基础day05
    靠谱的Pycharm安装详细教程
    15.Spring-Boot中使用AOP统一处理web层异常
    16.Spring-Boot中的定时任务
    17.Spring-Boot中HTTPS配置
    18.Spring-Boot devtools项目自动重启
    纯Java配置SpringMvc整合Spring-Data-JPA
    19.Spring-Boot多数据源配置
    1.初识Spring-Cloud
  • 原文地址:https://www.cnblogs.com/hiver/p/7536420.html
Copyright © 2011-2022 走看看