Redis Source Code Analysis: Master-Slave Replication (1), Synchronization on the Slave

    Redis Source Code Analysis: Master-Slave Replication

    I. Master-slave replication:

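    Redis provides master-slave replication. One server (the slave) replicates another server (the master) and acts as its slave. Replication is asynchronous, so the slave's data is only consistent with the master's to a certain degree. Adding slaves lets the database scale its load horizontally.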

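    | Command | Purpose |
    |---|---|
    | SLAVEOF host port | Makes the Redis server the client is connected to a slave of the Redis server at the given address |
    | SLAVEOF no one | Promotes the Redis server the client is connected to from slave to master |
    | PSYNC runid offset | Sent by the slave to the master (the slave acts as a client of the master). If the master replies +CONTINUE, a partial resynchronization follows; otherwise a full resynchronization |
    | PSYNC ? -1 | Sent by the slave to the master (as above); forces a full resynchronization |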
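The two PSYNC forms in the table can be made concrete with a minimal C sketch of how a slave might choose between them. `build_psync` is a hypothetical helper, not a Redis function. It mirrors the cached-master check in slaveTryPartialResynchronization(), which is analyzed later. It assumes the command goes out in the plain inline form `PSYNC <runid> <offset>\r\n`, which appears to be how sendSynchronousCommand sends commands in Redis 3.0.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not part of Redis): write the PSYNC command a slave
 * would send into out, using the inline "PSYNC <runid> <offset>\r\n" form.
 * cached_runid == NULL means "no cached master", which forces a full resync.
 * Returns the command length, or -1 if out is too small. */
int build_psync(char *out, size_t outlen, const char *cached_runid,
                long long cached_reploff) {
    int n;

    assert(out != NULL && outlen > 0);
    if (cached_runid != NULL) {
        /* Ask for the stream starting right after the last byte we applied. */
        n = snprintf(out, outlen, "PSYNC %s %lld\r\n",
                     cached_runid, cached_reploff + 1);
    } else {
        /* "?" and "-1": we know no master, so everything must be sent. */
        n = snprintf(out, outlen, "PSYNC ? -1\r\n");
    }
    return (n < 0 || (size_t)n >= outlen) ? -1 : n;
}
```

For example, `build_psync(buf, sizeof buf, NULL, 0)` produces `"PSYNC ? -1\r\n"`. A cached master with run id `abc` and `reploff` 99 produces `"PSYNC abc 100\r\n"`.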

    Let's start with slaveofCommand:

    
    void slaveofCommand(redisClient *c) {
        /* SLAVEOF no one: promote this server (currently a slave) to master */
        if (!strcasecmp(c->argv[1]->ptr,"no") &&
            !strcasecmp(c->argv[2]->ptr,"one")) {
            if (server.masterhost) {
                sdsfree(server.masterhost);
                server.masterhost = NULL;
                if (server.master) freeClient(server.master);
                replicationDiscardCachedMaster();
                cancelReplicationHandshake();
                server.repl_state = REDIS_REPL_NONE;
                redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
            }
        } else {
            long port;
    
            if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
                return;
    
            /* Check if we are already attached to the specified master;
             * if so, there is nothing to do */
            if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
                && server.masterport == port) {
                redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
                addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
                return;
            }
            /* There was no previous master or the user specified a different one,
             * we can continue. */
            sdsfree(server.masterhost);
            /* Record masterhost and masterport */
            server.masterhost = sdsdup(c->argv[1]->ptr);
            server.masterport = port;
            if (server.master) freeClient(server.master);
            disconnectSlaves(); /* Force our slaves to resync with us as well. */
            replicationDiscardCachedMaster(); /* Don't try a PSYNC. */
            freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
            cancelReplicationHandshake();
            /* Set the state flag; the connection to the master is made later */
            server.repl_state = REDIS_REPL_CONNECT;
            redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
                server.masterhost, server.masterport);
        }
        addReply(c,shared.ok);
    }
    
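    • server.repl_state = REDIS_REPL_CONNECT: on receiving SLAVEOF, the server does not connect to the master right away. It records masterhost and masterport and sets REDIS_REPL_CONNECT, and the cron job establishes the connection later;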
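SLAVEOF only moves the replication state machine, and the real work happens in later callbacks. Below is a simplified C sketch of that state machine. It assumes we only model the states that appear in this article. The `REPL_*` and `EV_*` names are hypothetical stand-ins for the REDIS_REPL_* constants and for the events that move between them. The "already connected to this master" shortcut is not modeled.

```c
#include <assert.h>

/* Hypothetical, simplified mirror of the REDIS_REPL_* states in this article. */
typedef enum {
    REPL_NONE,         /* not a slave */
    REPL_CONNECT,      /* must connect; done later by the cron job */
    REPL_CONNECTING,   /* non-blocking connect in progress */
    REPL_RECEIVE_PONG, /* PING sent, waiting for the reply */
    REPL_TRANSFER,     /* full resync: receiving the RDB payload */
    REPL_CONNECTED     /* receiving the master's command stream */
} repl_state;

/* Hypothetical events that drive the transitions. */
typedef enum {
    EV_SLAVEOF,          /* SLAVEOF host port */
    EV_SLAVEOF_NO_ONE,   /* SLAVEOF no one */
    EV_CONNECT_STARTED,  /* replicationCron(): connectWithMaster() returned OK */
    EV_CONNECT_DONE,     /* socket writable: syncWithMaster() sent PING */
    EV_PSYNC_FULL,       /* handshake done, +FULLRESYNC (or SYNC fallback) */
    EV_PSYNC_CONTINUE,   /* handshake done, +CONTINUE */
    EV_RDB_LOADED,       /* readSyncBulkPayload() loaded the RDB */
    EV_ERROR             /* I/O or protocol error on the link */
} repl_event;

repl_state repl_next(repl_state s, repl_event ev) {
    assert(s <= REPL_CONNECTED);
    /* SLAVEOF is accepted in any state. */
    if (ev == EV_SLAVEOF) return REPL_CONNECT;
    if (ev == EV_SLAVEOF_NO_ONE) return REPL_NONE;

    switch (s) {
    case REPL_CONNECT:
        if (ev == EV_CONNECT_STARTED) return REPL_CONNECTING;
        break;
    case REPL_CONNECTING:
        if (ev == EV_CONNECT_DONE) return REPL_RECEIVE_PONG;
        break;
    case REPL_RECEIVE_PONG:
        if (ev == EV_PSYNC_FULL) return REPL_TRANSFER;
        if (ev == EV_PSYNC_CONTINUE) return REPL_CONNECTED;
        break;
    default:
        if (s == REPL_TRANSFER && ev == EV_RDB_LOADED) return REPL_CONNECTED;
        break;
    }
    /* Errors fall back to REPL_CONNECT so the next cron run retries;
     * any other event leaves the state unchanged. */
    if (ev == EV_ERROR && s != REPL_NONE) return REPL_CONNECT;
    return s;
}
```

In this sketch every error returns the slave to REPL_CONNECT rather than REPL_NONE. The `error:` label in syncWithMaster() does the same thing, leaving the retry to the next run of the cron job.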

    II. Establishing a connection with the master:

    Steps:

    1. The slave opens a TCP connection to the master;
    2. The slave sends PING;
    3. The master replies with PONG.

    Cron job

    serverCron() in src/redis.c calls replicationCron() (defined in src/replication.c) once per second:

    void replicationCron(void) {
        // ...
        /* Check if we should connect to a MASTER */
        if (server.repl_state == REDIS_REPL_CONNECT) {
            redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d",
                server.masterhost, server.masterport);
            if (connectWithMaster() == REDIS_OK) {
                redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
            }
        }
        // ...
    }
    
    • if (server.repl_state == REDIS_REPL_CONNECT): when the cron job sees the REDIS_REPL_CONNECT flag, it tries to connect to the master;
    // src/replication.c
    
    int connectWithMaster(void) {
        int fd;
    
        /* The slave acts as a client and calls connect(2) to reach the master */
        fd = anetTcpNonBlockBindConnect(NULL,
            server.masterhost,server.masterport,REDIS_BIND_ADDR);
        if (fd == -1) {
            redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
                strerror(errno));
            return REDIS_ERR;
        }
    
        /* Watch for readable and writable events; syncWithMaster handles both */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
                AE_ERR)
        {
            close(fd);
            redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
            return REDIS_ERR;
        }
    
        /* Update replication state */
        server.repl_transfer_lastio = server.unixtime;
        /* fd of the SLAVE->MASTER TCP socket. Do not confuse it with the temp file
         * fd (repl_transfer_fd) that the slave opens later for a full resync */
        server.repl_transfer_s = fd;
        server.repl_state = REDIS_REPL_CONNECTING;
        return REDIS_OK;
    }
    
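    • anetTcpNonBlockBindConnect(): note that this is a non-blocking connect. See the separate article on non-blocking connect (reference 3) for background.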
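For readers new to the pattern, here is a minimal POSIX sketch of a non-blocking connect. It is not anetTcpNonBlockBindConnect() itself. `nb_connect` and `nb_connect_finish` are hypothetical names, and poll() stands in for Redis's ae event loop. The key point is the same one syncWithMaster relies on: a writable socket only means the connect attempt finished, and SO_ERROR tells whether it succeeded.

```c
#define _POSIX_C_SOURCE 200809L
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Start a non-blocking TCP connect. Returns the fd (the connect may still
 * be in progress) or -1 on immediate failure. */
int nb_connect(const struct sockaddr_in *addr) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        close(fd);
        return -1;
    }
    /* EINPROGRESS is the normal answer for a non-blocking connect. */
    if (connect(fd, (const struct sockaddr *)addr, sizeof(*addr)) == -1 &&
        errno != EINPROGRESS) {
        close(fd);
        return -1;
    }
    return fd;
}

/* Wait up to timeout_ms for the socket to become writable, then ask the
 * kernel whether the connect succeeded. Returns 0 on success, otherwise an
 * errno value (ETIMEDOUT if nothing happened in time). */
int nb_connect_finish(int fd, int timeout_ms) {
    struct pollfd pfd = { .fd = fd, .events = POLLOUT };
    int rc = poll(&pfd, 1, timeout_ms);
    if (rc == -1) return errno;
    if (rc == 0) return ETIMEDOUT;
    int sockerr = 0;
    socklen_t len = sizeof(sockerr);
    /* Writable does not mean connected: SO_ERROR holds the real outcome. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &len) == -1)
        return errno;
    return sockerr;
}
```

Redis registers the fd with its event loop instead of calling poll(). The callback then does the same SO_ERROR check, which is the getsockopt() call at the top of syncWithMaster.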

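    When the non-blocking connect finishes (whether it succeeded or failed), the socket becomes writable and the event loop calls syncWithMaster on the slave. Its implementation: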

    void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
        char tmpfile[256], *err;
        int dfd, maxtries = 5;
        int sockerr = 0, psync_result;
        socklen_t errlen = sizeof(sockerr);
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(privdata);
        REDIS_NOTUSED(mask);
    
        /* If this event fired after the user turned the instance into a master
         * with SLAVEOF NO ONE we must just return ASAP. */
        if (server.repl_state == REDIS_REPL_NONE) {
            close(fd);
            return;
        }
    
        /* Check for errors in the socket. */
        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
            sockerr = errno;
        if (sockerr) {
            aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
            redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
                strerror(sockerr));
            goto error;
        }
    
        /* If we were connecting, it's time to send a non blocking PING, we want to
         * make sure the master is able to reply before going into the actual
         * replication process where we have long timeouts in the order of
         * seconds (in the meantime the slave would block). */
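        /* Step 1: right after connecting, send PING to the master to make sure
           both directions of the link work and the master can process commands.
           The slave registered both READABLE and WRITABLE, and completion of the
           non-blocking connect(2) makes the socket writable (EPOLLOUT with epoll),
           so this branch runs first.
         */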
        if (server.repl_state == REDIS_REPL_CONNECTING) {
            redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
            /* Delete the writable event so that the readable event remains
             * registered and we can wait for the PONG reply. */
            /* The writable event is no longer needed after this point */
            aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
            server.repl_state = REDIS_REPL_RECEIVE_PONG;
            /* Send the PING, don't check for errors at all, we have the timeout
             * that will take care about this. */
            syncWrite(fd,"PING\r\n",6,100);
            return;
        }
    
        /* Receive the PONG command. */
        if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
            char buf[1024];
    
            /* Delete the readable event, we no longer need it now that there is
             * the PING reply to read. */
            aeDeleteFileEvent(server.el,fd,AE_READABLE);
    
            /* Read the reply with explicit timeout. */
            buf[0] = '\0';
            if (syncReadLine(fd,buf,sizeof(buf),
                server.repl_syncio_timeout*1000) == -1)
            {
                redisLog(REDIS_WARNING,
                    "I/O error reading PING reply from master: %s",
                    strerror(errno));
                goto error;
            }
    
            /* We accept only two replies as valid, a positive +PONG reply
             * (we just check for "+") or an authentication error.
             * Note that older versions of Redis replied with "operation not
             * permitted" instead of using a proper error code, so we test
             * both. */
            /* Step 2: check the master's reply to PING (normally +PONG) */
            if (buf[0] != '+' &&
                strncmp(buf,"-NOAUTH",7) != 0 &&
                strncmp(buf,"-ERR operation not permitted",28) != 0)
            {
                redisLog(REDIS_WARNING,"Error reply to PING from master: '%s'",buf);
                goto error;
            } else {
                redisLog(REDIS_NOTICE,
                    "Master replied to PING, replication can continue...");
            }
        }
    
        /* AUTH with the master if required. */
        /* Step 3: authenticate; only done when masterauth is set on the slave */
        if(server.masterauth) {
            /* sendSynchronousCommand is a thin wrapper around write/read: it sends
             * the command and blocks until the one-line reply arrives */
            err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
            if (err[0] == '-') {
                redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
                sdsfree(err);
                goto error;
            }
            sdsfree(err);
        }
    
        /* Set the slave port, so that Master's INFO command can list the
         * slave listening port correctly. */
        /* Step 4: tell the master which port we listen on */
        {
            sds port = sdsfromlonglong(server.port);
            err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
                                             NULL);
            sdsfree(port);
            /* Ignore the error if any, not all the Redis versions support
             * REPLCONF listening-port. */
            if (err[0] == '-') {
                redisLog(REDIS_NOTICE,"(Non critical) Master does not understand REPLCONF listening-port: %s", err);
            }
            sdsfree(err);
        }
    
        /* Try a partial resynchonization. If we don't have a cached master
         * slaveTryPartialResynchronization() will at least try to use PSYNC
         * to start a full resynchronization so that we get the master run id
         * and the global offset, to try a partial resync at the next
         * reconnection attempt. */
        /* Step 5: synchronize. The slave sends PSYNC and acts on the master's reply */
        psync_result = slaveTryPartialResynchronization(fd);
    
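        /* Partial resync: slaveTryPartialResynchronization() has already registered
         * a readable handler on the socket, so the event loop will feed the
         * master's command stream into the normal command-execution path */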
        if (psync_result == PSYNC_CONTINUE) {
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");
            return;
        }
    
        /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
         * and the server.repl_master_runid and repl_master_initial_offset are
         * already populated. */
        if (psync_result == PSYNC_NOT_SUPPORTED) {
            redisLog(REDIS_NOTICE,"Retrying with SYNC...");
            /* The master may not support PSYNC, so fall back to SYNC */
            if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
                redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
                    strerror(errno));
                goto error;
            }
        }
    
        /* Prepare a suitable temp file for bulk transfer */
        /* Full resync: open a temp file to receive the RDB file sent by the master */
        while(maxtries--) {
            snprintf(tmpfile,256,
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
            if (dfd != -1) break;
            sleep(1);
        }
        if (dfd == -1) {
            redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
            goto error;
        }
    
        /* Setup the non blocking download of the bulk file. */
        /* Download the RDB file without blocking, driven by readable events */
        if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
                == AE_ERR)
        {
            redisLog(REDIS_WARNING,
                "Can't create readable event for SYNC: %s (fd=%d)",
                strerror(errno),fd);
            goto error;
        }
    
        /* Enter REDIS_REPL_TRANSFER: the slave is now ready to receive the master's .rdb file */
        server.repl_state = REDIS_REPL_TRANSFER;
        server.repl_transfer_size = -1;
        server.repl_transfer_read = 0;
        server.repl_transfer_last_fsync_off = 0;  // offset up to which data was last flushed to disk
        server.repl_transfer_fd = dfd;  // fd of the opened temp file
        server.repl_transfer_lastio = server.unixtime;
        server.repl_transfer_tmpfile = zstrdup(tmpfile);
        return;
    
    error:
        close(fd);
        server.repl_transfer_s = -1;
        server.repl_state = REDIS_REPL_CONNECT;
        return;
    }
    
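    • REDIS_NOTUSED(el), REDIS_NOTUSED(privdata), REDIS_NOTUSED(mask): these mark the parameters as intentionally unused (the macro casts its argument to void), which silences unused-parameter compiler warnings;
    • getsockopt(fd, SOL_SOCKET, SO_ERROR, ...): the fd comes from a non-blocking connect, so its error status must be checked to learn whether the connection actually succeeded;
    • aeDeleteFileEvent(server.el,fd,AE_WRITABLE): in level-triggered (LT) mode a connected socket with room in its send buffer is reported writable on every loop iteration. The writable event is therefore deleted as soon as it has been used, to avoid a busy loop;
    • slaveTryPartialResynchronization(fd): analyzed in the sections on sending PSYNC and on partial resynchronization below;
    • aeCreateFileEvent(..., readSyncBulkPayload, ...): the full resynchronization, i.e. downloading the RDB file in readSyncBulkPayload, is analyzed in the full resynchronization section below.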
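The PING/PONG step reads the reply synchronously with a timeout, even though the socket is non-blocking. Below is a minimal sketch of that idea, in the spirit of syncReadLine(), plus the acceptance rule for the PING reply used in syncWithMaster(). `read_line_timeout` and `ping_reply_ok` are hypothetical names. poll() stands in for Redis's own wait helper, and the timeout applies to each byte read.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <unistd.h>

/* Read one "\n"-terminated line from fd, one byte at a time, waiting at most
 * timeout_ms for each byte. A trailing "\r" is stripped. Returns the line
 * length, or -1 on error, EOF or timeout (errno is ETIMEDOUT on timeout). */
ssize_t read_line_timeout(int fd, char *buf, size_t size, int timeout_ms) {
    size_t n = 0;
    assert(size > 0);
    while (n + 1 < size) {
        struct pollfd pfd = { .fd = fd, .events = POLLIN };
        int rc = poll(&pfd, 1, timeout_ms);
        if (rc <= 0) {
            if (rc == 0) errno = ETIMEDOUT;
            return -1;
        }
        char c;
        ssize_t r = read(fd, &c, 1);
        if (r <= 0) return -1;          /* error, or EOF before the newline */
        if (c == '\n') break;
        buf[n++] = c;
    }
    if (n > 0 && buf[n - 1] == '\r') n--;
    buf[n] = '\0';
    return (ssize_t)n;
}

/* Same rule as syncWithMaster(): any "+..." reply (normally +PONG), or an
 * authentication error, which the AUTH step that follows will deal with. */
int ping_reply_ok(const char *line) {
    return line[0] == '+' ||
           strncmp(line, "-NOAUTH", 7) == 0 ||
           strncmp(line, "-ERR operation not permitted", 28) == 0;
}
```

An `-ERR` reply other than the legacy "operation not permitted" text is rejected, which sends syncWithMaster to its `error:` label.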

    III. Sending PSYNC to attempt a partial resynchronization:

    Partial resynchronization relies on three things:

    • Replication offset: the master and the slave each maintain a replication offset:

      struct redisServer {
          /* ... */
          long long master_repl_offset;   /* Global replication offset */
      };

      typedef struct redisClient {
          /* ... */
          long long reploff;      /* replication offset if this is our master */
      } redisClient;
      
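      • master_repl_offset: the master keeps its offset in the global server struct;
      • reploff: the slave keeps the offset in the redisClient that represents its master;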
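    • Replication backlog: a fixed-size circular buffer on the master that keeps the most recently propagated write stream, so a reconnecting slave can be sent just the part it missed;

    • Server run ID (server.repl_master_runid): the slave remembers the master's run ID, so that on reconnect the master can tell whether the slave was previously replicating from it.

    The slave side of PSYNC is implemented in slaveTryPartialResynchronization():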

    int slaveTryPartialResynchronization(int fd) {
        char *psync_runid;
        char psync_offset[32];
        sds reply;
    
        /* Initially set repl_master_initial_offset to -1 to mark the current
         * master run_id and offset as not valid. Later if we'll be able to do
         * a FULL resync using the PSYNC command we'll set the offset at the
         * right value, so that this information will be propagated to the
         * client structure representing the master into server.master. */
        server.repl_master_initial_offset = -1;
    
        if (server.cached_master) {
            /* A cached master exists: try a partial resync */
            psync_runid = server.cached_master->replrunid;
            /* Ask for data starting right after the last byte we processed: cached reploff + 1 */
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset);
        } else {
            /* PSYNC ? -1 forces a full resync */
            redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_runid = "?";
            memcpy(psync_offset,"-1",3);
        }
    
        /* Issue the PSYNC command */
        /* Send PSYNC. The call is synchronous, so the master's reply is returned in reply.
           Command format: PSYNC <runid> <offset>
         */
        reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL);
    
        /* +FULLRESYNC: master and slave will perform a full resync */
        if (!strncmp(reply,"+FULLRESYNC",11)) {
            char *runid = NULL, *offset = NULL;
    
            /* FULL RESYNC, parse the reply in order to extract the run id
             * and the replication offset. */
            runid = strchr(reply,' ');
            if (runid) {
                runid++;
                offset = strchr(runid,' ');
                if (offset) offset++;
            }
            if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {
                redisLog(REDIS_WARNING,
                    "Master replied with wrong +FULLRESYNC syntax.");
                /* This is an unexpected condition, actually the +FULLRESYNC
                 * reply means that the master supports PSYNC, but the reply
                 * format seems wrong. To stay safe we blank the master
                 * runid to make sure next PSYNCs will fail. */
                memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1);
            } else {
                /* Save the run id sent by the master */
                memcpy(server.repl_master_runid, runid, offset-runid-1);
                server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';
                server.repl_master_initial_offset = strtoll(offset,NULL,10);
                redisLog(REDIS_NOTICE,"Full resync from master: %s:%lld",
                    server.repl_master_runid,
                    server.repl_master_initial_offset);
            }
            /* We are going to full resync, discard the cached master structure. */
            /* A full resync is required, so the cached master can no longer be used: discard it */
            replicationDiscardCachedMaster();
            sdsfree(reply);
            return PSYNC_FULLRESYNC;
        }
    
        /* +CONTINUE: master and slave will perform a partial resync */
        if (!strncmp(reply,"+CONTINUE",9)) {
            /* Partial resync was accepted, set the replication state accordingly */
            redisLog(REDIS_NOTICE,
                "Successful partial resynchronization with master.");
            sdsfree(reply);
            /* Watch the socket for readable events: the master streams write commands,
             * which the slave handles through readQueryFromClient like any client's commands */
            replicationResurrectCachedMaster(fd);
            return PSYNC_CONTINUE;
        }
    
        /* If we reach this point we received either an error since the master does
         * not understand PSYNC, or an unexpected reply from the master.
         * Return PSYNC_NOT_SUPPORTED to the caller in both cases. */
        /* Error handling */
        if (strncmp(reply,"-ERR",4)) {
            /* If it's not an error, log the unexpected event. */
            redisLog(REDIS_WARNING,
                "Unexpected reply to PSYNC from master: %s", reply);
        } else {
            redisLog(REDIS_NOTICE,
                "Master does not support PSYNC or is in "
                "error state (reply: %s)", reply);
        }
        sdsfree(reply);
        replicationDiscardCachedMaster();
        return PSYNC_NOT_SUPPORTED;
    }
    
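    • snprintf(psync_offset, ..., reploff+1): the cached master's replication offset plus one is sent as the offset argument of PSYNC;
    • +FULLRESYNC: the master replied +FULLRESYNC <runid> <offset>, so a full resynchronization follows;
    • +CONTINUE: the master replied +CONTINUE, so a partial resynchronization follows;
    • replicationResurrectCachedMaster(fd): analyzed in the next section.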
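The three-way reply handling above can be condensed into a small, testable C sketch. `classify_psync_reply`, the `PSYNC_*` enum values and `RUN_ID_SIZE` are hypothetical names (Redis uses REDIS_RUN_ID_SIZE, which is 40). Like the original, a malformed +FULLRESYNC still means "full resync", but the run id is blanked and the offset stays at -1.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40   /* Redis run ids are 40 characters long */

typedef enum { PSYNC_FULL, PSYNC_PARTIAL, PSYNC_UNSUPPORTED } psync_outcome;

/* Hypothetical, simplified version of the reply handling in
 * slaveTryPartialResynchronization(). runid must hold RUN_ID_SIZE+1 bytes. */
psync_outcome classify_psync_reply(const char *reply, char *runid,
                                   long long *offset) {
    assert(reply != NULL && runid != NULL && offset != NULL);
    runid[0] = '\0';
    *offset = -1;
    if (strncmp(reply, "+FULLRESYNC", 11) == 0) {
        /* Expected shape: "+FULLRESYNC <runid> <offset>" */
        const char *id = strchr(reply, ' ');
        const char *off = id ? strchr(id + 1, ' ') : NULL;
        if (id && off && off - id - 1 == RUN_ID_SIZE) {
            memcpy(runid, id + 1, RUN_ID_SIZE);
            runid[RUN_ID_SIZE] = '\0';
            *offset = strtoll(off + 1, NULL, 10);
        }
        return PSYNC_FULL;
    }
    if (strncmp(reply, "+CONTINUE", 9) == 0) return PSYNC_PARTIAL;
    /* -ERR (a master without PSYNC) or anything unexpected: fall back to SYNC */
    return PSYNC_UNSUPPORTED;
}
```

The caller then maps PSYNC_UNSUPPORTED to sending SYNC, as syncWithMaster does with PSYNC_NOT_SUPPORTED.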

    IV. Partial resynchronization: replicationResurrectCachedMaster:

    void replicationResurrectCachedMaster(int newfd) {
        /* Partial resync accepted, so the cached master is still valid: make it server.master again */
        server.master = server.cached_master;
        server.cached_master = NULL;
        server.master->fd = newfd;
        server.master->flags &= ~(REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP);
        server.master->authenticated = 1;
        server.master->lastinteraction = server.unixtime;
        server.repl_state = REDIS_REPL_CONNECTED;
    
        /* Re-add to the list of clients. */
        listAddNodeTail(server.clients,server.master);
        /* The read handler is readQueryFromClient: data sent by the master is executed as ordinary client commands */
        if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
                              readQueryFromClient, server.master)) {
            redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
            freeClientAsync(server.master); /* Close ASAP. */
        }
    
        /* We may also need to install the write handler as well if there is
         * pending data in the write buffers. */
        /* If the output buffer still holds data not yet sent to the master, re-register the writable handler */
        if (server.master->bufpos || listLength(server.master->reply)) {
            if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
                              sendReplyToClient, server.master)) {
                redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
                freeClientAsync(server.master); /* Close ASAP. */
            }
        }
    }
    
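    • aeCreateFileEvent(..., readQueryFromClient, server.master): the read handler is readQueryFromClient, so partial resynchronization is carried out simply by executing the commands the master sends;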

      // Inside readQueryFromClient():
      if (nread) {
          /* Update the len and free fields of the sds */
          sdsIncrLen(c->querybuf,nread);
          c->lastinteraction = server.unixtime;
          /* If the client is the master, this server is a slave: the bytes just
           * received are the replication stream propagated by the master, so
           * advance reploff (the replication offset) as well */
          if (c->flags & REDIS_MASTER) c->reploff += nread;
      }
      
      • c->reploff += nread: each time the slave receives n bytes propagated by the master, it adds n to its replication offset;
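Offsets are what make +CONTINUE possible. The master keeps the tail of its write stream in the backlog, and the slave asks for everything after its reploff. The C sketch below is a simplified, hypothetical model of that check; in Redis 3.0 the master-side logic lives in masterTryPartialResynchronization(), which this article does not cover. All names are illustrative. Offsets count from 1, matching `reploff + 1` in PSYNC. The buffer is only 16 bytes so the example can overflow it.

```c
#include <assert.h>
#include <string.h>

#define BACKLOG_SIZE 16  /* tiny on purpose; Redis's repl-backlog-size defaults to 1mb */

/* Hypothetical backlog: a circular buffer holding the last BACKLOG_SIZE
 * bytes of the stream the master propagated. */
typedef struct {
    char buf[BACKLOG_SIZE];
    long long master_off;  /* total bytes ever propagated (cf. master_repl_offset) */
    long long histlen;     /* bytes of history currently held */
    size_t idx;            /* next write position in buf */
} backlog;

/* Master side: every propagated byte goes into the backlog. */
void backlog_feed(backlog *b, const char *p, size_t len) {
    b->master_off += (long long)len;
    while (len--) {
        b->buf[b->idx] = *p++;
        b->idx = (b->idx + 1) % BACKLOG_SIZE;
        if (b->histlen < BACKLOG_SIZE) b->histlen++;
    }
}

/* Can "PSYNC <runid> <psync_offset>" be answered with +CONTINUE? Only if
 * every byte from psync_offset on is still in the buffer. On success,
 * *missing is how many bytes must be resent to the slave. */
int backlog_can_serve(const backlog *b, long long psync_offset, long long *missing) {
    assert(missing != NULL);
    long long first = b->master_off - b->histlen + 1;  /* oldest byte kept */
    if (psync_offset < first || psync_offset > b->master_off + 1) return 0;
    *missing = b->master_off + 1 - psync_offset;
    return 1;
}
```

Once the slave falls more than BACKLOG_SIZE bytes behind, its requested offset has been overwritten. The master can no longer answer +CONTINUE and must fall back to +FULLRESYNC.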

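    V. Full resynchronization:

    For a full resync the master sends the RDB payload as a "$<len>" line followed by exactly len raw bytes. readSyncBulkPayload runs on every readable event: it first reads that length line, then the data: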

    void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
        char buf[4096];
        ssize_t nread, readlen;
        off_t left;
        REDIS_NOTUSED(el);
        REDIS_NOTUSED(privdata);
        REDIS_NOTUSED(mask);
    
        /* If repl_transfer_size == -1 we still have to read the bulk length
         * from the master reply. */
        if (server.repl_transfer_size == -1) {
            if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
                redisLog(REDIS_WARNING,
                    "I/O error reading bulk count from MASTER: %s",
                    strerror(errno));
                goto error;
            }
    
            if (buf[0] == '-') {
                redisLog(REDIS_WARNING,
                    "MASTER aborted replication with an error: %s",
                    buf+1);
                goto error;
            } else if (buf[0] == '\0') {
                /* At this stage just a newline works as a PING in order to take
                 * the connection live. So we refresh our last interaction
                 * timestamp. */
                server.repl_transfer_lastio = server.unixtime;
                return;
            } else if (buf[0] != '$') {
                redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
                goto error;
            }
            server.repl_transfer_size = strtol(buf+1,NULL,10);
            redisLog(REDIS_NOTICE,
                "MASTER <-> SLAVE sync: receiving %lld bytes from master",
                (long long) server.repl_transfer_size);
            return;
        }
    
        /* Read bulk data */
        left = server.repl_transfer_size - server.repl_transfer_read;
        readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
        /* Read up to readlen bytes of RDB data; nread is how many were actually read */
        nread = read(fd,buf,readlen);
        if (nread <= 0) {
            redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
                (nread == -1) ? strerror(errno) : "connection lost");
            replicationAbortSyncTransfer();
            return;
        }
        server.repl_transfer_lastio = server.unixtime;
        if (write(server.repl_transfer_fd,buf,nread) != nread) {
            redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
            goto error;
        }
        server.repl_transfer_read += nread;
    
        /* Sync data on disk from time to time, otherwise at the end of the transfer
         * we may suffer a big delay as the memory buffers are copied into the
         * actual disk. */
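        /* On Linux, rdb_fsync_range() uses the sync_file_range system call to flush
         * incrementally, avoiding one big fsync at the end that would cause a
         * latency spike */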
        if (server.repl_transfer_read >=
            server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
        {
            off_t sync_size = server.repl_transfer_read -
                              server.repl_transfer_last_fsync_off;
            rdb_fsync_range(server.repl_transfer_fd,
                server.repl_transfer_last_fsync_off, sync_size);
            server.repl_transfer_last_fsync_off += sync_size;
        }
    
        /* Check if the transfer is now complete */
        if (server.repl_transfer_read == server.repl_transfer_size) {
            /* Download complete: rename the temp file to this server's RDB file name */
            if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
                redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
                replicationAbortSyncTransfer();
                return;
            }
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
            signalFlushedDb(-1);
            emptyDb();
            /* Before loading the DB into memory we need to delete the readable
             * handler, otherwise it will get called recursively since
             * rdbLoad() will call the event loop to process events from time to
             * time for non blocking loading. */
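            /* rdbLoad() runs the event loop from time to time (see the comment above),
             * so if this readable handler were still registered,
             * readSyncBulkPayload() would be re-entered while the RDB is loading */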
            aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
            /* Load the RDB file */
            if (rdbLoad(server.rdb_filename) != REDIS_OK) {
                redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
                replicationAbortSyncTransfer();
                return;
            }
            /* Final setup of the connected slave <- master link */
            zfree(server.repl_transfer_tmpfile);
            close(server.repl_transfer_fd);
            /* Create the client object that represents the master */
            server.master = createClient(server.repl_transfer_s);
            server.master->flags |= REDIS_MASTER;
            server.master->authenticated = 1;
            server.repl_state = REDIS_REPL_CONNECTED;
            server.master->reploff = server.repl_master_initial_offset;
            memcpy(server.master->replrunid, server.repl_master_runid,
                sizeof(server.repl_master_runid));
            redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
            /* Restart the AOF subsystem now that we finished the sync. This
             * will trigger an AOF rewrite, and when done will start appending
             * to the new file. */
            if (server.aof_state != REDIS_AOF_OFF) {
                int retry = 10;
    
                stopAppendOnly();
                while (retry-- && startAppendOnly() == REDIS_ERR) {
                    redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
                    sleep(1);
                }
                if (!retry) {
                    redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
                    exit(1);
                }
            }
        }
    
        return;
    
    error:
        replicationAbortSyncTransfer();
        return;
    }
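The transfer above is length-prefixed: one "$<len>" line, then exactly len bytes, written to disk with periodic range flushes. Below is a blocking C sketch of that pattern. `parse_bulk_len`, `flush_range`, `copy_bulk` and `FLUSH_EVERY` are hypothetical names. The sketch assumes Linux for sync_file_range() and falls back to fsync() elsewhere, similar to what the source comment says about rdb_fsync_range. Unlike Redis, it loops until done instead of handling one chunk per readable event, and its length parsing is stricter than the plain strtol() call above.

```c
#define _GNU_SOURCE            /* for sync_file_range() on Linux */
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define FLUSH_EVERY (1024 * 1024)  /* hypothetical; plays the role of REPL_MAX_WRITTEN_BEFORE_FSYNC */

/* Parse the "$<len>" line (terminator already stripped) that precedes the
 * RDB payload. Returns the length, or -1 if the line is not a bulk length. */
long long parse_bulk_len(const char *line) {
    if (line[0] != '$') return -1;
    char *end;
    long long len = strtoll(line + 1, &end, 10);
    return (end == line + 1 || *end != '\0' || len < 0) ? -1 : len;
}

/* Push [off, off+len) of fd towards disk. */
static void flush_range(int fd, off_t off, off_t len) {
#ifdef __linux__
    sync_file_range(fd, off, len, SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE);
#else
    (void)off; (void)len;
    fsync(fd);
#endif
}

/* Copy exactly size bytes from in_fd to out_fd in chunks of at most
 * sizeof(buf), flushing every FLUSH_EVERY bytes instead of once at the end.
 * Returns 0 on success, -1 on read/write error or early EOF. */
int copy_bulk(int in_fd, int out_fd, long long size) {
    char buf[4096];
    long long done = 0, last_flush = 0;
    assert(size >= 0);
    while (done < size) {
        long long left = size - done;
        size_t want = left < (long long)sizeof(buf) ? (size_t)left : sizeof(buf);
        ssize_t n = read(in_fd, buf, want);
        if (n <= 0) return -1;                              /* error or connection lost */
        if (write(out_fd, buf, (size_t)n) != n) return -1;  /* error or short write */
        done += n;
        if (done - last_flush >= FLUSH_EVERY) {
            flush_range(out_fd, (off_t)last_flush, (off_t)(done - last_flush));
            last_flush = done;
        }
    }
    return 0;
}
```

The `left`/`want` arithmetic is the same as `left`/`readlen` in readSyncBulkPayload. It guarantees that no byte beyond the announced payload is consumed from the socket, because whatever follows belongs to the master's command stream.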
    
    

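    References:

    This article is based on Redis 3.0.

    1. Huang Jianhong, Redis Design and Implementation
    2. Li Zhaolong, "Redis Source Code Analysis (13): Master-Slave Replication", blog post
    3. macguz, "Network Programming: Non-blocking connect"
    4. xiaofei0859, "Improving data sync efficiency with the new Linux system call sync_file_range", blog post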
    Original article: https://www.cnblogs.com/macguz/p/15850238.html