zoukankan      html  css  js  c++  java
  • redis学习笔记(五): serverCron

    serverCron是redis里主要的定时处理函数,在initServer中通过调用aeCreateTimeEvent,将serverCron做为callback注册到全局的eventLoop结构当中。它在主循环中的位置:

    aeMain
    {
        while (!stop)
        {
            beforeSleep
            aeApiPoll
            process file events
            /* process time events */
            for each timeEntry(te) in eventLoop
            {
                retval = te->timeProc()    /* 这里面timeProc就是serverCron */
                if (NOMORE == retval)
                    Delete time entry from eventLoop
                else
                    aeAddMillisecondsToNow(te, retval) /* te下一次触发的时隔更新为retval */
            }
        }
    }

    看serverCron的实现之前先看这个run_with_period的定义:

    #define run_with_period(_ms_) 
      if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

    由它的定义,run_with_period(_ms_)会在两种情况下返回1:

    1. _ms_ <= 1000/server.hz,就是说_ms_比serverCron的执行间隔要小。

    2. 或者_ms_比serverCron的执行间隔要大并且serverCron执行的次数刚好是_ms_/(1000/server.hz)的整数倍。

    server.hz的意义是serverCron在一秒内执行的次数(从redis的实现来看,这个值是以ms为最小单位来计算的),那么1000/server.hz就是serverCron的执行间隔(ms),再结合run_with_period的定义可以看出,run_with_period表示每_ms_毫秒执行一段任务。

    举个例子来说,server.hz是100,也就是servreCron的执行间隔是10ms(可能不完全精确,毕竟是单线程顺序执行)。

    假如有一些任务需要每500ms执行一次,就可以在serverCron中用run_with_period(500)把每500ms需要执行一次的工作控制起来。所以,serverCron每执行到第500/10次,run_with_period(500)就会返回1

    serverCron的实现如下:

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        int j;
        REDIS_NOTUSED(eventLoop);
        REDIS_NOTUSED(id);
        REDIS_NOTUSED(clientData);
    
        /* Software watchdog: deliver the SIGALRM that will reach the signal
         * handler if we don't return here fast enough. */
        /* 用SIGALRM信号触发watchdog的处理过程,具体的函数为watchdogSignalHandler */
        if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
    
        /* Update the time cache. */
        /* 更新server.unixtime和server.mstime */
        updateCachedTime();
    
        /* 每100ms更新一次统计量,包括这段时间内的commands, net_input_bytes, net_output_bytes */
        run_with_period(100) {
            trackInstantaneousMetric(REDIS_METRIC_COMMAND,server.stat_numcommands);
            trackInstantaneousMetric(REDIS_METRIC_NET_INPUT,
                    server.stat_net_input_bytes);
            trackInstantaneousMetric(REDIS_METRIC_NET_OUTPUT,
                    server.stat_net_output_bytes);
        }
    
        /* We have just REDIS_LRU_BITS bits per object for LRU information.
         * So we use an (eventually wrapping) LRU clock.
         *
         * Note that even if the counter wraps it's not a big problem,
         * everything will still work but some object will appear younger
         * to Redis. However for this to happen a given object should never be
         * touched for all the time needed to the counter to wrap, which is
         * not likely.
         *
         * Note that you can change the resolution altering the
         * REDIS_LRU_CLOCK_RESOLUTION define. */
         /* 根据server.lruclock的定义,getLRUClock返回的是当前时间换算成秒数的低23位 */
        server.lruclock = getLRUClock();
    
        /* Record the max memory used since the server was started. */
        /* 记录最大内存使用情况 */
        if (zmalloc_used_memory() > server.stat_peak_memory)
            server.stat_peak_memory = zmalloc_used_memory();
    
        /* Sample the RSS here since this is a relatively slow call. */
        /* 记录当前的RSS值 */
        server.resident_set_size = zmalloc_get_rss();
    
        /* We received a SIGTERM, shutting down here in a safe way, as it is
         * not ok doing so inside the signal handler. */
        /* 如果收到了SIGTERM信号,尝试退出 */
        if (server.shutdown_asap) {
            if (prepareForShutdown(0) == REDIS_OK) exit(0);
            redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
            server.shutdown_asap = 0;
        }
    
        /* Show some info about non-empty databases */
        /* 每5秒输出一次非空databases的信息到log当中 */
        run_with_period(5000) {
            for (j = 0; j < server.dbnum; j++) {
                long long size, used, vkeys;
    
                size = dictSlots(server.db[j].dict);
                used = dictSize(server.db[j].dict);
                vkeys = dictSize(server.db[j].expires);
                if (used || vkeys) {
                    redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                    /* dictPrintStats(server.dict); */
                }
            }
        }
    
        /* Show information about connected clients */
        /* 如果不是sentinel模式,则每5秒输出一个connected的client的信息到log */
        if (!server.sentinel_mode) {
            run_with_period(5000) {
                redisLog(REDIS_VERBOSE,
                    "%lu clients connected (%lu slaves), %zu bytes in use",
                    listLength(server.clients)-listLength(server.slaves),
                    listLength(server.slaves),
                    zmalloc_used_memory());
            }
        }
    
        /* We need to do a few operations on clients asynchronously. */
        /* 清理空闲的客户端或者释放query buffer中未被使用的空间 */
        clientsCron();
    
        /* Handle background operations on Redis databases. */
        /* databases的处理,rehash就在这里 */
        databasesCron();
    
        /* Start a scheduled AOF rewrite if this was requested by the user while
         * a BGSAVE was in progress. */
        /* 如果开启了aof_rewrite的调度并且当前没有在background执行rdb/aof的操作,则进行background的aof操作 */
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
            server.aof_rewrite_scheduled)
        {
            rewriteAppendOnlyFileBackground();
        }
    
        /* Check if a background saving or AOF rewrite in progress terminated. */    
        if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
            /* 如果有aof或者rdb在后台进行,则等待对应的退出。注意,这里用了WNOHANG,所以不会阻塞在wait3 */
            int statloc;
            pid_t pid;
    
            /* wait3返回非0值,要么是子进程退出,要么是出错 */
            if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
                int exitcode = WEXITSTATUS(statloc);
                int bysignal = 0;
    
                if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
    
                /* 如果是出错,在log中记录这次错误
                 * 如果是rdb任务退出,调用backgroundSaveDoneHandler进行收尾工作
                 * 如果是aof任务退出,调用backgroundRewriteDoneHandler进行收尾工作
                 */
                if (pid == -1) {
                    redisLog(LOG_WARNING,"wait3() returned an error: %s. "
                        "rdb_child_pid = %d, aof_child_pid = %d",
                        strerror(errno),
                        (int) server.rdb_child_pid,
                        (int) server.aof_child_pid);
                } else if (pid == server.rdb_child_pid) {
                    backgroundSaveDoneHandler(exitcode,bysignal);
                } else if (pid == server.aof_child_pid) {
                    backgroundRewriteDoneHandler(exitcode,bysignal);
                } else {
                    redisLog(REDIS_WARNING,
                        "Warning, detected child with unmatched pid: %ld",
                        (long)pid);
                }
                /* 如果当前有rdb/aof任务在处理,则将dict_can_resize设置为0(表示不允许进行resize),否则,设置为1 */
                updateDictResizePolicy();
            }
        } else {
            /* 当前没有rdb/aof任务在执行,这里来判断是否要开启新的rdb/aof任务 */
            /* If there is not a background saving/rewrite in progress check if
             * we have to save/rewrite now */
             for (j = 0; j < server.saveparamslen; j++) {
                struct saveparam *sp = server.saveparams+j;
    
                /* Save if we reached the given amount of changes,
                 * the given amount of seconds, and if the latest bgsave was
                 * successful or if, in case of an error, at least
                 * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
                if (server.dirty >= sp->changes &&
                    server.unixtime-server.lastsave > sp->seconds &&
                    (server.unixtime-server.lastbgsave_try >
                     REDIS_BGSAVE_RETRY_DELAY ||
                     server.lastbgsave_status == REDIS_OK))
                {
                    redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                        sp->changes, (int)sp->seconds);
                    rdbSaveBackground(server.rdb_filename);
                    break;
                }
             }
    
             /* Trigger an AOF rewrite if needed */
             if (server.rdb_child_pid == -1 &&
                 server.aof_child_pid == -1 &&
                 server.aof_rewrite_perc &&
                 server.aof_current_size > server.aof_rewrite_min_size)
             {
                long long base = server.aof_rewrite_base_size ?
                                server.aof_rewrite_base_size : 1;
                long long growth = (server.aof_current_size*100/base) - 100;
                if (growth >= server.aof_rewrite_perc) {
                    redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                    rewriteAppendOnlyFileBackground();
                }
             }
        }
    
    
        /* AOF postponed flush: Try at every cron cycle if the slow fsync
         * completed. */
        /* 如果开启了aof_flush_postponed_start,则在每次serverCron流程里都将server.aof_buf写入磁盘文件。
         * PS, server.aof_buf是从上一次写aof文件到目前为止所执行过的命令集合,所以是append only file
         */
        if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    
        /* AOF write errors: in this case we have a buffer to flush as well and
         * clear the AOF error in case of success to make the DB writable again,
         * however to try every second is enough in case of 'hz' is set to
         * an higher frequency. */
        /* 每一秒检查一次上一轮aof的写入是否发生了错误,如果有错误则尝试重新写一次 */
        run_with_period(1000) {
            if (server.aof_last_write_status == REDIS_ERR)
                flushAppendOnlyFile(0);
        }
    
        /* Close clients that need to be closed asynchronous */
        /* server.clients_to_close链表上的元素都是待关闭的连接 */
        freeClientsInAsyncFreeQueue();
    
        /* Clear the paused clients flag if needed. */
        /* clients被paused时,会相应地记录一个超时的时间,如果那个时间已经到来,则给client打上REDIS_UNBLOCKED标记(slave的client不处理),并加到server.unblocked_clients上 */
        clientsArePaused(); /* Don't check return value, just use the side effect. */
    
        /* Replication cron function -- used to reconnect to master and
         * to detect transfer failures. */
        /* 每1秒执行一次replication */
        run_with_period(1000) replicationCron();
    
        /* Run the Redis Cluster cron. */
        /* 每100ms执行一次clusterCron */
        run_with_period(100) {
            if (server.cluster_enabled) clusterCron();
        }
    
        /* Run the Sentinel timer if we are in sentinel mode. */
        /* 每100ms执行一次sentine的定时器 */
        run_with_period(100) {
            if (server.sentinel_mode) sentinelTimer();
        }
    
        /* Cleanup expired MIGRATE cached sockets. */
        /* 每1秒清理一次server.migrate_cached_sockets链表上的超时sockets */
        run_with_period(1000) {
            migrateCloseTimedoutSockets();
        }
    
        /* serverCron执行次数 */
        server.cronloops++;
        /* 返回下一次执行serverCron的间隔 */
        return 1000/server.hz;
    }
  • 相关阅读:
    Linux 系统的启动过程
    Oracle中row_number()、rank()、dense_rank() 的区别
    Java 动态打印菱形代码之for循环的使用
    Oracle 体系结构chapter2
    Oracle 11g 概述 chaper1
    go解决ctrl+鼠标左键或F12失效问题
    解决unrecognized import path "golang.org/x/sys/windows"问题
    设计规范
    性能分析
    用IDEA导入项目时,项目中的SpringBoot注解无法识别
  • 原文地址:https://www.cnblogs.com/flypighhblog/p/7757996.html
Copyright © 2011-2022 走看看