zoukankan      html  css  js  c++  java
  • 主从复制源代码分析

    • Slave端结构定义

      在了解replicantion核心之前,先了解replication在redis.conf的配置选项。

      #slaveof [masterip] [masterport]  设置master的ip和port
      #masterauth [master-password]     如果master需要auth,在此设置password
      #slave-serve-stale-data yes       如果slave与master的连接断开,该选项决定slave是否继续提供服务
      #slave-read-only yes              slave是否是只读的
      #repl-ping-slave-period 10        master端ping slave端的时间间隔,时刻检测slave连接的有效
      #repl-timeout 60                  replication连接的超时时间
      #slave-priority 100               slave的权重,用于redis sentinel模式中,如果master down,权重大的slave接替master


      server结构的关于slave端的成员变量

      /* Slave specific fields */
          char *masterauth;               /* AUTH with this password with master */
          char *masterhost;               /* Hostname of master */
          int masterport;                 /* Port of master */
          int repl_ping_slave_period;     /* Master pings the slave every N seconds */
          int repl_timeout;               /* Timeout after N seconds of master idle */
          redisClient *master;     /* Client that is master for this slave */
          int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
          int repl_state;          /* 值为上述的replication的状态宏 */
          off_t repl_transfer_size; /* master发送给slave的rdb文件大小 */
          off_t repl_transfer_read; /* 已经从master读取的rdb文件大小 */
          off_t repl_transfer_last_fsync_off; /* slave端收到rdb文件后同步到磁盘的文件大小偏移 */
          int repl_transfer_s;     /* slave端获取rdb文件的socket */
          int repl_transfer_fd;    /* slave收到rdb文件后存放到磁盘的文件fd */
          char *repl_transfer_tmpfile; /* slave存放rdb文件的文件名 */
          time_t repl_transfer_lastio; /* slave端上一次收到master端传送的ddb文件的unix time */
          int repl_serve_stale_data; /* 跟master断开后,是否继续服务? */
          int repl_slave_ro;          /* Slave is read only? */
          time_t repl_down_since; /* Unix time at which link with master went down */
          int slave_priority;             /* Reported in INFO and used by Sentinel. */


      replication的几个状态宏 – slave端复制状态

      #define REDIS_REPL_NONE 0 /* 未复制的状态 */
      #define REDIS_REPL_CONNECT 1 /* 已经接收到slaveof命令,但未发出sync命令给master */
      #define REDIS_REPL_CONNECTING 2 /* 正在发送ping给master */
      #define REDIS_REPL_RECEIVE_PONG 3 /* 发送ping完毕,等待PING回复 */
      #define REDIS_REPL_TRANSFER 4 /* 已经发出sync,但还没接收完rdb文件 */
      #define REDIS_REPL_CONNECTED 5 /* 连接master成功 */
      从状态宏也可以看出slave连接master经过几个过程: 1. 收到replication的指示 2. 建立socket连接到master,准备发送ping命令个master 3. 发送ping给master后,等待master的回复 4. 等待master传送rdb文件->收到rdb文件后,完成replication建立。额外的ping命令是redis应用层校验连接成功的额外过程。 redis通过replicantion状态的标示来异步进行replicantion的各阶段。

      Slave端发起同步请求

      首先,即将成为slave的redis instance收到slaveof命令或者启动时配置了slaveof选项,则执行slaveofCommand函数(replicantion.c)。如果命令是slaveof no one,那么取消replication。

      void slaveofCommand(redisClient *c) {
          if (!strcasecmp(c->argv[1]->ptr,"no") &&
              !strcasecmp(c->argv[2]->ptr,"one")) {
              if (server.masterhost) {
                  sdsfree(server.masterhost);
                  server.masterhost = NULL;
                  if (server.master) freeClient(server.master);
                  if (server.repl_state == REDIS_REPL_TRANSFER)
                      replicationAbortSyncTransfer();
                  else if (server.repl_state == REDIS_REPL_CONNECTING ||
                           server.repl_state == REDIS_REPL_RECEIVE_PONG)
                      undoConnectWithMaster();
                  server.repl_state = REDIS_REPL_NONE;
                  redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
              }
      否则得到master ip和port后,判断是否已经与该master建立连接,若不是,则放弃若已有的replication连接并初始化server的几个replication成员变量。

      long port;
       
      if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
          return;
       
      /* Check if we are already attached to the specified slave */
      if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
          && server.masterport == port) {
          redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
          addReplySds(c,sdsnew("+OK Already connected to specified master "));
          return;
      }
      sdsfree(server.masterhost);
      server.masterhost = sdsdup(c->argv[1]->ptr);
      server.masterport = port;
      if (server.master) freeClient(server.master);
      disconnectSlaves(); /* Force our slaves to resync with us as well. */
      if (server.repl_state == REDIS_REPL_TRANSFER)
          replicationAbortSyncTransfer();
      server.repl_state = REDIS_REPL_CONNECT;
      到此,slaveof的初始化结束,通过server.repl_state来标示replicantion的进展。

      在serverCron这个redis核心回调中,调用replicationCron()(replication.c)

      /* Replication cron function -- used to reconnect to master and
       * to detect transfer failures. */
      run_with_period(1000) replicationCron();
      在replicationCron()中,通过server.repl_state做检测,检测是否连接master超时,传输rdb文件是否超时,连接master成功后是否空闲超时,如果server.repl_state为REDIS_REPL_CONNECT,也就是在slaveofCommand设置的状态,那么启动连接master,调用connectWithMaster()

      /* Check if we should connect to a MASTER */
      if (server.repl_state == REDIS_REPL_CONNECT) {
          redisLog(REDIS_NOTICE,"Connecting to MASTER...");
          if (connectWithMaster() == REDIS_OK) {
              redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
          }
      在connectWithMaster()中,尝试连接到server后,创建文件事件,当可读或者可写时,调用syncWithMaster()(replication.c),设置server.repl_state为REDIS_REPL_CONNECTING,表示建立socket连接后,即将发送ping命令给master端。

      int connectWithMaster(void) {
          int fd;
       
          fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
          if (fd == -1) {
              redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
                  strerror(errno));
              return REDIS_ERR;
          }
       
          if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
                  AE_ERR)
          {
              close(fd);
              redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
              return REDIS_ERR;
          }
       
          server.repl_transfer_lastio = server.unixtime;
          server.repl_transfer_s = fd;
          server.repl_state = REDIS_REPL_CONNECTING;
          return REDIS_OK;
      }
      syncWithMaster()是slave端在replication启动后与master端建立slave-master关系的核心函数。在之前的connectWithMaster()后,此时slave端需要发送PING命令给master,检测master是否能回复,以此判断连接成功的socket是否为redis instance,在这种,redis一改之前全部的异步读写,使用了syncWrite,该函数向fd写入内容,如果阻塞,那么redis也会阻塞直到全部内容写入发送缓冲区。

      if (server.repl_state == REDIS_REPL_CONNECTING) {
              redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
              /* Delete the writable event so that the readable event remains
               * registered and we can wait for the PONG reply. */
              aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
              server.repl_state = REDIS_REPL_RECEIVE_PONG;
              /* Send the PING, don't check for errors at all, we have the timeout
               * that will take care about this. */
              syncWrite(fd,"PING ",6,100);
              return;
          }
      写入成功后,退出该函数,直到master端回复PING命令后,slave端再次调用syncWithMaster(),并且进入以下过程。由于master已经回复PING命令,该fd不再需要可读文件事件回调,同步读取master的回复内容,并且判断是否正常。

      if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
          char buf[1024];
       
          aeDeleteFileEvent(server.el,fd,AE_READABLE);
       
          /* Read the reply with explicit timeout. */
          buf[0] = '';
          if (syncReadLine(fd,buf,sizeof(buf),
              server.repl_syncio_timeout*1000) == -1)
          {
              redisLog(REDIS_WARNING,
                  "I/O error reading PING reply from master: %s",
                  strerror(errno));
              goto error;
          }
          //此时有可能slave回复需要auth
          if (buf[0] != '-' && buf[0] != '+') {
              redisLog(REDIS_WARNING,"Unexpected reply to PING from master.");
              goto error;
          } else {
              redisLog(REDIS_NOTICE,
                  "Master replied to PING, replication can continue...");
          }
      }
      如果master端需要auth,那么此时再发送AUTH命令,验证slave端。这里调用的sendSynchronousCommand()(replicantion.c)是特殊的同步发送命令,该发送会确保write到fd的内容全部输出,并且阻塞等待master端回复命令。因此,当返回时,master已经回复通过auth验证或者验证失败等。

      /* AUTH with the master if required. */
          if(server.masterauth) {
              err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
              if (err) {
                  redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
                  sdsfree(err);
                  goto error;
              }
          }
      slave端在与master完成应用层握手后,阻塞发送sync命令,请求同步。

      if (syncWrite(fd,"SYNC ",6,server.repl_syncio_timeout*1000) == -1) {
          redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
              strerror(errno));
          goto error;
      }
      同时,slave会建立临时文件,用于存放获得的rdb文件。maxtries用于不断尝试建立临时文件的次数。

      /* Prepare a suitable temp file for bulk transfer */
          while(maxtries--) {
              snprintf(tmpfile,256,
                  "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
              dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
              if (dfd != -1) break;
              sleep(1);
          }
          if (dfd == -1) {
              redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
              goto error;
          }


      Slave端接收数据

      到了这里,slave端准备开始接收master端发送的rdb文件,创建文件事件,回调readSyncBulkPayload()(replication.c)。设置server.repl_state为REDIS_REPL_TRANSFER并且初始化传输rdb文件的相关变量。syncWithMaster的使命也完成了。

          if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
                  == AE_ERR)
          {
              redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
              goto error;
          }
          server.repl_state = REDIS_REPL_TRANSFER;
          server.repl_transfer_size = -1;
          server.repl_transfer_read = 0;
          server.repl_transfer_last_fsync_off = 0;
          server.repl_transfer_fd = dfd;
          server.repl_transfer_lastio = server.unixtime;
          server.repl_transfer_tmpfile = zstrdup(tmpfile);
          return;
       
      error:
          close(fd);
          server.repl_transfer_s = -1;
          server.repl_state = REDIS_REPL_CONNECT;
          return;
      }
      在redis的ae事件库检测到master端fd可读时,表示master已经建立完成rdb文件,开始发送给slave。事件库回调readSyncBulkPayload(),开始异步接收rdb文件。第一次接收时,需要先得到整个rdb文件的大小。syncReadLine()提取接收到的内容,master发送的rdb文件会以”$[rdb size] ”开始,因此readline后,正确接收的buf应该只会从缓冲区获得该头部。

      if (server.repl_transfer_size == -1) {
              if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
                  redisLog(REDIS_WARNING,
                      "I/O error reading bulk count from MASTER: %s",
                      strerror(errno));
                  goto error;
              }
       
              if (buf[0] == '-') {
                  redisLog(REDIS_WARNING,
                      "MASTER aborted replication with an error: %s",
                      buf+1);
                  goto error;
              } else if (buf[0] == '') {
                  /* At this stage just a newline works as a PING in order to take
                   * the connection live. So we refresh our last interaction
                   * timestamp. */
                  server.repl_transfer_lastio = server.unixtime;
                  return;
              } else if (buf[0] != '$') {
                  redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
                  goto error;
              }
              server.repl_transfer_size = strtol(buf+1,NULL,10);
              redisLog(REDIS_NOTICE,
                  "MASTER <-> SLAVE sync: receiving %ld bytes from master",
                  server.repl_transfer_size);
              return;
          }
      在得知rdb文件大小后,该fd再次可读时,开始接收rdb内容。

      left = server.repl_transfer_size - server.repl_transfer_read;
      readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
      nread = read(fd,buf,readlen);
      if (nread <= 0) {
          redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
              (nread == -1) ? strerror(errno) : "connection lost");
          replicationAbortSyncTransfer();
          return;
      }
      server.repl_transfer_lastio = server.unixtime;
      if (write(server.repl_transfer_fd,buf,nread) != nread) {
          redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
          goto error;
      }
      server.repl_transfer_read += nread;
       
      /* 我们需要经常讲内容写到磁盘上,避免最后时刻才写造成一定的延误 */
      if (server.repl_transfer_read >=
          server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
      {
          off_t sync_size = server.repl_transfer_read -
                            server.repl_transfer_last_fsync_off;
          rdb_fsync_range(server.repl_transfer_fd,
              server.repl_transfer_last_fsync_off, sync_size);
          server.repl_transfer_last_fsync_off += sync_size;
      }
      重复以上过程,直到rdb文件完整接收后,重命名该临时文件,然后调用emptyDb()清空数据库。删除master端可读的文件事件。从rdb文件中读取完成建立数据库。

      if (server.repl_transfer_read == server.repl_transfer_size) {
              if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
                  redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
                  replicationAbortSyncTransfer();
                  return;
              }
              redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
              emptyDb();
              aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
              if (rdbLoad(server.rdb_filename) != REDIS_OK) {
                  redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
                  replicationAbortSyncTransfer();
                  return;
              }
      最后完成server的相关成员变量的初始化,并且重新启动aof。

          /* Final setup of the connected slave <- master link */
          zfree(server.repl_transfer_tmpfile);
          close(server.repl_transfer_fd);
          server.master = createClient(server.repl_transfer_s);
          server.master->flags |= REDIS_MASTER;
          server.master->authenticated = 1;
          server.repl_state = REDIS_REPL_CONNECTED;
          redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
           
          if (server.aof_state != REDIS_AOF_OFF) {
              int retry = 10;
       
              stopAppendOnly();
              while (retry-- && startAppendOnly() == REDIS_ERR) {
                  redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchrnization! Trying it again in one second.");
                  sleep(1);
              }
              if (!retry) {
                  redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
                  exit(1);
              }
          }
      }
      到这里,slave端的replication部分已经结束。让我们再看master端进行的repliation操作。

      Master端接受同步请求

      redisClient的几个关于replication的成员变量

      int replstate;          /* 复制状态 */
      int repldbfd;           /* 发送给该slave的rdb fd */
      long repldboff;         /* 发送给该slave的rdb 偏移 */
      off_t repldbsize;       /* 发送给该slave的ddb文件大小 */
      int slave_listening_port; /* As configured with: SLAVECONF listening-port */
      master端的slave的replication状态,指redisClient.replstate

      #define REDIS_REPL_WAIT_BGSAVE_START 3 /* 等待master启动bgsave */
      #define REDIS_REPL_WAIT_BGSAVE_END 4 /* 等待master bgsave完成,启动rdb的传输 */
      #define REDIS_REPL_SEND_BULK 5 /* master正在发送rdb文件 */
      #define REDIS_REPL_ONLINE 6 /* rdb发送结束,持续接收更新 */
      通过该状态,我们可能得出,master端完成对slave的replication有以下过程:接收到slave发送的sync命令,启动bgsave后台建立rdb->bgsave完成,开始对该slave进行传送rdb->rdb传送完成,开始持续对slave的更新

      master端在slave端完成PING命令应用层校验后,发送sync命令开始replication准备。接收到SYNC命令后调用syncCommand()(replicantion.c)。

      首先,redis会检查是否有slave正在进行bgsave,如果有,则可以把该slave的bgsave建立的rdb文件同时发送给当前正在处理的slave,同时把该slave的等待发送的缓冲区复制当前处理的slave(缓冲区内为建立rdb文件后到发送给slave的时间内,redis进行的更行操作的命令)。 如果没有,那么调用rdbSaveBackground()(rdb.c)准备bgsave。 此时,replstate的状态都将设置为REDIS_REPL_WAIT_BGSAVE_END。

      if (server.rdb_child_pid != -1) {
          redisClient *slave;
          listNode *ln;
          listIter li;
       
          listRewind(server.slaves,&li);
          while((ln = listNext(&li))) {
              slave = ln->value;
              if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
          }
          if (ln) {
              /* Perfect, the server is already registering differences for
               * another slave. Set the right state, and copy the buffer. */
              copyClientOutputBuffer(c,slave);
              c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
              redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
          } else {
              /* No way, we need to wait for the next BGSAVE in order to
               * register differences */
              c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
              redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
          }
      } else {
          /* Ok we don't have a BGSAVE in progress, let's start one */
          redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
          if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
              redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
              addReplyError(c,"Unable to perform background save");
              return;
          }
          c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
      }
      server.dirty保存的是没有写到磁盘上的更新操作的个数,当启动bgsave后,所有更新操作都会被写到磁盘,server.dirty也需要更新。fork后子进程调用rdbSave(),父进程保存子进程状态,返回。

      int rdbSaveBackground(char *filename) {
          pid_t childpid;
          long long start;
       
          if (server.rdb_child_pid != -1) return REDIS_ERR;
       
          server.dirty_before_bgsave = server.dirty;
       
          start = ustime();
          if ((childpid = fork()) == 0) {
              int retval;
       
              /* Child */
              if (server.ipfd > 0) close(server.ipfd);
              if (server.sofd > 0) close(server.sofd);
              retval = rdbSave(filename);
              exitFromChild((retval == REDIS_OK) ? 0 : 1);
          } else {
              /* Parent */
              server.stat_fork_time = ustime()-start;
              if (childpid == -1) {
                  redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                      strerror(errno));
                  return REDIS_ERR;
              }
              redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
              server.rdb_save_time_start = time(NULL);
              server.rdb_child_pid = childpid;
              updateDictResizePolicy();
              return REDIS_OK;
          }
          return REDIS_OK; /* unreached */
      }
      回到syncCommand(),redis完成对该slave的注册。

          c->repldbfd = -1;
          c->flags |= REDIS_SLAVE;
          c->slaveseldb = 0;
          listAddNodeTail(server.slaves,c);
          return;
      }
      同样,在serverCron中,当检测到有bgsave子进程时,会检测bgsave是否结束。如果结束,则检测结束的状态和结束的原因。然后调用backgroundSaveDoneHandler()(rdb.c)

      if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
          int statloc;
          pid_t pid;
       
          if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
              int exitcode = WEXITSTATUS(statloc);
              int bysignal = 0;
               
              if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
       
              if (pid == server.rdb_child_pid) {
                  backgroundSaveDoneHandler(exitcode,bysignal);
              } else {
                  backgroundRewriteDoneHandler(exitcode,bysignal);
              }
              updateDictResizePolicy();
          }

      确保bgsave进程是正常结束,不是被信号打断,更新server.dirty。调用updateSlavesWaitingBgsave()(replication.c)

      void backgroundSaveDoneHandler(int exitcode, int bysignal) {
          if (!bysignal && exitcode == 0) {
              redisLog(REDIS_NOTICE,
                  "Background saving terminated with success");
              server.dirty = server.dirty - server.dirty_before_bgsave;
              server.lastsave = time(NULL);
              server.lastbgsave_status = REDIS_OK;
          } else if (!bysignal && exitcode != 0) {
              redisLog(REDIS_WARNING, "Background saving error");
              server.lastbgsave_status = REDIS_ERR;
          } else {
              redisLog(REDIS_WARNING,
                  "Background saving terminated by signal %d", bysignal);
              rdbRemoveTempFile(server.rdb_child_pid);
              server.lastbgsave_status = REDIS_ERR;
          }
          server.rdb_child_pid = -1;
          server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
          server.rdb_save_time_start = -1;
          /* Possibly there are slaves waiting for a BGSAVE in order to be served
           * (the first stage of SYNC is a bulk transfer of dump.rdb) */
          updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
      }
      检查每个slave的replication状态,如果是REDIS_REPL_WAIT_BGSAVE_END,则建立可写文件事件,回调sendBulkToSlave(),发送rdb文件给slave。如果是REDIS_REPL_WAIT_BGSAVE_START,则说明又有新加入的slave需要建立rdb文件,那么再次调用rdbSaveBackground()进行bgsave。

      void updateSlavesWaitingBgsave(int bgsaveerr) {
          listNode *ln;
          int startbgsave = 0;
          listIter li;
       
          listRewind(server.slaves,&li);
          while((ln = listNext(&li))) {
              redisClient *slave = ln->value;
       
              if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
                  startbgsave = 1;
                  slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
              } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
                  struct redis_stat buf;
       
                  if (bgsaveerr != REDIS_OK) {
                      freeClient(slave);
                      redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
                      continue;
                  }
                  if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
                      redis_fstat(slave->repldbfd,&buf) == -1) {
                      freeClient(slave);
                      redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
                      continue;
                  }
                  slave->repldboff = 0;
                  slave->repldbsize = buf.st_size;
                  slave->replstate = REDIS_REPL_SEND_BULK;
                  aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
                  if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                      freeClient(slave);
                      continue;
                  }
              }
          }
      当有slave的fd可写时,事件库回调sendBulkToSlave(),当第一次发送rdb文件时,slave->repldboff==0,需要先发送rdb文件的大小。

      void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
          redisClient *slave = privdata;
          REDIS_NOTUSED(el);
          REDIS_NOTUSED(mask);
          char buf[REDIS_IOBUF_LEN];
          ssize_t nwritten, buflen;
       
          if (slave->repldboff == 0) {
              sds bulkcount;
       
              bulkcount = sdscatprintf(sdsempty(),"$%lld ",(unsigned long long)
                  slave->repldbsize);
              if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
              {
                  sdsfree(bulkcount);
                  freeClient(slave);
                  return;
              }
              sdsfree(bulkcount);
          }
      然后发送rdb文件内容,由于整个文件分次发送,需要多次回调sendBulkToSlave()发送,每次使用lseek定位到上次发送位置,发送后续内容。

      lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
      buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
      if (buflen <= 0) {
          redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
              (buflen == 0) ? "premature EOF" : strerror(errno));
          freeClient(slave);
          return;
      }
      if ((nwritten = write(fd,buf,buflen)) == -1) {
          redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s",
              strerror(errno));
          freeClient(slave);
          return;
      }
      slave->repldboff += nwritten;
      重复发送直到完成后,完成了对slave的replication建立,slave->replstate = REDIS_REPL_ONLINE。并且建立文件可写事件,回调sendReplyToClient()(networking.c),该回调旨在当master收到写命令后,需要更新slave的数据,回调sendReplyToClient()发送更新命令。

      if (slave->repldboff == slave->repldbsize) {
          close(slave->repldbfd);
          slave->repldbfd = -1;
          aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
          slave->replstate = REDIS_REPL_ONLINE;
          if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
              sendReplyToClient, slave) == AE_ERR) {
              freeClient(slave);
              return;
          }
          redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
      }
      此时,master端也完成了整个replication过程,在bgsave建立到传送给slave的时间内master发送的更新操作会写到发送给slave的发送缓冲区,传播更新。 在master接受到命令执行后,当出现写操作时,在call()(redis.c)会调用propagate()传播该命令。propagate()调用replicationFeedSlaves()

          if (flags & REDIS_CALL_PROPAGATE) {
              int flags = REDIS_PROPAGATE_NONE;
       
              if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
                  flags |= REDIS_PROPAGATE_REPL;
              if (dirty)
                  flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
              if (flags != REDIS_PROPAGATE_NONE)
                  propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
          }
       
      void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
                     int flags)
      {
          if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
              feedAppendOnlyFile(cmd,dbid,argv,argc);
          if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
              replicationFeedSlaves(server.slaves,dbid,argv,argc);
      }
      所有不处于REDIS_REPL_WAIT_BGSAVE_START的slaves的发送缓冲都会写入命令。slave->slaveseldb指的是目前slave选择的数据库,如果与该命令写入的数据库不一致,master还需要发送select命令。

      void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
          listNode *ln;
          listIter li;
          int j;
       
          listRewind(slaves,&li);
          while((ln = listNext(&li))) {
              redisClient *slave = ln->value;
       
              /* Don't feed slaves that are still waiting for BGSAVE to start */
              if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
       
              if (slave->slaveseldb != dictid) {
                  robj *selectcmd;
       
                  if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
                      selectcmd = shared.select[dictid];
                      incrRefCount(selectcmd);
                  } else {
                      selectcmd = createObject(REDIS_STRING,
                          sdscatprintf(sdsempty(),"select %d ",dictid));
                  }
                  addReply(slave,selectcmd);
                  decrRefCount(selectcmd);
                  slave->slaveseldb = dictid;
              }
              addReplyMultiBulkLen(slave,argc);
              for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
          }
      }
      master端在replicationCron和propagate过程中会遍历slaves,执行操作,因此每个master保持的slaves不宜过多,建议较多slave时采用链式slave->slave->master。

      在replication中可以看到,redis的异步性得到很大体现,通过状态标示来解决异步时回调函数的杂乱问题。

  • 相关阅读:
    [转]WIBKIT技术资料
    WebKit学习要点
    提高IOS开发效率的常用网站、开源类库及工具
    【浏览器那些基础】Android平台有那些CPU类型
    深刻的理解
    Spring Boot 最流行的 16 条实践解读,值得收藏!
    MyBatis动态SQL(认真看看, 以后写SQL就爽多了)
    为什么很多 SpringBoot 开发者放弃了 Tomcat,选择了 Undertow?
    朋友,别告诉我你懂分布式事务!
    分布式锁用 Redis 还是 Zookeeper?
  • 原文地址:https://www.cnblogs.com/lsx1993/p/4633039.html
Copyright © 2011-2022 走看看