Redis源码阅读(二)高可用设计-复制
复制的概念:Redis的复制简单理解就是一个Redis服务器从另一台Redis服务器复制所有的Redis数据库数据,能保持两台Redis服务器的数据库数据一致。
使用场景:复制机制很实用,在客户端并发访问量很大,单台Redis扛不住的情况下,可以部署多台Redis复制相同的数据,共同对外提供服务,提高Redis并发访问处理能力。当然这种通过复制方式部署多台Redis以提高并发处理能力的方式只适用于客户端大部分访问为读数据请求的场景。此外,Redis从2.8版本以后支持的Sentinel高可用机制(热备)也需要依赖复制功能来同步几台机器的Redis数据。
个人在阅读源码时比较关心的几个复制问题如下:
- Redis复制时,机器之间传递的数据结构是怎样的?
- 为保证各节点的数据一致,应该要经常进行数据同步,Redis数据量很大时,每次同步都全量肯定不现实,那Redis中的增量同步是如何实现的?
- 如果从节点断线后重连到主节点,是否会触发全量同步,在网络状况不好的情况下可能会出现频繁的重连,如果每次重连再进行全量同步会加重网络负担,Redis是否在这种情况下做了增量同步?
接下来会介绍Redis复制的主要流程,并针对这几个问题来看Redis源码的实现。
1. 初次复制的流程
- 客户端发slaveof命令给从服务器,给从服务器指定需要复制的主服务器ip和端口
- 从服务器收到slaveof命令后,创建到主服务器的连接并发送PING来确认连接有效
- 从服务器接收到主服务器返回PONG,确认可以和主服务器通信后,向主服务器发送”PSYNC ? -1”命令,申请执行初次的全量同步
- 主服务器收到PSYNC,开始在内部执行BGSAVE命令,将数据库写到RDB文件中;注意,在执行BGSAVE期间,主服务器还可接收客户端的写命令,这些命令会向缓冲区中写一份,确保在执行BGSAVE期间发生的写命令也可以同步给从服务器
- 主服务器执行BGSAVE后,发送RDB文件给从服务器,从服务器导入RDB文件
- 发送RDB完成后,主服务器将缓冲区中的写命令也发送给从服务器,从服务器执行这些写命令
备注:这里介绍的是主要流程,忽略了身份认证的步骤
复制时机器间传递的数据结构是RDB文件;RDB是Redis持久化的一种方式;RDB文件中记录的是Redis数据库中所有的key-value对;这种持久方式在任意时刻开启都能保证持久化的数据是Redis中完整数据,与此相对的AOF持久化方式则是将所有写命令计入到文件中,这种文件保留的数据只是从开启AOF的那一刻开始,开启之前的数据是无法保存的,所以复制机制没有使用AOF文件。
源码中,服务器处理slaveof命令的处理器是slaveofCommand()函数
void slaveofCommand(redisClient *c) { //想让一个节点A通过slaveof成为另一个节点B的slave,则必须是B在单机模式下 // ....... /* Check if we are already attached to the specified slave */ // 检查输入的 host 和 port 是否服务器目前的主服务器 // 如果是的话,向客户端返回 +OK ,不做其他动作 //说明之前已经slaveof ip port过,这次又执行该命令,说明之前已经连接建立成功过了 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr) && server.masterport == port) { redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed."); addReplySds(c,sdsnew("+OK Already connected to specified master "));//向从服务器的redis-cli客户端发送该命令 return; } //第一次执行slaveof ip port,新的ip或者port /* There was no previous master or the user specified a different one, * we can continue. */ // 没有前任主服务器,或者客户端指定了新的主服务器 // 开始执行复制操作 replicationSetMaster(c->argv[1]->ptr, port); redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)", server.masterhost, server.masterport); } addReply(c,shared.ok); }
在该函数中通过replicationSetMaster()函数设置要连接的主服务器的IP和端口,并将服务器连接状态server.repl_state置为REDIS_REPL_CONNECT。从而会触发connectWithMaster()函数将连接状态置为REDIS_REPL_CONNECTING,继而触发slaveTryPartialResynchronization()函数来发送PSYNC命令。connectWithMaster函数创建连接套接字专门用于主备数据同步
// 以非阻塞方式连接主服务器 int connectWithMaster(void) { int fd; // 连接主服务器 fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport); if (fd == -1) { redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", strerror(errno)); return REDIS_ERR; } // 监听主服务器 fd 的读和写事件,并绑定文件事件处理器 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) { close(fd); redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); return REDIS_ERR; } // 初始化统计变量 server.repl_transfer_lastio = server.unixtime; server.repl_transfer_s = fd; // 将状态改为已连接 server.repl_state = REDIS_REPL_CONNECTING; return REDIS_OK; }
看下slaveTryPartialResynchronization()的具体实现实现
int slaveTryPartialResynchronization(int fd) { char *psync_runid; char psync_offset[32]; sds reply; server.repl_master_initial_offset = -1; //例如从服务器之前和主服务器连接上,并同步了数据,中途端了,则连接断了后会在replicationCacheMaster把server.cached_master = server.master; //表示之前有连接到过服务器 if (server.cached_master) { // 缓存存在,尝试部分重同步 // 命令为 "PSYNC <master_run_id> <repl_offset>" psync_runid = server.cached_master->replrunid; snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_runid, psync_offset); } else { // 缓存不存在 // 发送 "PSYNC ? -1" ,要求完整重同步 redisLog(REDIS_NOTICE,"Partial resynchronization not possible (no cached master)"); psync_runid = "?"; memcpy(psync_offset,"-1",3); // } /* Issue the PSYNC command */ // 向主服务器发送 PSYNC 命令 reply = sendSynchronousCommand(fd,"PSYNC",psync_runid,psync_offset,NULL); // 接收到 FULLRESYNC ,进行 full-resync if (!strncmp(reply,"+FULLRESYNC",11)) { char *runid = NULL, *offset = NULL; /* FULL RESYNC, parse the reply in order to extract the run id * and the replication offset. */ // 分析并记录主服务器的 run id runid = strchr(reply,' '); if (runid) { runid++; offset = strchr(runid,' '); if (offset) offset++; } // 检查 run id 的合法性 if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) { redisLog(REDIS_WARNING, "Master replied with wrong +FULLRESYNC syntax."); // 主服务器支持 PSYNC ,但是却发来了异常的 run id // 只好将 run id 设为 0 ,让下次 PSYNC 时失败 memset(server.repl_master_runid,0,REDIS_RUN_ID_SIZE+1); } else { // 保存 run id memcpy(server.repl_master_runid, runid, offset-runid-1); server.repl_master_runid[REDIS_RUN_ID_SIZE] = '