zoukankan      html  css  js  c++  java
  • Java 使用DelayQueue实现延迟队列

    1.简介:

    • DelayQueue是一个无界阻塞队列,只有在延迟期满时,才能从中提取元素。
    • 队列的头部,是延迟期满后保存时间最长的delay元素。

    2.使用场景:

    • 缓存系统设计:使用DelayQueue保存缓存元素的有效期,用一个线程循环查询DelayQueue,一旦从DelayQueue中取出元素,就表示有元素到期。
    • 定时任务调度:使用DelayQueue保存当天要执行的任务和执行的时间,一旦从DelayQueue中获取到任务,就开始执行,比如Timer,就是基于DelayQueue实现的。

    3.使用条件:

    • 存放DelayQueue的元素,必须继承Delay接口,Delay接口使对象成为延迟对象。
    • 该接口强制实现两个方法:
      1.CompareTo(Delayed o):用于比较延时,队列里元素的排序依据,这个是Comparable接口的方法,因为Delay实现了Comparable接口,所以需要实现。
      2.getDelay(TimeUnit unit):这个接口返回到激活日期的--剩余时间,时间单位由单位参数指定。
    • 此队列不允许使用null元素。

    BlockingQueue中take、offer、put、add

    | | 抛出异常 | 特殊值 | 阻塞 | 超时 |
    | 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
    | 移除 | remove() | poll() | take() | poll(time, unit) |
    | 检查 | element() | peek() | 不可用 | 不可用 |

    • offer:插入到队列,成功返回true,如果当前没有可用空间,返回false。
    • add:入队,如果没有可用空间,抛出异常,IllegalStateException。
    • put:插入队列,等待可用空间,阻塞,直到能够有空间插入元素。
    • take:获取并移除队列头部,在元素变的可用之前一直等待。

    三、实际开发中的应用

    简单的延时队列要有三部分:第一实现了Delayed接口的消息体、第二消费消息的消费者、第三存放消息的延时队列
    1.消息体。实现接口 Delayed ,重写方法 compareTo 和 getDelay

    package com.chitic.supplywater.app.aop;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class Worker<T> implements Delayed {
        //消息类型
        private int type;
        // 消息内容
        private T body;
        //创建时刻的时间
        private long start = System.currentTimeMillis();
        //延迟时长,这个是必须的属性因为要按照这个判断延时时长。
        private long excuteTime;
    
        // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期
        // 否则还没到期
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert((start + this.excuteTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        /**
         *比较时间,以最接近执行时间的任务,排在最前面
         */
        @Override
        public int compareTo(Delayed delayed) {
            Worker msg = (Worker) delayed;
            return (int)(((start + this.excuteTime) - System.currentTimeMillis()) -((msg.start + msg.excuteTime) - System.currentTimeMillis())) ;
        }
    
        public int getType() {
            return this.type;
        }
    
        public T getBody() {
            return this.body;
        }
    
    
        public Worker(int type, T body, long excuteTime) {
            this.type = type;
            this.body = body;
            this.excuteTime = excuteTime;
        }
    }

    管理器

    package com.chitic.supplywater.app.aop;
    
    import java.util.concurrent.DelayQueue;
    
    public class DelayQueueManager {
    
        private static DelayQueue<Worker> delayQueue;
    
        static {
            // 创建延时队列
           delayQueue = new DelayQueue<>();
        }
    
        public static DelayQueue<Worker> getDelayQueue(){
            return delayQueue;
        }
    
        public static void putDelayQueue(Worker worker){
            delayQueue.put(worker);
        }
    }

    2,消费者

    package com.chitic.supplywater.app.aop;
    
    import java.util.concurrent.DelayQueue;
    
    public class ConsumerDelayQueue implements Runnable {
        // 延时队列 ,消费者从其中获取消息进行消费
        private DelayQueue<Worker> queue;
    
        public ConsumerDelayQueue(DelayQueue<Worker> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    Worker take = queue.take();
                    System.out.println("消费消息类型:" + take.getType() + " 消息体:" + take.getBody()+":"+System.currentTimeMillis());
                    //TODO 此处可以进行推送等一系列操作
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    3,存放消息的延时队列

        @Resource
        ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
        @Test
        public void test() throws InterruptedException {
            Order order = new Order();
            order.setOrderId("9000");
            order.setOrderName("买个苹果12");
            Worker<Order> orderWorker = new Worker<>(1, order, 5000);
            DelayQueueManager.putDelayQueue(orderWorker);
    
            threadPoolTaskExecutor.execute(new ConsumerDelayQueue(DelayQueueManager.getDelayQueue()));
            //为了方便看效果
            for (int i = 1; i <= 10; i++) {
                Thread.sleep(1000);
                System.out.println("========================="+ i);
            }
    
        }

    线程池配置

    package com.chitic.supplywater.app.aop;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.SchedulingConfigurer;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * @Description //TODO 线程池配置   定时任务,异步任务
     * @Author GaoX
     * @Date 2020/12/15 16:07
     */
    @Configuration
    @EnableAsync
    @EnableScheduling
    @Slf4j
    public class ExecutorConfig implements SchedulingConfigurer, AsyncConfigurer {
    
        /**
         * @description : 定时任务使用的线程池
         * @params: []
         * @return: org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler
         * @author: GaoX
         * @date: 2020/12/15 16:07
         */
        @Bean(destroyMethod = "shutdown", name = "taskScheduler")
        public ThreadPoolTaskScheduler taskScheduler(){
            ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
            scheduler.setPoolSize(10);
            scheduler.setThreadNamePrefix("task-");
            scheduler.setAwaitTerminationSeconds(600);
            scheduler.setWaitForTasksToCompleteOnShutdown(true);
            return scheduler;
        }
    
        /**
         * @description : 异步任务执行线程池
         * @params: []
         * @return: org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
         * @author: GaoX
         * @date: 2020/12/15 16:10
         */
        @Bean(name = "asyncExecutor")
        public ThreadPoolTaskExecutor asyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程数量
            executor.setCorePoolSize(8);
            //队列中最大任务数
            executor.setQueueCapacity(20);
            //线程空闲后最大存活时间
            executor.setKeepAliveSeconds(100);
            //最大线程数量
            executor.setMaxPoolSize(10);
            //线程名称前缀
            executor.setThreadNamePrefix("taskExecutor-");
            //当达到最大线程数时如何处理新任务
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.initialize();
            return executor;
        }
    
        @Override
        public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
            ThreadPoolTaskScheduler taskScheduler = taskScheduler();
            scheduledTaskRegistrar.setTaskScheduler(taskScheduler);
        }
    
        @Override
        public Executor getAsyncExecutor() {
            return asyncExecutor();
        }
    
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return (throwable, method, objects) -> {
                log.error("异步任务执行出现异常, message {}, emthod {}, params {}", throwable, method, objects);
            };
        }
    
    }
    既然我已经踏上这条道路,那么,任何东西都不应妨碍我沿着这条路走下去!!!!!!!!!! !!! ! !! ! 个人公众号《后端技术开发之路》,欢迎您关注!

    如果您觉得我写还不过,请打赏下在下吧!【高木子】!

  • 相关阅读:
    操作系统的概念
    流量监听的基础利用
    后门的学习与清理
    关于phpstudy打开phpmyadmin无法打开
    SSRF服务器请求伪造
    javascript基础流程控制
    Repository中自己写修改、删除语句是需要添加的注解
    SpringBoot链接数据库
    SpringBoot项目实战:企业项目管理系统
    ErrorUtils
  • 原文地址:https://www.cnblogs.com/gaomanito/p/14523286.html
Copyright © 2011-2022 走看看