zoukankan      html  css  js  c++  java
  • erlang的send_after的时间管理

    send_after时间轮管理

     愚昧之人追赶时间之轮,聪慧之人乘坐时间之轮。 ----无名氏
    
    send_after时间管理

    send_after可以实现在多少秒后发送一条消息给某个进程,但是这个erlang是如何判断时间到了呢?

    在erlang的实现time.c文件中可以看到erlang是通过一个时间轮,可以想象成一个带有刻度的环形尺子。send_after通过不同的时间挂载到不同的刻度下,由于时间的总刻度是一定的,所以会有不同的时间挂载到相同的时间刻度下,比如总刻度为8,那么1跟9必然在相同刻度下,因为9%8==1,所以每一个挂载在刻度下的时间有一个count计数此时间是第几轮的时间,如果1是第一轮的话,9就是第二轮。当erlang虚拟机调度进程时就会处理时间轮结构,看看有没有到期的时间。
    具体判断规则为:拿出上次记录的时间到这次时间中间的所有消息,对相应的时间进行判断,若时间的轮数小于当前轮数,则说明此时间已经到了,则把相应的消息发送到相应的进程。
    如下图所示:

    具体实现(time.c)

    /*
    ** Timer entry:
    */
    //timer的结构
    typedef struct erl_timer {
        struct erl_timer* next;	/* next entry tiw slot or chain */
        struct erl_timer* prev;	/* prev entry tiw slot or chain */
        Uint slot;			/* slot in timer wheel */
        Uint count;			/* number of loops remaining */
        int    active;		/* 1=activated, 0=deactivated */
        /* called when timeout */
        //当时间到的时候,会调用相应的timeout函数发送消息到对应的进程
        void (*timeout)(void*);
        /* called when cancel (may be NULL) */
        void (*cancel)(void*);
        void* arg;        /* argument to timeout/cancel procs */
    } ErlTimer;
    
    static void
    insert_timer(ErlTimer* p, Uint t)
    {
        Uint tm;
        Uint64 ticks;
    
        /* The current slot (tiw_pos) in timing wheel is the next slot to be
         * be processed. Hence no extra time tick is needed.
         *
         * (x + y - 1)/y is precisely the "number of bins" formula.
         */
        ticks = (t + (TIW_ITIME - 1)) / TIW_ITIME;
    
        /*
         * Ticks must be a Uint64, or the addition may overflow here,
         * resulting in an incorrect value for p->count below.
         */
        ticks += do_time_update(); /* Add backlog of unprocessed time */
    
        /* calculate slot */
        tm = (ticks + tiw_pos) % TIW_SIZE; //计算刻度
        p->slot = (Uint) tm;
        p->count = (Uint) (ticks / TIW_SIZE);//计算第几轮
    
        /* insert at head of list at slot */
        //头插法,插入链表
        p->next = tiw[tm];
        p->prev = NULL;
        if (p->next != NULL)
    	p->next->prev = p;
        tiw[tm] = p;
    
    
        /* insert min time */
        if ((tiw_nto == 0) || ((tiw_min_ptr != NULL) && (ticks < tiw_min))) {
    	tiw_min = ticks;//记录最小的秒数
    	tiw_min_ptr = p;
        }
        if ((tiw_min_ptr == p) && (ticks > tiw_min)) {
    	/* some other timer might be 'min' now */
    	tiw_min = 0;
    	tiw_min_ptr = NULL;
        }
    
        tiw_nto++;//时间的timer个数
    }
    
    //每个刻度(插槽)下的timer是一个双链表的结构
    static void remove_timer(ErlTimer *p) {
        /* first */
        if (!p->prev) {
    	tiw[p->slot] = p->next;
    	if(p->next)
    	    p->next->prev = NULL;
        } else {
    	p->prev->next = p->next;
        }
    
        /* last */
        if (!p->next) {
    	if (p->prev)
    	    p->prev->next = NULL;
        } else {
    	p->next->prev = p->prev;
        }
    
        p->next = NULL;
        p->prev = NULL;
        /* Make sure cancel callback isn't called */
        p->active = 0;
        tiw_nto--;
    }
    
    void
    erts_init_time(void)
    {
        int i, itime;
    
        /* system dependent init; must be done before do_time_init()
           if timer thread is enabled */
        itime = erts_init_time_sup();
    #ifdef TIW_ITIME_IS_CONSTANT
        if (itime != TIW_ITIME) {
    	erl_exit(ERTS_ABORT_EXIT, "timer resolution mismatch %d != %d", itime, TIW_ITIME);
        }
    #else
        tiw_itime = itime;
    #endif
    
        erts_smp_mtx_init(&tiw_lock, "timer_wheel");
    	//从这里可以看到一个事件轮是一个ErlTimer* 类型的数组 数组大小是65536(一般情况下)
        tiw = (ErlTimer**) erts_alloc(ERTS_ALC_T_TIMER_WHEEL,
    				  TIW_SIZE * sizeof(ErlTimer*));
        for(i = 0; i < TIW_SIZE; i++)
    	tiw[i] = NULL;
        do_time_init();
        tiw_pos = tiw_nto = 0;
        tiw_min_ptr = NULL;
        tiw_min = 0;
    }
    
    static ERTS_INLINE void bump_timer_internal(erts_short_time_t dt) /* PRE: tiw_lock is write-locked */
    {
        Uint keep_pos;
        Uint count;
        ErlTimer *p, **prev, *timeout_head, **timeout_tail;
        Uint dtime = (Uint) dt;
    
        /* no need to bump the position if there aren't any timeouts */
        if (tiw_nto == 0) {
    	erts_smp_mtx_unlock(&tiw_lock);
    	return;
        }
    
        /* if do_time > TIW_SIZE we want to go around just once */
        count = (Uint)(dtime / TIW_SIZE) + 1;//会遍历的轮数
        keep_pos = (tiw_pos + dtime) % TIW_SIZE;//
        if (dtime > TIW_SIZE) dtime = TIW_SIZE;//最多遍历一轮
    
        timeout_head = NULL;
        timeout_tail = &timeout_head;
        while (dtime > 0) {
    	/* this is to decrease the counters with the right amount */
    	/* when dtime >= TIW_SIZE */
    	if (tiw_pos == keep_pos) count--;//过了一轮count自减
    	prev = &tiw[tiw_pos];//拿到当前毫秒刻度的所有timer
    	while ((p = *prev) != NULL) {
    	    ASSERT( p != p->next);
    	    if (p->count < count) {     /* we have a timeout */ //time的轮数小于当前轮数,超时了。
    		/* remove min time */
    		if (tiw_min_ptr == p) {
    		    tiw_min_ptr = NULL;
    		    tiw_min = 0;
    		}
    
    		/* Remove from list */
    		remove_timer(p);//移除消息
    		*timeout_tail = p;	/* Insert in timeout queue */ //头插法构成超时链表
    		timeout_tail = &p->next;
    	    }
    	    else {
    		/* no timeout, just decrease counter */
    		p->count -= count;//更新轮数
    		prev = &p->next;//下一条timer
    	    }
    	}
    	tiw_pos = (tiw_pos + 1) % TIW_SIZE;//下一毫秒插槽(刻度)
    	dtime--;//毫秒自减
        }
        tiw_pos = keep_pos;
        if (tiw_min_ptr)
    	tiw_min -= dt;
    
        erts_smp_mtx_unlock(&tiw_lock);
    
        /* Call timedout timers callbacks */
        while (timeout_head) {
    	p = timeout_head;
    	timeout_head = p->next;
    	/* Here comes hairy use of the timer fields!
    	 * They are reset without having the lock.
    	 * It is assumed that no code but this will
    	 * accesses any field until the ->timeout
    	 * callback is called.
    	 */
    	p->next = NULL;
    	p->prev = NULL;
    	p->slot = 0;
    	(*p->timeout)(p->arg);
        }
    }
    

    一些注意点

    1. 一个时间轮的刻度大概是66秒,所以一个进程经过66秒就会遍历一次所有timer。
    2. 对于未来长时间的消息,在多次调用bump_timer_internal,会对消息进行多次遍历。
    3. 发送时使用头插法插入消息,超时时使用队列,插入到队列尾部,所以会造成消息逆序。
    4. 每一个虚拟机一个时间轮。(代码中可以看到初始化调用是在你erl_start函数)

    疑问的地方

    1. 目前看代码,代码调用bump_timer_internal是在scheduler_wait和scheduler函数。那么为何只在这两个函数进行调用?如果时间过长就不能保证send_after的消息准时到达。
  • 相关阅读:
    互联网协议入门
    【HTTP】图解HTTPS
    《计算机本科生理想的学习计划》
    VC++ TinyXML
    TinyXML 在vs2010 VC++使用
    Hadoop2.4.1入门实例:MaxTemperature
    xcode6
    Android利用广播监听设备网络连接(断网)的变化情况
    编程算法
    Google的Guava之IO升华
  • 原文地址:https://www.cnblogs.com/quitboy/p/4499687.html
Copyright © 2011-2022 走看看