zoukankan      html  css  js  c++  java
  • 哈希轮

    本文分析 netty 的 HashedWheelTimer 主要流程

    示例:

    public static void main(String[] args) {
        // 指定 tick 的间隔时间,tick 的数量(规格化成 2 的幂之后,哈希数组的长度)
        HashedWheelTimer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 100,
                TimeUnit.MILLISECONDS, 32, true, 2);
    
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("zhang");
            }
        }, 10, TimeUnit.SECONDS);
    
    //        timer.stop();
    }

    结论:

    一个哈希数组,数组元素是双向链表,链表元素是定时任务
    一个队列,提交的定时任务先放到队列中
    一个线程,一个 tick 一个 tick 地走,把队列中的任务挂到哈希槽上,触发过期任务,remainingRounds 减一
    参数:tick 时间间隔,tick 总数。

    提交定时任务,先把任务放入队列中

    // 提交的任务首先加入到 queue 中
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    // io.netty.util.HashedWheelTimer#newTimeout
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }
    
        start();
    
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

    线程死循环

    // io.netty.util.HashedWheelTimer.Worker#run
    public void run() {
        startTime = System.nanoTime();
        if (startTime == 0) {
            startTime = 1;
        }
        startTimeInitialized.countDown();
        do {
            // 等待下一个 tick
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                // 当前 tick 数取模
                int idx = (int) (tick & mask);
                // 处理取消的 task
                processCancelledTasks();
                // 根据 idx 定位到 bucket
                HashedWheelBucket bucket = wheel[idx];
                // 把 queue 中任务转移到 bucket 的链表中
                transferTimeoutsToBuckets();
                // 遍历 bucket 上的定时任务,过期则触发,否则 remainingRounds 减一
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    
        // Fill the unprocessedTimeouts so we can return them from stop() method.
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        processCancelledTasks();
    }
    // io.netty.util.HashedWheelTimer.Worker#transferTimeoutsToBuckets
    private void transferTimeoutsToBuckets() {
        // 最多转移十万个任务
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                // 队列为空,退出
                break;
            }
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                // 当前任务被取消,跳过
                continue;
            }
            // 任务截止时间 / tick 间隔
            long calculated = timeout.deadline / tickDuration;
            // 任务需要等待多少轮
            timeout.remainingRounds = (calculated - tick) / wheel.length;
            // 取 calculated 和当前 tick 数的较大值
            final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
            // 取模决定 bucket 位置
            int stopIndex = (int) (ticks & mask);
    
            HashedWheelBucket bucket = wheel[stopIndex];
            // 加入到 bucket 的链表
            bucket.addTimeout(timeout);
        }
    }

    触发定时任务,后者 remainingRounds 减一

    // io.netty.util.HashedWheelTimer.HashedWheelBucket#expireTimeouts
    public void expireTimeouts(long deadline) {
        HashedWheelTimeout timeout = head;
    
        // process all timeouts
        while (timeout != null) {
            HashedWheelTimeout next = timeout.next;
            if (timeout.remainingRounds <= 0) {
                // 从链表中删除任务
                next = remove(timeout);
                if (timeout.deadline <= deadline) {
                    // 触发定时任务
                    timeout.expire();
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
            } else if (timeout.isCancelled()) {
                // 任务被取消,从链表中删除
                next = remove(timeout);
            } else {
                // 等待的轮数减一
                timeout.remainingRounds --;
            }
            timeout = next;
        }
    }
  • 相关阅读:
    web每隔几秒页面刷新
    lucene.Net (C#)
    hibernatesearch
    org.simpleframework.xml.core.Persister
    Hibernate 注解 annotation
    WebService大讲堂之Axis2(2):复合类型数据的传递
    myEclipse代码格式化
    WebService大讲堂之Axis2(1):用POJO实现0配置的WebService
    Hibernate Search基本配置和使用
    java.sql.preparedstatement的应用
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12591853.html
Copyright © 2011-2022 走看看