serverCron is the main periodic handler in Redis. initServer calls aeCreateTimeEvent to register serverCron as a callback in the global eventLoop structure. Its place in the main loop is:
aeMain {
    while (!stop) {
        beforeSleep
        aeApiPoll
        process file events
        /* process time events */
        for each timeEntry(te) in eventLoop {
            retval = te->timeProc()    /* timeProc here is serverCron */
            if (NOMORE == retval)
                Delete time entry from eventLoop
            else
                aeAddMillisecondsToNow(te, retval)    /* te fires again retval ms from now */
        }
    }
}
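The rescheduling rule in the loop above (a callback's return value decides whether its time event is dropped or re-armed) can be sketched in isolation. This is a minimal illustration and not the real ae.c code: the struct, the SKETCH_NOMORE constant, and every sketch_* function are hypothetical names invented for this example, and the current time is passed in explicitly instead of being read from the clock.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_NOMORE (-1)  /* hypothetical stand-in for the NOMORE value above */

/* Hypothetical, simplified time event; not the real ae.h structure. */
typedef struct sketch_time_event {
    long long when_ms;                    /* absolute time at which the event is due */
    int (*time_proc)(void *client_data);  /* callback, e.g. something like serverCron */
    void *client_data;
    int active;                           /* 0 once the event has been removed */
} sketch_time_event;

/* Fire every due event once and return how many callbacks ran. */
int sketch_process_time_events(sketch_time_event *events, size_t n, long long now_ms) {
    int fired = 0;
    assert(events != NULL || n == 0);
    for (size_t i = 0; i < n; i++) {
        sketch_time_event *te = &events[i];
        if (!te->active || te->when_ms > now_ms) continue;
        int retval = te->time_proc(te->client_data);
        fired++;
        if (retval == SKETCH_NOMORE) {
            te->active = 0;                 /* one-shot: drop the event */
        } else {
            te->when_ms = now_ms + retval;  /* periodic: re-arm retval ms from now */
        }
    }
    return fired;
}

/* Periodic callback: asks to be called again in 10 ms (like 1000/hz with hz = 100). */
int sketch_periodic_10ms(void *client_data) {
    int *calls = client_data;
    (*calls)++;
    return 10;
}

/* One-shot callback: asks to be removed after its first call. */
int sketch_one_shot(void *client_data) {
    int *calls = client_data;
    (*calls)++;
    return SKETCH_NOMORE;
}
```

In this sketch a periodic task is simply a callback that keeps returning a positive delay, which is how serverCron stays scheduled by returning 1000/server.hz.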
Before reading the implementation of serverCron, look at the definition of run_with_period:
#define run_with_period(_ms_) \
    if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
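By this definition, the condition in run_with_period(_ms_) evaluates to 1, so the guarded block runs, in two cases:
1. _ms_ <= 1000/server.hz, that is, _ms_ is no larger than the interval between serverCron calls.
2. _ms_ is larger than that interval and the number of serverCron calls so far is an exact multiple of _ms_/(1000/server.hz).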
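server.hz is the number of times serverCron runs per second (in the Redis implementation the arithmetic uses ms as its smallest unit), so 1000/server.hz is the interval between serverCron calls in ms. Combined with the definition above, run_with_period(_ms_) means: run the guarded block once every _ms_ milliseconds.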
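For example, with server.hz set to 100, serverCron runs every 10ms (not exactly, since everything executes sequentially on a single thread).
If some task has to run every 500ms, it can be wrapped in run_with_period(500) inside serverCron. The condition in run_with_period(500) is then true once every 500/10 = 50 calls of serverCron.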
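The arithmetic above can be checked with a small simulation. This is a sketch under stated assumptions: struct sketch_server and the sketch_* functions are hypothetical names for this example only, the macro's condition is rewritten as a plain function so that the state is passed in explicitly, and the assert restricts hz to values for which 1000/hz is not zero.

```c
#include <assert.h>

/* Hypothetical stand-in for the two server fields the macro reads. */
struct sketch_server {
    int hz;         /* serverCron calls per second */
    int cronloops;  /* number of serverCron calls so far */
};

/* Same condition as run_with_period, written as a function. */
int sketch_period_due(const struct sketch_server *s, int ms) {
    assert(s->hz > 0 && s->hz <= 1000);  /* keeps 1000/hz from being 0 */
    int interval = 1000 / s->hz;         /* ms between serverCron calls */
    return (ms <= interval) || !(s->cronloops % (ms / interval));
}

/* Count how often a block guarded with period `ms` runs over `ticks` calls. */
int sketch_count_runs(int hz, int ms, int ticks) {
    struct sketch_server s = { hz, 0 };
    int runs = 0;
    for (int i = 0; i < ticks; i++) {
        if (sketch_period_due(&s, ms)) runs++;
        s.cronloops++;  /* serverCron increments cronloops at the end of each call */
    }
    return runs;
}
```

With hz = 100, one second is 100 calls, so sketch_count_runs(100, 500, 100) counts the calls where cronloops is 0 or 50. Because the division is integer division, a period that is not a multiple of the interval is rounded down to the nearest multiple.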
serverCron is implemented as follows:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

    /* Software watchdog: deliver the SIGALRM that will reach the signal
     * handler if we don't return here fast enough. */
    /* Arm the watchdog with SIGALRM; the handler is watchdogSignalHandler. */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    /* Update the time cache. */
    /* Refreshes server.unixtime and server.mstime. */
    updateCachedTime();

    /* Every 100ms, sample the statistics for the elapsed period:
     * commands, net_input_bytes, net_output_bytes. */
    run_with_period(100) {
        trackInstantaneousMetric(REDIS_METRIC_COMMAND,server.stat_numcommands);
        trackInstantaneousMetric(REDIS_METRIC_NET_INPUT,
                server.stat_net_input_bytes);
        trackInstantaneousMetric(REDIS_METRIC_NET_OUTPUT,
                server.stat_net_output_bytes);
    }

    /* We have just REDIS_LRU_BITS bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock.
     *
     * Note that even if the counter wraps it's not a big problem,
     * everything will still work but some object will appear younger
     * to Redis. However for this to happen a given object should never be
     * touched for all the time needed to the counter to wrap, which is
     * not likely.
     *
     * Note that you can change the resolution altering the
     * REDIS_LRU_CLOCK_RESOLUTION define. */
    /* getLRUClock returns the current time in units of
     * REDIS_LRU_CLOCK_RESOLUTION, truncated to the low REDIS_LRU_BITS bits. */
    server.lruclock = getLRUClock();

    /* Record the max memory used since the server was started. */
    /* Tracks peak memory usage. */
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();

    /* Sample the RSS here since this is a relatively slow call. */
    /* Records the current RSS. */
    server.resident_set_size = zmalloc_get_rss();

    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    /* If SIGTERM was received, try to shut down. */
    if (server.shutdown_asap) {
        if (prepareForShutdown(0) == REDIS_OK) exit(0);
        redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }

    /* Show some info about non-empty databases */
    /* Every 5 seconds, log information about the non-empty databases. */
    run_with_period(5000) {
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;

            size = dictSlots(server.db[j].dict);
            used = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (used || vkeys) {
                redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                /* dictPrintStats(server.dict); */
            }
        }
    }

    /* Show information about connected clients */
    /* Outside sentinel mode, log the connected client counts every 5 seconds. */
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            redisLog(REDIS_VERBOSE,
                "%lu clients connected (%lu slaves), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    /* We need to do a few operations on clients asynchronously. */
    /* Closes idle clients and releases unused space in query buffers. */
    clientsCron();

    /* Handle background operations on Redis databases. */
    /* Per-database work; rehashing happens here. */
    databasesCron();

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    /* If an AOF rewrite was scheduled and no RDB/AOF child is running in the
     * background, start the background AOF rewrite. */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        /* An RDB or AOF child is running in the background: check whether it
         * has exited. WNOHANG is used, so wait3 does not block. */
        int statloc;
        pid_t pid;

        /* wait3 returns non-zero either because a child exited or on error. */
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            /* On error, log it.
             * If the RDB child exited, backgroundSaveDoneHandler finishes up.
             * If the AOF child exited, backgroundRewriteDoneHandler finishes up. */
            if (pid == -1) {
                redisLog(LOG_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            /* Sets dict_can_resize to 0 (resizing not allowed) while an
             * RDB/AOF child is running, and to 1 otherwise. */
            updateDictResizePolicy();
        }
    } else {
        /* No RDB/AOF child is running; decide whether to start one. */
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now */
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 REDIS_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == REDIS_OK))
            {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveBackground(server.rdb_filename);
                break;
            }
        }

        /* Trigger an AOF rewrite if needed */
        if (server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        {
            long long base = server.aof_rewrite_base_size ?
                             server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
        }
    }

    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    /* If aof_flush_postponed_start is set, try to write server.aof_buf to the
     * file on disk in every serverCron cycle.
     * PS: server.aof_buf holds the commands executed since the last write to
     * the AOF file, which is why it is an append only file. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    /* Once per second, check whether the last AOF write failed and, if so,
     * try to write again. */
    run_with_period(1000) {
        if (server.aof_last_write_status == REDIS_ERR)
            flushAppendOnlyFile(0);
    }

    /* Close clients that need to be closed asynchronous */
    /* Every entry on the server.clients_to_close list is a connection waiting
     * to be closed. */
    freeClientsInAsyncFreeQueue();

    /* Clear the paused clients flag if needed. */
    /* When clients are paused, an end time is recorded. Once that time has
     * arrived, each client (slave clients are skipped) is flagged
     * REDIS_UNBLOCKED and added to server.unblocked_clients. */
    clientsArePaused(); /* Don't check return value, just use the side effect. */

    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */
    /* Runs replicationCron once per second. */
    run_with_period(1000) replicationCron();

    /* Run the Redis Cluster cron. */
    /* Runs clusterCron every 100ms. */
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    /* Run the Sentinel timer if we are in sentinel mode. */
    /* Runs the Sentinel timer every 100ms. */
    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }

    /* Cleanup expired MIGRATE cached sockets. */
    /* Once per second, closes timed-out sockets on the
     * server.migrate_cached_sockets list. */
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    /* Number of serverCron calls so far. */
    server.cronloops++;
    /* Return the delay (ms) until the next serverCron call. */
    return 1000/server.hz;
}
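The automatic AOF rewrite check inside serverCron is plain integer arithmetic, so it can be tried out on its own. The sketch below copies only the growth formula and the size and percentage conditions from the listing. The sketch_* function names and their parameters are hypothetical, and the check that no RDB/AOF child is running is deliberately left out.

```c
#include <assert.h>

/* Percentage growth of the AOF relative to its base size, using the same
 * integer arithmetic as serverCron. A base size of 0 is treated as 1. */
long long sketch_aof_growth(long long current_size, long long base_size) {
    assert(current_size >= 0 && base_size >= 0);
    long long base = base_size ? base_size : 1;
    return (current_size * 100 / base) - 100;
}

/* Returns 1 when the size and growth conditions for an automatic rewrite hold.
 * The "no background child is running" condition is omitted in this sketch. */
int sketch_should_rewrite(long long current_size, long long base_size,
                          long long min_size, int rewrite_perc) {
    if (!rewrite_perc) return 0;             /* a percentage of 0 disables the check */
    if (current_size <= min_size) return 0;  /* file is still too small to bother */
    return sketch_aof_growth(current_size, base_size) >= rewrite_perc;
}
```

For instance, a file that has grown from a base of 100 bytes to 200 bytes has a growth of 100, which meets a rewrite percentage of 100 as long as 200 is above the minimum size.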