在集群节点之间握手及分配好槽点之后,可以为主节点设置一个或多个从节点,以更好的应对故障,保证集群的稳定性。
本文主要介绍一下redis执行replicate命令的过程。
- 处理cluster replicate命令
同样,处理cluster命令的api为clusterCommand,其中处理replicate命令过程如下:
if (nodeIsMaster(myself) &&(myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) { addReplyError(c,"To set a master the node must be empty and without assigned slots."); return; } /* Set the master. */
// 将节点 n 设为本节点的主节点 clusterSetMaster(n); clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_C ONFIG); addReply(c,shared.ok);
- 将服务器设为指定地址的从服务器
void replicationSetMaster(char *ip, int port) { // 清除原有的主服务器地址(如果有的话) sdsfree(server.masterhost); server.masterhost = sdsnew(ip); server.masterport = port; // 如果之前有其他地址,那么释放它 if (server.master) freeClient(server.master); // 断开所有从服务器的连接,强制所有从服务器执行重同步 disconnectSlaves(); /* Force our slaves to resync with us as well. */ // 清空可能有的 master 缓存,因为已经不会执行 PSYNC 了 replicationDiscardCachedMaster(); /* Don't try a PSYNC. */ // 释放 backlog ,同理, PSYNC 目前已经不会执行了 freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ // 取消之前的复制进程(如果有的话) cancelReplicationHandshake(); // 进入连接状态(重点) server.repl_state = REDIS_REPL_CONNECT; server.master_repl_offset = 0; server.repl_down_since = 0; }
此时从节点(其实还不是)的repl_state = REDIS_REPL_CONNECT。在replicationCron中会对该状态向目标主节点建立一个连接。
if (server.repl_state == REDIS_REPL_CONNECT) { redisLog(REDIS_NOTICE,"Connecting to MASTER %s:%d", server.masterhost, server.masterport); if (connectWithMaster() == REDIS_OK) { redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started"); } }
int connectWithMaster(void) { int fd; // 连接主服务器 fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport); if (fd == -1) { redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s", strerror(errno)); return REDIS_ERR; } // 监听主服务器 fd 的读和写事件,并绑定文件事件处理器 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) { close(fd); redisLog(REDIS_WARNING,"Can't create readable event for SYNC"); return REDIS_ERR; } }
此时使用的是ipfd,也就是说从节点是主节点的一个redisclient。而主节点创建一个redisclient包括了一下步骤:
acceptCommonHandler -> createClient -> aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) -> processInputBuffer -> processCommand
并注册了从服务器用于同步主服务器的回调函数syncWithMaster。在syncWithMaster中会处理一系列主从复制的操作,比如发送ping命令,身份验证,发送从节点端口信息。
之后进行同步请求
// 根据返回的结果决定是执行部分 resync ,还是 full-resync psync_result = slaveTryPartialResynchronization(fd);
发送的是“psync”命令。主节点接受之后会将该redisclient加入到slaves
int masterTryPartialResynchronization(redisClient *c) { ... listAddNodeTail(server.slaves,c); ... }
之后主节点在replicationCron中会主动向从节点同步数据(包括全量复制和部分复制)。值得注意的是,在同步过程中,主节点会开辟一个命令缓冲区,缓存同步时的写命令。在同步完成之后还会进行命令同步。