zoukankan      html  css  js  c++  java
  • 推荐一款高效的处理延迟任务神器

    时间轮算法

    时间轮是一种高效、低延迟的调度数据结构。其在Linux内核中广泛使用,是Linux内核定时器的实现方法和基础之一。按使用场景,大致可以分为两种时间轮:原始时间轮和分层时间轮。分层时间轮是原始时间轮的升级版本,来应对时间“槽”数量比较大的情况,对内存和精度都有很高要求的情况。延迟任务的场景一般只需要用到原始时间轮就可以了。

    代码案例

    推荐使用Netty提供的HashedWheelTimer工具类来实现延迟任务。

    引入依赖:

    <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-common</artifactId>
          <version>4.1.23.Final</version>
    </dependency>
    

    红包过期队列信息:

    /**
     * 红包过期队列信息
     */
    public class RedPacketTimerTask implements TimerTask {
    
        private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    
        /**
         * 红包 ID
         */
        private final long redPacketId;
    
        /**
         * 创建时间戳
         */
        private final long timestamp;
    
        public RedPacketTimerTask(long redPacketId) {
            this.redPacketId = redPacketId;
            this.timestamp = System.currentTimeMillis();
        }
    
        @Override
        public void run(Timeout timeout) {
    		//异步处理任务
            System.out.println(String.format("任务执行时间:%s,红包创建时间:%s,红包ID:%s",
                    LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(F), redPacketId));
        }
    }
    
    

    测试用例:

    /**
     * 基于 netty 的时间轮算法 HashedWheelTimer 实现的延迟任务
     */
    public class RedPacketHashedWheelTimer {
    
        private static final DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    
        public static void main(String[] args) throws Exception {
            ThreadFactory factory = r -> {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("RedPacketHashedWheelTimerWorker");
                return thread;
            };
            /**
             * @param tickDuration - 每tick一次的时间间隔
             * @param unit - tickDuration 的时间单位
             * @param ticksPerWheel - 时间轮中的槽数
             * @param leakDetection - 检查内存溢出
             */
            Timer timer = new HashedWheelTimer(factory, 1,
                                               TimeUnit.SECONDS, 100,true);
            System.out.println(String.format("开始任务时间:%s",LocalDateTime.now().format(F)));
            for(int i=1;i<10;i++){
                TimerTask timerTask = new RedPacketTimerTask(i);
                timer.newTimeout(timerTask, i, TimeUnit.SECONDS);
            }
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    

    打印任务执行日志:

    开始任务时间:2020-02-12 15:22:23.404
    任务执行时间:2020-02-12 15:22:25.410,红包创建时间:2020-02-12 15:22:23.409,红包ID:1
    任务执行时间:2020-02-12 15:22:26.411,红包创建时间:2020-02-12 15:22:23.414,红包ID:2
    任务执行时间:2020-02-12 15:22:27.424,红包创建时间:2020-02-12 15:22:23.414,红包ID:3
    任务执行时间:2020-02-12 15:22:28.410,红包创建时间:2020-02-12 15:22:23.414,红包ID:4
    任务执行时间:2020-02-12 15:22:29.411,红包创建时间:2020-02-12 15:22:23.414,红包ID:5
    任务执行时间:2020-02-12 15:22:30.409,红包创建时间:2020-02-12 15:22:23.414,红包ID:6
    任务执行时间:2020-02-12 15:22:31.411,红包创建时间:2020-02-12 15:22:23.414,红包ID:7
    任务执行时间:2020-02-12 15:22:32.409,红包创建时间:2020-02-12 15:22:23.414,红包ID:8
    任务执行时间:2020-02-12 15:22:33.411,红包创建时间:2020-02-12 15:22:23.414,红包ID:9
    

    源码相关

    其核心是workerThread线程,主要负责每过tickDuration时间就累加一次tick。同时也负责执行到期的timeout任务以及添加timeout任务到指定的wheel中。

    构造方法:

    public HashedWheelTimer(
                ThreadFactory threadFactory,
                long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
                long maxPendingTimeouts) {
    
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            if (unit == null) {
                throw new NullPointerException("unit");
            }
            if (tickDuration <= 0) {
                throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
            }
            if (ticksPerWheel <= 0) {
                throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
            }
    
            // Normalize ticksPerWheel to power of two and initialize the wheel.
            wheel = createWheel(ticksPerWheel);
            mask = wheel.length - 1;
    
            // Convert tickDuration to nanos.
            this.tickDuration = unit.toNanos(tickDuration);
    
            // Prevent overflow.
            if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
                throw new IllegalArgumentException(String.format(
                        "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                        tickDuration, Long.MAX_VALUE / wheel.length));
            }
    		//这里-爪洼笔记
            workerThread = threadFactory.newThread(worker);
    
            leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
    
            this.maxPendingTimeouts = maxPendingTimeouts;
    
            if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
                WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
                reportTooManyInstances();
            }
    }
    

    新增任务,创建即启动:

    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
            if (task == null) {
                throw new NullPointerException("task");
            }
            if (unit == null) {
                throw new NullPointerException("unit");
            }
    
            long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    
            if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
                pendingTimeouts.decrementAndGet();
                throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
            }
    		//这里-爪洼笔记
            start();
    
            // Add the timeout to the timeout queue which will be processed on the next tick.
            // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
            long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    
            // Guard against overflow.
            if (delay > 0 && deadline < 0) {
                deadline = Long.MAX_VALUE;
            }
            HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
            timeouts.add(timeout);
            return timeout;
    }
    

    线程启动:

       /**
         * Starts the background thread explicitly.  The background thread will
         * start automatically on demand even if you did not call this method.
         *
         * @throws IllegalStateException if this timer has been
         *                               {@linkplain #stop() stopped} already
         */
        public void start() {
            switch (WORKER_STATE_UPDATER.get(this)) {
                case WORKER_STATE_INIT:
                    if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                        workerThread.start();
                    }
                    break;
                case WORKER_STATE_STARTED:
                    break;
                case WORKER_STATE_SHUTDOWN:
                    throw new IllegalStateException("cannot be started once stopped");
                default:
                    throw new Error("Invalid WorkerState");
            }
    
            // Wait until the startTime is initialized by the worker.
            while (startTime == 0) {
                try {
                    startTimeInitialized.await();
                } catch (InterruptedException ignore) {
                    // Ignore - it will be ready very soon.
                }
            }
        }
    

    执行相关操作:

    public void run() {
                // Initialize the startTime.
                startTime = System.nanoTime();
                if (startTime == 0) {
                    // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                    startTime = 1;
                }
    
                // Notify the other threads waiting for the initialization at start().
                startTimeInitialized.countDown();
    
                do {
                    final long deadline = waitForNextTick();
                    if (deadline > 0) {
                        int idx = (int) (tick & mask);
                        processCancelledTasks();
                        HashedWheelBucket bucket =
                                wheel[idx];
                        transferTimeoutsToBuckets();
                        bucket.expireTimeouts(deadline);
                        tick++;
                    }
                } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    
                // Fill the unprocessedTimeouts so we can return them from stop() method.
                for (HashedWheelBucket bucket: wheel) {
                    bucket.clearTimeouts(unprocessedTimeouts);
                }
                for (;;) {
                    HashedWheelTimeout timeout = timeouts.poll();
                    if (timeout == null) {
                        break;
                    }
                    if (!timeout.isCancelled()) {
                        unprocessedTimeouts.add(timeout);
                    }
                }
                processCancelledTasks();
    }
    
    

    小结

    以上方案并没有实现持久化和分布式,生产环境可根据实际业务需求选择使用。

    源码

    https://gitee.com/52itstyle/spring-boot-seckill

  • 相关阅读:
    国际组织
    波段
    hhgis驱动
    百度地图格式
    气象数据格式
    汽车用传感器
    无线传感器网络
    【系统软件工程师面试】7. 消息队列
    【ToDo】存储设计概述
    Arthas: Java 动态追踪技术
  • 原文地址:https://www.cnblogs.com/smallSevens/p/12312273.html
Copyright © 2011-2022 走看看