zoukankan      html  css  js  c++  java
  • skynet源码阅读<4>--定时器实现

        昨天和三石公聊天,他提到timer的实现原理,我当时迟疑了一下,心想timer不是系统底层时钟中断驱动上层进程/线程,累积计时实现的么?他简述了timer的实现,什么堆排序,优先级队列等,与我想象的不同。正好这两天在作skynet笔记,以前也没有留意过skynet的timer,这次干脆就看看它是怎么实现的。看了之后我明白了,我与三石公所设想的不是同一个问题。他所关心的问题其实是:框架被注册多个定时回调,如何管理并尽可能高效地触发这些回调。这里我们假设框架将定时消息抽象为timer_node,框架自身最小时间片为T,不同的timer_node按照其将要被触发的时间先后排序,那么只需要在每个时间片T到来时,找到在此时刻Tk的所有timer_node加以触发就可以了。这本质是个排序问题,三石公所述的,其实是排序的不同实现方案而已。言归正传,在分析skynet的代码之前,我们先来就其实现做个简单的说明。

        假设讨论的数值范围为0~999,给定一个数N,我们可以按照以下方式组织:

        首先判断N的大小在哪个层级,这里对应的是【个、十、百】共3个级别,每个级别上分别建立10个桶,以存储加入进来的数据。假设N=2,那么它应落在个位级别上的Bucket2里面;假设N=32,那么它应落在百位级别上的Bucket3里面;假设N=932,那么它应落在百位级别上的Bucket9里面。可以看到,级别越高,如果所划分的桶数不变的话,单个桶中所能容纳的元素就越多,那么从此桶查找目标元素就越耗时。

        设时刻t从0开始,一个时间片为10MS。建立单独一个个位级别的集合S,把个位级别的桶都加进来。t变化时,从S中取出桶来,触发桶中的timer_node。t到10时,S中的元素使用完毕,我们从十位级别拿出桶B0来,把B0中的元素与t作比较。由于t当前已经是十位级别,B0中的元素相对于t此时已经变成个位级别,因此B0内的元素会重新添加到集合S中来。每次当t走完一个新的周期0~9,就重新这个筛选的过程,B1,B2……依次类推。而当t刚增长到百位级别时,它要从百位级别拿出桶B0,其中的元素一部分相对于t是个位级别,一部分相对于t是十位级别,于是前者被添加到S,而后者被重新筛选添加到十位级别的桶中去。在这之后,t每走完一个十位级别的周期(0~10),就要重复十位级别的重新筛选过程;当t走完一个百位级别的周期时(比如从100到200),则要取出下一个百位级别的桶,然后重复百位级别的筛选过程,依次类推。

        在t不断变化的过程中,如果有新的timer_node加入进来,则计算出相对于t的级别,加入到对应的桶中去。

        说完思路,来看看skynet_timer的实现。先看下数据结构的设计:

     1 struct timer_node {
     2     struct timer_node *next;
     3     uint32_t expire;
     4 };
     5 
     6 struct link_list {
     7     struct timer_node head;
     8     struct timer_node *tail;
     9 };
    10 
    11 struct timer {
    12     struct link_list near[TIME_NEAR];
    13     struct link_list t[4][TIME_LEVEL];
    14     struct spinlock lock;
    15     uint32_t time;
    16     uint32_t starttime;
    17     uint64_t current;
    18     uint64_t current_point;
    19 };
    20 
    21 static struct timer * TI = NULL;

        这里TI->near就是我们说的集合S,它分为TIME_NEAR(256)个桶,而TI->t则是其所建立的不同级别的集合,一共4个级别,每个级别分TIME_LEVEL(32)个桶。所以算起来一共有5个级别,第一个级别是0~7位共8位,后面每个级别是6位,所以总共是8+6*4=32位,正好把unit_32用完。TI->time就是我们说的当前时刻t了,它每次以10MS为单位增长。

        看下timer_node是如何加进来的:

     1 static void
     2 add_node(struct timer *T, struct timer_node *node) {
     3     uint32_t time=node->expire;
     4     uint32_t current_time=T->time;
     5     
     6     if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) {
     7         link(&T->near[time&TIME_NEAR_MASK],node);
     8     } else {
     9         int i;
    10         uint32_t mask=TIME_NEAR << TIME_LEVEL_SHIFT;
    11         for (i=0;i<3;i++) {
    12             if ((time|(mask-1))==(current_time|(mask-1))) {
    13                 break;
    14             }
    15             mask <<= TIME_LEVEL_SHIFT;
    16         }
    17 
    18         link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);    
    19     }
    20 }

        在add_node函数中,目标时刻为time(当前时间+duration),current_time为当前时刻。TIME_NEAR_MASK为255,表示最后8位,也就是级别0集合(这里假设5个级别是从0数起),(time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)的意思是将current_time、time的最后8位都置1,即认为它们只在级别0上不等(全置1后级别0就完全相等了),比较它们在高级别上是否相等。如果是,则其相对级别为0,加入T->near集合,否则判断它们在哪个级别上不等:每次将mask左移TIME_LEVEL_SHIFT(6)位,再进行先|再==的比较,以判断是否在级别1、2、3……上不等。假设mask在第L级别移位后,做time与current_time的先|再==操作,成立的话说明time与current_time仅仅在级别L上是不同的,否则一直向上找,直到找到最高的不同级别LI。以【个、十、百】来比较的话,假设time为5320,current_time为5120,那么会在百位级别上发现不等(先|再==的结果为相等),即相对级别为百位级别,会将元素扔到百位级别的桶中去。再假设time为5320,current_time为20,那么做先|再==的操作,直到千位级别才能结束,即二者相对级别为千位级别,则要将元素扔到千位的桶中去。即找到最大的相对级别后,则要计算出time在这个级别属于哪个桶。首先是(time>>(TIME_NEAR_SHIFT+i*TIME_LEVEL_SHIFT))将此级别段的所有bit移到最右侧,然后&TIME_LEVEL_MASK求余,得到桶号,最后加入进去。

        说完加入,下一步就要看如何执行了:

     1 static void
     2 move_list(struct timer *T, int level, int idx) {
     3     struct timer_node *current = link_clear(&T->t[level][idx]);
     4     while (current) {
     5         struct timer_node *temp=current->next;
     6         add_node(T,current);
     7         current=temp;
     8     }
     9 }
    10 
    11 static void
    12 timer_shift(struct timer *T) {
    13     int mask = TIME_NEAR;
    14     uint32_t ct = ++T->time;
    15     if (ct == 0) {
    16         move_list(T, 3, 0);
    17     } else {
    18         uint32_t time = ct >> TIME_NEAR_SHIFT;
    19         int i=0;
    20 
    21         while ((ct & (mask-1))==0) {
    22             int idx=time & TIME_LEVEL_MASK;
    23             if (idx!=0) {
    24                 move_list(T, i, idx);
    25                 break;                
    26             }
    27             mask <<= TIME_LEVEL_SHIFT;
    28             time >>= TIME_LEVEL_SHIFT;
    29             ++i;
    30         }
    31     }
    32 }
    33 
    34 static inline void
    35 timer_execute(struct timer *T) {
    36     int idx = T->time & TIME_NEAR_MASK;
    37     
    38     while (T->near[idx].head.next) {
    39         struct timer_node *current = link_clear(&T->near[idx]);
    40         SPIN_UNLOCK(T);
    41         // dispatch_list don't need lock T
    42         dispatch_list(current);
    43         SPIN_LOCK(T);
    44     }
    45 }
    46 
    47 static void 
    48 timer_update(struct timer *T) {
    49     SPIN_LOCK(T);
    50 
    51     // try to dispatch timeout 0 (rare condition)
    52     timer_execute(T);
    53 
    54     // shift time first, and then dispatch timer message
    55     timer_shift(T);
    56 
    57     timer_execute(T);
    58 
    59     SPIN_UNLOCK(T);
    60 }

        timer_update中先execute,根据当前时刻TI->time取出TI->near中timer_node并向目标分发消息。然后做关键的timer_shift,当前时刻TI->time加1,此时就要判断它是否处于不同级别的周期临界上,从上面的说明我们知道,当它在某个级别Ln时,它需要将Ln中下一个桶中的元素取出重新筛选到L0~Ln-1各级别中去==>

        (mask-1)的初始值是(TIME_NEAR-1)(255,级别0范围),而ct(++TI->time,当前时间)与(mask-1)做&操作,也就是在级别0范围内求余,如果不为0的话,说明ct是在级别0内增加的,比如从2-->3。反之则说明当前时间ct在大于0的级别。在此情况下,time初值已经是ct>>TIME_NEAR_SHIFT了,其与TIME_LEVEL_MASK做&操作,即在级别1范围内求余。如果为0的话说明在大于1的级别,跳出;否则mask继续左移TIME_LEVEL_SHIFT以扩大级别范围,time则继续右移TIME_LEVEL_SHIFT在新级别内求余。当余数idx不为0时,表明ct在这个级别内增加了(比如从299->300,以【个、十、百】比较的话),那么此时就要取出这个level级别的桶idx内的元素重新筛选,根据相对于ct的级别重新分配到低级别的桶中去。move_list所做的,便是这个重新分配的过程。

        至此,算法的详细过程已经分析完毕了。其思想是只关注较近时间段内的timer_node排序,限制了每次处理的最小集合。而实际使用定时器,一般是时间小的定时器居多,时间越大,这种定时器实际使用的情况越少,因此在高级别桶内的元素数目不会很多,重新筛选分配的开销也不大。最后,再看看在skynet_start.c中,是如何驱动skynet_timer的:

     1 static void *
     2 thread_timer(void *p) {
     3     struct monitor * m = p;
     4     skynet_initthread(THREAD_TIMER);
     5     for (;;) {
     6         skynet_updatetime();
     7         CHECK_ABORT
     8         wakeup(m,m->count-1);
     9         usleep(2500);
    10     }
    11 // other
    ...
    }

        可以看到,是开启了一个单独的线程,每隔2500微秒(2.5毫秒)来驱动的。

  • 相关阅读:
    Netty入门——客户端与服务端通信
    使用配置文件自定义Ribbon配置
    使用Java代码自定义Ribbon配置
    Spring Cloud Ribbon入门
    负载均衡简介
    常见的几种负载均衡算法
    Eureka编程
    Eureka多机高可用
    Maven项目打包成可执行Jar文件
    Eureka单机高可用伪集群配置
  • 原文地址:https://www.cnblogs.com/Jackie-Snow/p/6559381.html
Copyright © 2011-2022 走看看