zoukankan      html  css  js  c++  java
  • Netty 工具类 —— HashedWheelTimer 讲解

    一、前言

    在网络通信中管理上万的连接,每个连接都有超时任务,如果为每个任务启动一个TImer超时器,那么会占用大量资源。为了解决这个问题,可用Netty工具类HashedWheelTimer。

    二、HashedWheelTimer原理

    1.概论

    (学习一个类,最好的方式是看api文档或源码的注释,我下载了Netty源码)

    这个类用来计划执行非精准的I/O超时。可以通过指定每一格的时间间隔来改变执行时间的精确度。在大多数网络应用中,I/O超时不需要十分准确,因此,默认的时间间隔是100 毫秒,这个值适用于大多数场合。HashedWheelTimer内部结构可以看做是个车轮,简单来说,就是TimerTask的hashTable的车轮。车轮的size默认是512,可以通过构造函数自己设置这个值。注意,当HashedWheelTimer被实例化启动后,会创建一个新的线程,因此,你的项目里应该只创建它的唯一一个实例

    (这个类的源自一位教授的论文 Tony Lauck's paper。算法是代码的灵魂,最难的是算法)

    2.结构

    下方图示阐述了大致的结构模型

    Demo

     1 @Test(timeout = 50000000)
     2     public void testTimerOverflowWheelLength() throws InterruptedException {
     3         final HashedWheelTimer timer = new HashedWheelTimer(
     4             Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS, 32);
     6 
     7         timer.newTimeout(new TimerTask() {
     8             @Override
     9             public void run(final Timeout timeout) throws Exception {
    10                 System.out.println("lee");   //打印名字
    13             }
    14         }, 1000, TimeUnit.MILLISECONDS);
    15 
    16         Thread.sleep(10000);
    18         assertFalse(timer.stop().isEmpty());
    19     }

    通过上面demo,我再说明一下hashedWheelTimer类的构造。

    超时任务1000毫秒,超时之后,由hashedWheelTimer类中的worker线程,执行超时之后的任务(打印名字)。hashedWheelTimer有32个槽(相当于一个圆的32分之一),每移动一个槽的时间是100毫秒。
    任务需要经过的tick数为: 1000 / 100 = 10次         (等待时长 / tickDuration)
    任务需要经过的轮数为  : 10次 / 32次/轮 = 0轮     (tick总次数 / ticksPerWheel)
    因为任务超时后不能马上被worker线程执行,需要等到worker线程移到相应卡槽位置时才会执行,因此说执行时间不精确。

    hashedWheelTimer的核心是Worker线程,主要负责每过tickDuration时间就累加一次tick. 同时, 也负责执行到期的timeout任务, 此外,还负责添加timeou任务到指定的wheel中。

    接下看看源码部分。

    构造器

    构造器的各个参数的说明和注释十分详细,应该没有不好理解的地方。

     1     /**
     2      * Creates a new timer.
     3      *
     4      * @param threadFactory        a {@link ThreadFactory} that creates a
     5      *                             background {@link Thread} which is dedicated to    用来创建后台线程
     6      *                             {@link TimerTask} execution.
     7      * @param tickDuration         the duration between tick                          每格时间间隔 默认 100
     8      * @param unit                 the time unit of the {@code tickDuration}          时间单位 默认 毫秒
     9      * @param ticksPerWheel        the size of the wheel                              轮子size(一圈多少格)
    默认 512 如果不是2的N次方,则去大于该参数的第一个2的N次方
    理由 便于Hash值的计算10 * @param leakDetection {@code true} if leak detection should be enabled always, 11 * if false it will only be enabled if the worker thread is not 12 * a daemon thread. 13 * @param maxPendingTimeouts The maximum number of pending timeouts after which call to 14 * {@code newTimeout} will result in 15 * {@link java.util.concurrent.RejectedExecutionException} 16 * being thrown. No maximum pending timeouts limit is assumed if 17 * this value is 0 or negative. 最大待定超时时间 默认 不设置 18 * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null} 19 * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0 20 */ 21 public HashedWheelTimer( 22 ThreadFactory threadFactory, 23 long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, 24 long maxPendingTimeouts) {
    ...
    38 39 // Normalize ticksPerWheel to power of two and initialize the wheel. 40 wheel = createWheel(ticksPerWheel); 41 mask = wheel.length - 1; 42 43 // Convert tickDuration to nanos. 44 long duration = unit.toNanos(tickDuration);
    ...
    52 53 if (duration < MILLISECOND_NANOS) { 54 if (logger.isWarnEnabled()) { 55 logger.warn("Configured tickDuration %d smaller then %d, using 1ms.", 56 tickDuration, MILLISECOND_NANOS); 57 } 58 this.tickDuration = MILLISECOND_NANOS; 59 } else { 60 this.tickDuration = duration; 61 } 62 // 这里创建worker线程,worker线程是重点。 63 workerThread = threadFactory.newThread(worker); 64 65 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; 66 67 this.maxPendingTimeouts = maxPendingTimeouts; 68 69 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && 70 WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { 71 reportTooManyInstances(); 72 } 73 }

    我们接下来看newTimeout()方法

     1     @Override
     2     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
               ... 9 
    10         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    11 
    12         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
    13             pendingTimeouts.decrementAndGet();
    14             throw new RejectedExecutionException("Number of pending timeouts ("
    15                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
    16                 + "timeouts (" + maxPendingTimeouts + ")");
    17         }
    18 
    19         start();
    20 
    21         // Add the timeout to the timeout queue which will be processed on the next tick.
    22         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    23         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    24 
    25         // Guard against overflow.
    26         if (delay > 0 && deadline < 0) {
    27             deadline = Long.MAX_VALUE;
    28         }
    29         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    30         timeouts.add(timeout);
    31         return timeout;
    32     }

    我们接着进到worker线程的启动

     1 /**
     2      * Starts the background thread explicitly.  The background thread will
     3      * start automatically on demand even if you did not call this method.
     4      *
     5      * @throws IllegalStateException if this timer has been
     6      *                               {@linkplain #stop() stopped} already
     7      */
     8     public void start() {     // 这个方法为什么是public的?我们可以在实例化HashedWheelTimer后,主动调用这个方法,启动worker线程
     9         switch (WORKER_STATE_UPDATER.get(this)) {
    10             case WORKER_STATE_INIT:
    11                 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
    12                     workerThread.start();            // 在这里启动了worker线程
    13                 }
    14                 break;
    15             case WORKER_STATE_STARTED:
    16                 break;
    17             case WORKER_STATE_SHUTDOWN:
    18                 throw new IllegalStateException("cannot be started once stopped");
    19             default:
    20                 throw new Error("Invalid WorkerState");
    21         }
    22 
    23         // Wait until the startTime is initialized by the worker.
    // 为了等待worker线程初始化startTime
    24 while (startTime == 0) { 25 try { 26 startTimeInitialized.await(); 27 } catch (InterruptedException ignore) { 28 // Ignore - it will be ready very soon. 29 } 30 } 31 }

     下一步,我们看看worker线程里面的操作。

    首先是初始化startTime,CountDownLatch的触发,后面do while操作可以看作是圆盘在一个个转动,每转一个就会用worker线程处理,格子中超时的任务。

     1         @Override
     2         public void run() {
     3             // Initialize the startTime.
     4             startTime = System.nanoTime();
     5             if (startTime == 0) {
     6                 // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
     7                 startTime = 1;
     8             }
     9 
    10             // Notify the other threads waiting for the initialization at start().
    11             startTimeInitialized.countDown();
    12 
    13             do {
    14                 final long deadline = waitForNextTick();
    15                 if (deadline > 0) {
    16                     int idx = (int) (tick & mask);
    17                     processCancelledTasks();
    18                     HashedWheelBucket bucket =
    19                             wheel[idx];
    20                     transferTimeoutsToBuckets();
    21                     bucket.expireTimeouts(deadline);
    22                     tick++;
    23                 }
    24             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
    25 
    26             // Fill the unprocessedTimeouts so we can return them from stop() method.
    27             for (HashedWheelBucket bucket: wheel) {
    28                 bucket.clearTimeouts(unprocessedTimeouts);
    29             }
    30             for (;;) {
    31                 HashedWheelTimeout timeout = timeouts.poll();
    32                 if (timeout == null) {
    33                     break;
    34                 }
    35                 if (!timeout.isCancelled()) {
    36                     unprocessedTimeouts.add(timeout);
    37                 }
    38             }
    39             processCancelledTasks();
    40         }

    ----------------------------------------------------------------------------------------------

     参考资源:

    https://www.jianshu.com/p/328f22432638

    https://my.oschina.net/haogrgr/blog/489320

    https://chuansongme.com/n/1650380646616

  • 相关阅读:
    因安装包依赖问题导致无法安装的解决办法!
    Ubuntu18.04安装qemu遇到问题-qemu : Depends: qemu-system (>= 1:2.11+dfsg-1ubuntu7)
    理解mount -t proc proc /proc
    printf "%.*s"
    Linux 内核内存分配函数devm_kmalloc()和devm_kzalloc()
    为什么 extern 使用 const 修饰的变量会编译不过?
    php openssl_sign 对应 C#版 RSA签名
    win7中用iis部署ssl服务
    找出windows系统上最大的文件
    windows 创建指定大小文件
  • 原文地址:https://www.cnblogs.com/lihao007/p/10588072.html
Copyright © 2011-2022 走看看