zoukankan      html  css  js  c++  java
  • erlang的send_after的时间管理

    send_after时间轮管理

     愚昧之人追赶时间之轮,聪慧之人乘坐时间之轮。 ----无名氏
    
    send_after时间管理

    send_after可以实现在多少秒后发送一条消息给某个进程,但是这个erlang是如何判断时间到了呢?

    在erlang的实现time.c文件中可以看到erlang是通过一个时间轮,可以想象成一个带有刻度的环形尺子。send_after通过不同的时间挂载到不同的刻度下,由于时间的总刻度是一定的,所以会有不同的时间挂载到相同的时间刻度下,比如总刻度为8,那么1跟9必然在相同刻度下,因为9%8==1,所以每一个挂载在刻度下的时间有一个count计数此时间是第几轮的时间,如果1是第一轮的话,9就是第二轮。当erlang虚拟机调度进程时就会处理时间轮结构,看看有没有到期的时间。
    具体判断规则为:拿出上次记录的时间到这次时间中间的所有消息,对相应的时间进行判断,若时间的轮数小于当前轮数,则说明此时间已经到了,则把相应的消息发送到相应的进程。
    如下图所示:

    具体实现(time.c)

    /*
    ** Timer entry:
    */
    //timer的结构
    typedef struct erl_timer {
        struct erl_timer* next;	/* next entry tiw slot or chain */
        struct erl_timer* prev;	/* prev entry tiw slot or chain */
        Uint slot;			/* slot in timer wheel */
        Uint count;			/* number of loops remaining */
        int    active;		/* 1=activated, 0=deactivated */
        /* called when timeout */
        //当时间到的时候,会调用相应的timeout函数发送消息到对应的进程
        void (*timeout)(void*);
        /* called when cancel (may be NULL) */
        void (*cancel)(void*);
        void* arg;        /* argument to timeout/cancel procs */
    } ErlTimer;
    
    static void
    insert_timer(ErlTimer* p, Uint t)
    {
        Uint tm;
        Uint64 ticks;
    
        /* The current slot (tiw_pos) in timing wheel is the next slot to be
         * be processed. Hence no extra time tick is needed.
         *
         * (x + y - 1)/y is precisely the "number of bins" formula.
         */
        ticks = (t + (TIW_ITIME - 1)) / TIW_ITIME;
    
        /*
         * Ticks must be a Uint64, or the addition may overflow here,
         * resulting in an incorrect value for p->count below.
         */
        ticks += do_time_update(); /* Add backlog of unprocessed time */
    
        /* calculate slot */
        tm = (ticks + tiw_pos) % TIW_SIZE; //计算刻度
        p->slot = (Uint) tm;
        p->count = (Uint) (ticks / TIW_SIZE);//计算第几轮
    
        /* insert at head of list at slot */
        //头插法,插入链表
        p->next = tiw[tm];
        p->prev = NULL;
        if (p->next != NULL)
    	p->next->prev = p;
        tiw[tm] = p;
    
    
        /* insert min time */
        if ((tiw_nto == 0) || ((tiw_min_ptr != NULL) && (ticks < tiw_min))) {
    	tiw_min = ticks;//记录最小的秒数
    	tiw_min_ptr = p;
        }
        if ((tiw_min_ptr == p) && (ticks > tiw_min)) {
    	/* some other timer might be 'min' now */
    	tiw_min = 0;
    	tiw_min_ptr = NULL;
        }
    
        tiw_nto++;//时间的timer个数
    }
    
    //每个刻度(插槽)下的timer是一个双链表的结构
    static void remove_timer(ErlTimer *p) {
        /* first */
        if (!p->prev) {
    	tiw[p->slot] = p->next;
    	if(p->next)
    	    p->next->prev = NULL;
        } else {
    	p->prev->next = p->next;
        }
    
        /* last */
        if (!p->next) {
    	if (p->prev)
    	    p->prev->next = NULL;
        } else {
    	p->next->prev = p->prev;
        }
    
        p->next = NULL;
        p->prev = NULL;
        /* Make sure cancel callback isn't called */
        p->active = 0;
        tiw_nto--;
    }
    
    void
    erts_init_time(void)
    {
        int i, itime;
    
        /* system dependent init; must be done before do_time_init()
           if timer thread is enabled */
        itime = erts_init_time_sup();
    #ifdef TIW_ITIME_IS_CONSTANT
        if (itime != TIW_ITIME) {
    	erl_exit(ERTS_ABORT_EXIT, "timer resolution mismatch %d != %d", itime, TIW_ITIME);
        }
    #else
        tiw_itime = itime;
    #endif
    
        erts_smp_mtx_init(&tiw_lock, "timer_wheel");
    	//从这里可以看到一个事件轮是一个ErlTimer* 类型的数组 数组大小是65536(一般情况下)
        tiw = (ErlTimer**) erts_alloc(ERTS_ALC_T_TIMER_WHEEL,
    				  TIW_SIZE * sizeof(ErlTimer*));
        for(i = 0; i < TIW_SIZE; i++)
    	tiw[i] = NULL;
        do_time_init();
        tiw_pos = tiw_nto = 0;
        tiw_min_ptr = NULL;
        tiw_min = 0;
    }
    
    static ERTS_INLINE void bump_timer_internal(erts_short_time_t dt) /* PRE: tiw_lock is write-locked */
    {
        Uint keep_pos;
        Uint count;
        ErlTimer *p, **prev, *timeout_head, **timeout_tail;
        Uint dtime = (Uint) dt;
    
        /* no need to bump the position if there aren't any timeouts */
        if (tiw_nto == 0) {
    	erts_smp_mtx_unlock(&tiw_lock);
    	return;
        }
    
        /* if do_time > TIW_SIZE we want to go around just once */
        count = (Uint)(dtime / TIW_SIZE) + 1;//会遍历的轮数
        keep_pos = (tiw_pos + dtime) % TIW_SIZE;//
        if (dtime > TIW_SIZE) dtime = TIW_SIZE;//最多遍历一轮
    
        timeout_head = NULL;
        timeout_tail = &timeout_head;
        while (dtime > 0) {
    	/* this is to decrease the counters with the right amount */
    	/* when dtime >= TIW_SIZE */
    	if (tiw_pos == keep_pos) count--;//过了一轮count自减
    	prev = &tiw[tiw_pos];//拿到当前毫秒刻度的所有timer
    	while ((p = *prev) != NULL) {
    	    ASSERT( p != p->next);
    	    if (p->count < count) {     /* we have a timeout */ //time的轮数小于当前轮数,超时了。
    		/* remove min time */
    		if (tiw_min_ptr == p) {
    		    tiw_min_ptr = NULL;
    		    tiw_min = 0;
    		}
    
    		/* Remove from list */
    		remove_timer(p);//移除消息
    		*timeout_tail = p;	/* Insert in timeout queue */ //头插法构成超时链表
    		timeout_tail = &p->next;
    	    }
    	    else {
    		/* no timeout, just decrease counter */
    		p->count -= count;//更新轮数
    		prev = &p->next;//下一条timer
    	    }
    	}
    	tiw_pos = (tiw_pos + 1) % TIW_SIZE;//下一毫秒插槽(刻度)
    	dtime--;//毫秒自减
        }
        tiw_pos = keep_pos;
        if (tiw_min_ptr)
    	tiw_min -= dt;
    
        erts_smp_mtx_unlock(&tiw_lock);
    
        /* Call timedout timers callbacks */
        while (timeout_head) {
    	p = timeout_head;
    	timeout_head = p->next;
    	/* Here comes hairy use of the timer fields!
    	 * They are reset without having the lock.
    	 * It is assumed that no code but this will
    	 * accesses any field until the ->timeout
    	 * callback is called.
    	 */
    	p->next = NULL;
    	p->prev = NULL;
    	p->slot = 0;
    	(*p->timeout)(p->arg);
        }
    }
    

    一些注意点

    1. 一个时间轮的刻度大概是66秒,所以一个进程经过66秒就会遍历一次所有timer。
    2. 对于未来长时间的消息,在多次调用bump_timer_internal,会对消息进行多次遍历。
    3. 发送时使用头插法插入消息,超时时使用队列,插入到队列尾部,所以会造成消息逆序。
    4. 每一个虚拟机一个时间轮。(代码中可以看到初始化调用是在你erl_start函数)

    疑问的地方

    1. 目前看代码,代码调用bump_timer_internal是在scheduler_wait和scheduler函数。那么为何只在这两个函数进行调用?如果时间过长就不能保证send_after的消息准时到达。
  • 相关阅读:
    Flutter form 的表单 input
    FloatingActionButton 实现类似 闲鱼 App 底部导航凸起按钮
    Flutter 中的常见的按钮组件 以及自 定义按钮组件
    Drawer 侧边栏、以及侧边栏内 容布局
    AppBar 自定义顶部导航按钮 图标、颜色 以及 TabBar 定义顶部 Tab 切换 通过TabController 定义TabBar
    清空路由 路由替换 返回到根路由
    应对ubuntu linux图形界面卡住的方法
    [转] 一块赚零花钱
    [转]在树莓派上搭建LAMP服务
    ssh保持连接
  • 原文地址:https://www.cnblogs.com/quitboy/p/4499687.html
Copyright © 2011-2022 走看看