  • Redis Study Notes: AOF Log Flush Code

    Code walkthrough

    While processing a command, Redis calls propagate or propagateExpire, which write the command to the AOF log through feedAppendOnlyFile and propagate it to replicas through replicationFeedSlaves.

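    feedAppendOnlyFile only appends the command to the in-memory buffer server.aof_buf. flushAppendOnlyFile then writes that buffer to the AOF file (that is, into the operating system's page cache) and decides whether to fsync the data to physical storage.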
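
    The three stages (memory buffer, page cache, physical storage) can be sketched as follows. This is a minimal illustration for a POSIX system, not Redis code: aof_buffer, buffer_append and buffer_flush are hypothetical names that only stand in for the roles of server.aof_buf, feedAppendOnlyFile and flushAppendOnlyFile.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical stand-in for server.aof_buf: a growable in-memory buffer. */
typedef struct {
    char *data;
    size_t len;
} aof_buffer;

/* Stage 1 (the role of feedAppendOnlyFile): append to memory, no file I/O. */
int buffer_append(aof_buffer *b, const char *cmd, size_t n) {
    assert(b != NULL && cmd != NULL);
    if (n == 0) return 0;
    char *p = realloc(b->data, b->len + n);
    if (p == NULL) return -1;
    memcpy(p + b->len, cmd, n);
    b->data = p;
    b->len += n;
    return 0;
}

/* Stages 2 and 3 (the role of flushAppendOnlyFile): write(2) hands the bytes
 * to the kernel page cache; fsync(2) asks the kernel to push them to the
 * device. Returns the number of bytes written, or -1 on error/short write. */
ssize_t buffer_flush(aof_buffer *b, int fd, int do_fsync) {
    assert(b != NULL);
    if (b->len == 0) return 0;
    ssize_t nwritten = write(fd, b->data, b->len);
    if (nwritten != (ssize_t)b->len) return -1;
    b->len = 0;
    if (do_fsync && fsync(fd) == -1) return -1;
    return nwritten;
}
```

    A caller would invoke buffer_append once per executed write command and buffer_flush once per event-loop iteration; only the do_fsync flag depends on the configured policy.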

    The appendfsync parameter controls the AOF flush (fsync) policy:

    • appendfsync = always: fsync after every write command.
    • appendfsync = everysec: fsync once per second.
    • appendfsync = no: leave flushing to the operating system.
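
    As a rough sketch, the three policies reduce to one decision per flush attempt. The enum and function names below are hypothetical (Redis itself uses the AOF_FSYNC_* constants that appear in flushAppendOnlyFile), and the sketch ignores the background-thread and rewrite checks.

```c
#include <time.h>

/* Hypothetical mirror of the three appendfsync settings. */
typedef enum { FSYNC_NO, FSYNC_EVERYSEC, FSYNC_ALWAYS } fsync_policy;

/* Returns 1 if an fsync should be issued now, 0 otherwise.
 * now and last_fsync are wall-clock times in whole seconds. */
int should_fsync(fsync_policy policy, time_t now, time_t last_fsync) {
    switch (policy) {
    case FSYNC_ALWAYS:
        return 1;                 /* every flush is followed by an fsync */
    case FSYNC_EVERYSEC:
        return now > last_fsync;  /* at most one fsync per second */
    case FSYNC_NO:
    default:
        return 0;                 /* the kernel decides when to write back */
    }
}
```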

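    Both beforeSleep and serverCron call flushAppendOnlyFile to attempt a flush:

    • beforeSleep is called at the start of every event-loop iteration, just before the loop waits for file events.

    • serverCron is called from a time event.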

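    Although beforeSleep runs before file events are processed, the event loop iterates continuously: as soon as aeProcessEvents returns, the next iteration calls beforeSleep again. Because beforeSleep calls flushAppendOnlyFile before handleClientsWithPendingWrites, with appendfsync = always the AOF data of each write command is flushed before its reply is sent.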
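
    The ordering can be illustrated with a toy model of the loop. Everything here (mini_server, before_sleep, process_events, mini_loop) is hypothetical and only counts commands; it assumes that every command in a batch is a write command that also queues one reply.

```c
#include <stddef.h>

/* Hypothetical counters, not Redis state. */
typedef struct {
    int buffered;   /* commands executed, AOF data still only in memory */
    int unreplied;  /* replies queued but not yet sent */
    int flushed;    /* commands whose AOF data has been flushed */
    int replied;    /* commands whose reply has been sent */
} mini_server;

/* Same order as in beforeSleep: flush the AOF first, then send replies. */
void before_sleep(mini_server *s) {
    s->flushed += s->buffered;
    s->buffered = 0;
    s->replied += s->unreplied;
    s->unreplied = 0;
}

/* Stand-in for aeProcessEvents: executes ncommands write commands. */
void process_events(mini_server *s, int ncommands) {
    s->buffered += ncommands;
    s->unreplied += ncommands;
}

/* Stand-in for aeMain: one before_sleep call per iteration. */
void mini_loop(mini_server *s, const int *batches, size_t n) {
    for (size_t i = 0; i < n; i++) {
        before_sleep(s);
        process_events(s, batches[i]);
    }
}
```

    In this model replied never exceeds flushed, which is the property the always policy relies on: commands executed in one iteration are flushed at the start of the next one, before their replies go out.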

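    The no-appendfsync-on-rewrite parameter also affects AOF flushing. When no-appendfsync-on-rewrite = yes and the instance is running an AOF rewrite or a background RDB save, no fsync is performed regardless of the appendfsync setting, so that the fsync cannot block the main thread. Enabling this parameter is recommended, provided the weaker durability during a rewrite is acceptable.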
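
    The check itself is small. The sketch below restates it with a hypothetical rewrite_state struct whose fields stand in for the server fields tested in flushAppendOnlyFile; a pid of -1 is assumed to mean "no such child is running".

```c
/* Hypothetical snapshot of the relevant server state. */
typedef struct {
    int no_fsync_on_rewrite; /* 1 when no-appendfsync-on-rewrite is yes */
    int aof_child_pid;       /* -1 when no AOF rewrite child is running */
    int rdb_child_pid;       /* -1 when no RDB save child is running */
} rewrite_state;

/* Returns 1 when the fsync step is skipped, whatever appendfsync says. */
int fsync_suppressed(const rewrite_state *s) {
    return s->no_fsync_on_rewrite &&
           (s->aof_child_pid != -1 || s->rdb_child_pid != -1);
}
```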

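    With appendfsync = everysec and no-appendfsync-on-rewrite = no, the fsync is performed asynchronously by a background thread. Before each write, Redis checks whether the previous background fsync has finished. If it has not, the write is postponed rather than issued, for at most 2 seconds, so if the instance fails at that moment up to the last 2 seconds of AOF data can in theory be lost.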
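
    The postponement rule can be sketched as a small state machine. flush_action and everysec_decide are hypothetical names; postponed_start plays the role of server.aof_flush_postponed_start, with 0 meaning "nothing is postponed", and times are whole seconds.

```c
#include <time.h>

typedef enum {
    FLUSH_WRITE,        /* no fsync in progress: write normally */
    FLUSH_POSTPONE,     /* fsync still running: keep data in the buffer */
    FLUSH_WRITE_FORCED  /* waited two seconds: write anyway, may block */
} flush_action;

/* Decide what to do with the AOF buffer under the everysec policy. */
flush_action everysec_decide(int sync_in_progress, time_t now,
                             time_t *postponed_start) {
    if (!sync_in_progress) {
        *postponed_start = 0;
        return FLUSH_WRITE;
    }
    if (*postponed_start == 0) {
        *postponed_start = now;   /* first postponement: remember when */
        return FLUSH_POSTPONE;
    }
    if (now - *postponed_start < 2)
        return FLUSH_POSTPONE;    /* still within the two-second budget */
    *postponed_start = 0;
    return FLUSH_WRITE_FORCED;
}
```

    While the action is FLUSH_POSTPONE the commands exist only in process memory, which is where the two-second loss window comes from.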

    The flushAppendOnlyFile function

    /* Write the append only file buffer on disk.
     *
     * Since we are required to write the AOF before replying to the client,
     * and the only way the client socket can get a write is when entering
     * the event loop, we accumulate all the AOF writes in a memory
     * buffer and write it on disk using this function just before entering
     * the event loop again.
     *
     * About the 'force' argument:
     *
     * When the fsync policy is set to 'everysec' we may delay the flush if there
     * is still an fsync() going on in the background thread, since for instance
     * on Linux write(2) will be blocked by the background fsync anyway.
     * When this happens we remember that there is some aof buffer to be
     * flushed ASAP, and will try to do that in the serverCron() function.
     *
     * However if force is set to 1 we'll write regardless of the background
     * fsync. */
    #define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
    void flushAppendOnlyFile(int force) {
        ssize_t nwritten;
        int sync_in_progress = 0;
        mstime_t latency;
    
        if (sdslen(server.aof_buf) == 0) {
            /* Check if we need to do fsync even the aof buffer is empty,
             * because previously in AOF_FSYNC_EVERYSEC mode, fsync is
             * called only when aof buffer is not empty, so if users
             * stop write commands before fsync called in one second,
             * the data in page cache cannot be flushed in time. */
            if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.aof_fsync_offset != server.aof_current_size &&
                server.unixtime > server.aof_last_fsync &&
                !(sync_in_progress = aofFsyncInProgress())) {
                goto try_fsync;
            } else {
                return;
            }
        }
    
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
            sync_in_progress = aofFsyncInProgress();
    
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
            /* With this append fsync policy we do background fsyncing.
             * If the fsync is still in progress we can try to delay
             * the write for a couple of seconds. */
            if (sync_in_progress) {
                if (server.aof_flush_postponed_start == 0) {
                    /* No previous write postponing, remember that we are
                     * postponing the flush and return. */
                    server.aof_flush_postponed_start = server.unixtime;
                    return;
                } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                    /* We were already waiting for fsync to finish, but for less
                     * than two seconds this is still ok. Postpone again. */
                    return;
                }
                /* Otherwise fall through, and go write since we can't wait
                 * over two seconds. */
                server.aof_delayed_fsync++;
                serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
            }
        }
        /* We want to perform a single write. This should be guaranteed atomic
         * at least if the filesystem we are writing is a real physical one.
         * While this will save us against the server being killed I don't think
         * there is much to do about the whole server stopping for power problems
         * or alike */
    
        latencyStartMonitor(latency);
        nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
        latencyEndMonitor(latency);
        /* We want to capture different events for delayed writes:
         * when the delay happens with a pending fsync, or with a saving child
         * active, and when the above two conditions are missing.
         * We also use an additional event name to save all samples which is
         * useful for graphing / monitoring purposes. */
        if (sync_in_progress) {
            latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
        } else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
            latencyAddSampleIfNeeded("aof-write-active-child",latency);
        } else {
            latencyAddSampleIfNeeded("aof-write-alone",latency);
        }
        latencyAddSampleIfNeeded("aof-write",latency);
    
        /* We performed the write so reset the postponed flush sentinel to zero. */
        server.aof_flush_postponed_start = 0;
    
        if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
            static time_t last_write_error_log = 0;
            int can_log = 0;
    
            /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
            if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
                can_log = 1;
                last_write_error_log = server.unixtime;
            }
    
            /* Log the AOF write error and record the error code. */
            if (nwritten == -1) {
                if (can_log) {
                    serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                        strerror(errno));
                    server.aof_last_write_errno = errno;
                }
            } else {
                if (can_log) {
                    serverLog(LL_WARNING,"Short write while writing to "
                                           "the AOF file: (nwritten=%lld, "
                                           "expected=%lld)",
                                           (long long)nwritten,
                                           (long long)sdslen(server.aof_buf));
                }
    
                if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                    if (can_log) {
                        serverLog(LL_WARNING, "Could not remove short write "
                                 "from the append-only file.  Redis may refuse "
                                 "to load the AOF the next time it starts.  "
                                 "ftruncate: %s", strerror(errno));
                    }
                } else {
                    /* If the ftruncate() succeeded we can set nwritten to
                     * -1 since there is no longer partial data into the AOF. */
                    nwritten = -1;
                }
                server.aof_last_write_errno = ENOSPC;
            }
    
            /* Handle the AOF write error. */
            if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
                /* We can't recover when the fsync policy is ALWAYS since the
                 * reply for the client is already in the output buffers, and we
                 * have the contract with the user that on acknowledged write data
                 * is synced on disk. */
                serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
                exit(1);
            } else {
                /* Recover from failed write leaving data into the buffer. However
                 * set an error to stop accepting writes as long as the error
                 * condition is not cleared. */
                server.aof_last_write_status = C_ERR;
    
                /* Trim the sds buffer if there was a partial write, and there
                 * was no way to undo it with ftruncate(2). */
                if (nwritten > 0) {
                    server.aof_current_size += nwritten;
                    sdsrange(server.aof_buf,nwritten,-1);
                }
                return; /* We'll try again on the next call... */
            }
        } else {
            /* Successful write(2). If AOF was in error state, restore the
             * OK state and log the event. */
            if (server.aof_last_write_status == C_ERR) {
                serverLog(LL_WARNING,
                    "AOF write error looks solved, Redis can write again.");
                server.aof_last_write_status = C_OK;
            }
        }
        server.aof_current_size += nwritten;
    
        /* Re-use AOF buffer when it is small enough. The maximum comes from the
         * arena size of 4k minus some overhead (but is otherwise arbitrary). */
        if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
            sdsclear(server.aof_buf);
        } else {
            sdsfree(server.aof_buf);
            server.aof_buf = sdsempty();
        }
    
    try_fsync:
        /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
         * children doing I/O in the background. */
        if (server.aof_no_fsync_on_rewrite &&
            (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
                return;
    
        /* Perform the fsync if needed. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* redis_fsync is defined as fdatasync() for Linux in order to avoid
             * flushing metadata. */
            latencyStartMonitor(latency);
            redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
            latencyEndMonitor(latency);
            latencyAddSampleIfNeeded("aof-fsync-always",latency);
            server.aof_fsync_offset = server.aof_current_size;
            server.aof_last_fsync = server.unixtime;
        } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                    server.unixtime > server.aof_last_fsync)) {
            if (!sync_in_progress) {
                aof_background_fsync(server.aof_fd);
                server.aof_fsync_offset = server.aof_current_size;
            }
            server.aof_last_fsync = server.unixtime;
        }
    }
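
    The short-write branch above is easier to follow in isolation. The helper below is a hypothetical reduction of it, not a Redis function: given the result of a write(2) of len bytes to a file that was size_before bytes long, it returns the size the caller should record afterwards. It assumes a POSIX system.

```c
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: reconcile the recorded file size after write(2). */
off_t reconcile_write(int fd, off_t size_before, ssize_t nwritten, size_t len) {
    if (nwritten == (ssize_t)len)
        return size_before + nwritten;  /* full write */
    if (nwritten <= 0)
        return size_before;             /* error: nothing was appended */

    /* Short write: try to cut the partial record off again. */
    if (ftruncate(fd, size_before) == 0)
        return size_before;

    /* Rollback failed: the partial bytes stay in the file, so the caller
     * must drop the first nwritten bytes of its buffer before retrying. */
    fprintf(stderr, "could not remove short write: %s\n", strerror(errno));
    return size_before + nwritten;
}
```

    The second outcome is the reason flushAppendOnlyFile trims the buffer with sdsrange when nwritten is still positive after the ftruncate attempt.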
    

    The beforeSleep and afterSleep functions

    /* This function gets called every time Redis is entering the
     * main loop of the event driven library, that is, before to sleep
     * for ready file descriptors. */
    void beforeSleep(struct aeEventLoop *eventLoop) {
        UNUSED(eventLoop);
    
        /* Call the Redis Cluster before sleep function. Note that this function
         * may change the state of Redis Cluster (from ok to fail or vice versa),
         * so it's a good idea to call it before serving the unblocked clients
         * later in this function. */
        if (server.cluster_enabled) clusterBeforeSleep();
    
        /* Run a fast expire cycle (the called function will return
         * ASAP if a fast cycle is not needed). */
        if (server.active_expire_enabled && server.masterhost == NULL)
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
    
        /* Send all the slaves an ACK request if at least one client blocked
         * during the previous event loop iteration. */
        if (server.get_ack_from_slaves) {
            robj *argv[3];
    
            argv[0] = createStringObject("REPLCONF",8);
            argv[1] = createStringObject("GETACK",6);
            argv[2] = createStringObject("*",1); /* Not used argument. */
            replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
            decrRefCount(argv[0]);
            decrRefCount(argv[1]);
            decrRefCount(argv[2]);
            server.get_ack_from_slaves = 0;
        }
    
        /* Unblock all the clients blocked for synchronous replication
         * in WAIT. */
        if (listLength(server.clients_waiting_acks))
            processClientsWaitingReplicas();
    
        /* Check if there are clients unblocked by modules that implement
         * blocking commands. */
        moduleHandleBlockedClients();
    
        /* Try to process pending commands for clients that were just unblocked. */
        if (listLength(server.unblocked_clients))
            processUnblockedClients();
    
        /* Write the AOF buffer on disk */
        flushAppendOnlyFile(0);
    
        /* Handle writes with pending output buffers. */
        handleClientsWithPendingWrites();
    
        /* Before we are going to sleep, let the threads access the dataset by
         * releasing the GIL. Redis main thread will not touch anything at this
         * time. */
        if (moduleCount()) moduleReleaseGIL();
    }
    
    /* This function is called immediately after the event loop multiplexing
     * API returned, and the control is going to soon return to Redis by invoking
     * the different events callbacks. */
    void afterSleep(struct aeEventLoop *eventLoop) {
        UNUSED(eventLoop);
        if (moduleCount()) moduleAcquireGIL();
    }
    

    In main, beforeSleep and afterSleep are registered with the event loop:

    int main(int argc, char **argv) {
        /* ... initialization omitted in this excerpt ... */
        aeSetBeforeSleepProc(server.el,beforeSleep);
        aeSetAfterSleepProc(server.el,afterSleep);
        aeMain(server.el);
        aeDeleteEventLoop(server.el);
        return 0;
    }
    
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    
    

    The serverCron function

    /* This is our timer interrupt, called server.hz times per second.
     * Here is where we do a number of things that need to be done asynchronously.
     * For instance:
     *
     * - Active expired keys collection (it is also performed in a lazy way on
     *   lookup).
     * - Software watchdog.
     * - Update some statistic.
     * - Incremental rehashing of the DBs hash tables.
     * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
     * - Clients timeout of different kinds.
     * - Replication reconnection.
     * - Many more...
     *
     * Everything directly called here will be called server.hz times per second,
     * so in order to throttle execution of things we want to do less frequently
     * a macro is used: run_with_period(milliseconds) { .... }
     */
    
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        int j;
        UNUSED(eventLoop);
        UNUSED(id);
        UNUSED(clientData);

        /* ... other periodic tasks omitted in this excerpt ... */
        /* AOF postponed flush: Try at every cron cycle if the slow fsync
         * completed. */
        if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    
        /* AOF write errors: in this case we have a buffer to flush as well and
         * clear the AOF error in case of success to make the DB writable again,
         * however to try every second is enough in case of 'hz' is set to
         * an higher frequency. */
        run_with_period(1000) {
            if (server.aof_last_write_status == C_ERR)
                flushAppendOnlyFile(0);
        }
        server.cronloops++;
        return 1000/server.hz;
    }
    

    References

    redis源码浅析--AOF(append only file)持久化 (A brief analysis of the Redis source code: AOF persistence)

  • Original article: https://www.cnblogs.com/gaogao67/p/15085166.html