zoukankan      html  css  js  c++  java
  • Redis5设计与源码分析 (第22章 哨兵和集群)

    哨兵是Redis的高可用方案,可以在Redis Master发生故障时自动选择一个Redis Slave切换为Master,继续对外提供服务。集群提供数据自动分片到不同节点的功能,并且当部分节点失效后仍然可以使用。

    22.1 哨兵

    哨兵通过与Master和Slave的通信,能够清楚每个Redis服务的健康状态。

    当Master发生故障时,哨兵能够知晓,然后通过对Slave健康状态、优先级、同步数据状态等的综合判断,选取其中一个Slave切换为Master,并且修改其他Slave指向新的Master地址。

    为什么实际中至少会部署3个以上哨兵并且数量最好是奇数呢?

    那假如部署2个哨兵呢?当Redis的Master发生故障时,选leader,假设2个哨兵各自投自己一票,根本选举不出leader。所以哨兵个数最好是奇数。

    图22-1 哨兵部署方案

    我们思考如下问题。

    1)切换完成之后,客户端和其他哨兵如何知道现在提供服务的Redis Master是哪一个呢?

    2)假设执行切换的哨兵发生了故障,切换操作是否会由其他哨兵继续完成呢?

    3)当故障Master恢复之后,会继续作为Master提供服务还是会作为一个Slave提供服务?

    22.1.1 哨兵简介

    典型的哨兵配置文件

    //监控一个名称为mymaster的Redis Master服务,地址和端口号为127.0.0.1:6379,quorum为2

    sentinel monitor mymaster 127.0.0.1 6379 2

    //如果哨兵60s内未收到mymaster的有效ping回复,则认为mymaster处于down的状态

    sentinel down-after-milliseconds mymaster 60000

    sentinel failover-timeout mymaster 180000 //执行切换的超时时间为180s

    //切换完成后同时向新的Redis Master发起同步数据请求的Redis Slave个数为1,即切换完成后依次让每个Slave去同步数据,前一个Slave同步完成后下一个Slave才发起同步数据的请求

    sentinel parallel-syncs mymaster 1

    //监控一个名称为resque的Redis Master服务,地址和端口号为127.0.0.1:6380,quorum为4

    sentinel monitor resque 192.168.1.3 6380 4

    sentinel down-after-milliseconds resque 10000

    sentinel failover-timeout resque 180000

    sentinel parallel-syncs resque 5

     

    quorum在哨兵中有两层含义。

    第一层含义为:如果某个哨兵认为其监听的Master处于下线的状态,这个状态在Redis中标记为S_DOWN,即主观下线。假设quorum配置为2,则当有两个哨兵同时认为一个Master处于下线的状态时,会标记该Master为O_DOWN,即客观下线只有一个Master处于客观下线状态时才会开始执行切换(认为客观下线的哨兵num)

    第二层含义为:假设有5个哨兵,quorum配置为4。首先,判断客观下线需要4个哨兵才能认定。其次,当开始执行切换时,会从5个哨兵中选择一个leader执行该次选举,此时一个哨兵也必须得到4票才能被选举为leader,而不是3票(即哨兵的大多数)。 (选举leader的哨兵num)

     

    配置文件中首先配置了需要监控的Redis Master服务器,然后设置了一些服务相关的参数,并没有Redis Slave和其他哨兵的配置。而通过图22-1,我们看到每个哨兵都必须与所有监控的Redis Master下的Slave服务器以及其他监控该Master的哨兵建立连接。显然,哨兵只通过配置文件是不能知道这些信息的。进一步,如果在配置文件中硬编码写出从服务器和其他哨兵的信息,会丧失灵活性。

    22.1.2 代码流程

    图22-2 单个哨兵连接示意图

     

    哨兵启动之后会先与配置文件中监控的Master建立两条连接,一条称为命令连接,另一条称为消息连接哨兵就是通过如上两条连接发现其他哨兵和Redis Slave服务器,并且与每个Redis Slave也建立同样的两条连接

    哨兵启动

    哨兵可以直接使用redis-server命令启动,如下:

    redis-server /path/to/sentinel.conf --sentinel

    redis-server中具体的代码流程如下:

    int main(int argc, char **argv) {

    ...

    //检测是否以sentinel模式启动

    server.sentinel_mode = checkForSentinelMode(argc,argv);

    ...

    if (server.sentinel_mode) {

    initSentinelConfig(); // 将监听端口置为26379

    initSentinel(); // 更改哨兵可执行命令。哨兵中只能执行有限的几种服务端命令,如ping,sentinel,subscribe,publish,info等等。该函数还会对哨兵进行一些初始化

    }

    ...

    sentinelIsRunning(); //随机生成一个40字节的哨兵ID,打印启动日志

    }

     

    哨兵的配置文件必须具有可写权限。哨兵的初始配置文件只配置了需要监听的Redis Master和其他一些配置参数,当哨兵发现了其他的Redis Slave服务器和监听同一个Master的其他哨兵时,会将该信息记录到配置文件中做持久化存储。这样,当哨兵重启后,可以直接从退出状态继续执行。

    建立命令连接和消息连接

    主流程只是进行了一些初始化, Redis的时间任务serverCron里面建立命令连接和消息连接。

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

    ...

    /* Run the Sentinel timer if we are in sentinel mode. */

    if (server.sentinel_mode) sentinelTimer();

    ...

    }

    void sentinelTimer(void) {

    sentinelCheckTiltCondition();

    sentinelHandleDictOfRedisInstances(sentinel.masters);

    sentinelRunPendingScripts();

    sentinelCollectTerminatedScripts();

    sentinelKillTimedoutScripts();

    /* 不断更改Redis"计时器中断" 的频率,以使每个Sentinel彼此不同步。 这种不确定性避免了哨兵在同一时间开始*继续保持同步,要求一次又一次地在上进行投票(由于没有人投票,导致没有人可能赢得*选举)*/

    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;

    }

     

    serverCron 子函数-> sentinelTimer 函数:

    哨兵中每次执行serverCron时,都会调用sentinelTimer()函数。该函数会建立连接,并且定时发送心跳包并采集信息

    主要功能如下。

    1)建立命令连接和消息连接。消息连接建立之后会订阅Redis服务的__sentinel__:hello频道。

    2)在命令连接上每10s发送info命令进行信息采集;每1s在命令连接上发送ping命令探测存活性;每2s在命令连接上发布一条信息,信息格式如下。

    sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch

    上述参数分别代表哨兵的IP、哨兵的端口、哨兵的ID(即上文所述40字节的随机字符串)、当前纪元(用于选举和主从切换)、Redis Master的名称、Redis Master的IP、Redis Master的端口、RedisMaster的配置纪元(用于选举和主从切换)。

    3)检测服务是否处于主观下线状态

    4)检测服务是否处于客观下线状态并且需要进行主从切换

     

    哨兵启动之后通过info命令进行信息采集,能够知道一个Redis Master有多少Slaves,

    然后在下一次执行sentinelTimer函数时会和所有的Slaves分别建立命令连接与消息连接。

    而通过订阅消息连接上的消息可以知道其他的哨兵。

    哨兵与哨兵之间只会建立一条命令连接,每1s发送一个ping命令进行存活性探测,每2s推送(publish)一条消息。

    第3步中主观下线状态的探测针对所有的Master,Slave和哨兵。

    第4步中只会对Master服务器进行客观下线的判断。如果有大于等于quorum个哨兵同时认为一台Master处于主观下线状态,才会将该Master标记为客观下线。

     

    一个哨兵如何知道其他哨兵对一台Master服务器的判断状态呢?

    Redis会向监控同一台Master的所有哨兵通过命令连接发送如下格式的命令:

    SENTINEL is-master-down-by-addr master_ip master_port current_epoch sentinel_runid或者*

    其中最后一项当需要投票时发送sentinel_runid,否则发送一个*号。据此能够知道其他哨兵对该Master服务状态的判断,如果达到要求,就标记该Master为客观下线。如果判断一个Redis Master处于客观下线状态,这时就需要开始执行主从切换了。

    22.1.3 主从切换

    当Master处于客观下线状态,此时需要执行主从切换。即将其中一个Slave提升为Master,其他Slave从该提升的Slave继续同步数据。主从切换有一个状态迁移图,其所有状态定义如下:

    #define SENTINEL_FAILOVER_STATE_NONE 0 //没有进行切换

    //等待开始进行切换(等待哨兵之间进行选主)

    #define SENTINEL_FAILOVER_STATE_WAIT_START 1

    #define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 //选择一台从服务器作为新的主服务器

    //将被选中的从服务器切换为主服务器

    #define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3

    #define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 //等待被选中的从服务器上报状态

    //将其他Slave切换为向新的主服务器要求同步数据

    #define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5

    //重置Master,将Master的IP:PORT设置为被选中从服务器的IP:PORT

    #define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6

    图22-3 主从切换状态转换图

    当一个哨兵发现一台Master处于主观下线状态时,会首先将切换状态更新为SEN-TINEL_FAILOVER_STATE_WAIT_START,并且将当前纪元加1。然后发送如下命令要求其他哨兵给自己投票:

    SENTINEL is-master-down-by-addr master_ip master_port current_epoch sentinel_runid或者*

    此时最后一项参数为sentinel_runid,即该哨兵的ID,第5项current_epoch在开始执行切换后会加1。当从哨兵中选出一个主哨兵之后,接下来的切换都由该主哨兵执行。

    主哨兵首先会将当前切换状态更改为SENTINEL_FAILOVER_STATE_SELECT_SLAVE,即开始选择一台

    从服务器作为新的主服务器。那么,假设有多台从服务器,该选择哪台呢?

    Redis中选择规则

    1)如果该Slave处于主观下线状态,则不能被选中。

    2)如果该Slave 5s之内没有有效回复ping命令或者与主服务器断开时间过长,不能被选中。

    3)如果slave-priority为0,则不能被选中(slave-priority可以在配置文件中指定。正整数,值越小优先级越高,当指定为0时,不能被选为主服务器)。

    4)在剩余Slave中比较优先级,优先级高的被选中;如果优先级相同,则有较大复制偏移量的被选中;否则按字母序选择排名靠前的Slave。

     

    选举的slave切换为Master

    当选中从服务器之后,将当前切换状态更改为SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE,并且在下一次时间任务调度时执行该步骤。该状态需要把选择的Redis Slave切换为Redis Master,即哨兵向该Slave发送如下命令:

    MULTI //开启一个事务

    SLAVEOF NO ONE //关闭该从服务器的复制功能,将其转换为一个主服务器

    CONFIG REWRITE //将redis.conf文件重写(会根据当前运行中的配置重写原来的配置)

    //关闭连接到该服务的客户端(关闭之后客户端会重连,重连时会重新获取Redis Master的地址)

    CLIENT KILL TYPE normal

    EXEC //执行事务

     

    执行完该步骤之后会将切换状态更新为SENTINEL_FAILOVER_STATE_WAIT_PROMOTION。上一步我们向被选中的从服务器发送了slaveof no one命令,执行完之后Redis中并没有处理返回值,而是在下一次info命令的返回中检查该从服务器的role字段,如果返回role:master,说明该从服务器已变更自己的角色为主服务器。于是切换状态变更为SENTINEL_FAILOVER_STATE_RECONF_SLAVES。

    注意  在该步骤变更状态为SENTINEL_FAILOVER_STATE_RECONF_SLAVES之前,如果切换超时,哨兵可以放弃本次切换,放弃之后会从第一步开始重新执行切换。但是如果进行到该步骤,则只能继续执行,不会检测超时。

     

    在该步骤设置SENTINEL_FAILOVER_STATE_RECONF_SLAVES后,哨兵会依次向其他从服务器发送切换主服务器的命令,如下:

    MULTI //开启一个事务

    SLAVEOF IP PORT //将该服务器设置为向新的主服务器请求数据

    CONFIG REWRITE //将redis.conf文件重写(会根据当前运行中的配置重写原来的配置)

    //关闭连接到该服务的客户端(关闭之后客户端会重连,重连时会重新获取Redis Master的地址)

    CLIENT KILL TYPE normal

    EXEC //执行事务

     

    如果所有的从服务器都已更新完毕,则切换状态更新为SENTINEL_FAILOVER_STATE_UPDATE_CONFIG。该步骤会将哨兵中监听的Master(旧Master)重置为被选中的从服务器(新Master),并且将旧Master也配置为新Master的从服务器。然后将切换状态更新为SENTINEL_FAILOVER_STATE_NONE。

    至此,主从切换已完成。

    22.1.4 常用命令

    在22.1.2节,初始化哨兵时会调用initSentinel函数,该函数中会更改哨兵可执行的命令,具体如下:

    struct redisCommand sentinelcmds[] = {

    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},

    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},

    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},

    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},

    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},

    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},

    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},

    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},

    {"role",sentinelRoleCommand,1,"ok-loading",0,NULL,0,0,0,0,0},

    {"client",clientCommand,-2,"read-only no-script",0,NULL,0,0,0,0,0},

    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},

    {"auth",authCommand,2,"no-auth no-script ok-loading ok-stale fast",0,NULL,0,0,0,0,0},

    {"hello",helloCommand,-2,"no-auth no-script fast",0,NULL,0,0,0,0,0}

    };

    哨兵中只可以执行有限的几种命令。本节主要介绍哨兵中独有的命令:sentinel。类似其他命令的执行流程,该命令会调用sentinelCommand函数。该命令的几种重点形式。

    1)sentinel masters:返回该哨兵监控的所有Master的相关信息。

    2)SENTINEL MASTER<name>:返回指定名称Master的相关信息。

    3)SENTINEL SLAVES<master-name>:返回指定名称Master的所有Slave的相关信息。

    4)SENTINEL SENTINELS<master-name>:返回指定名称Master的所有哨兵的相关信息。

    5)SENTINEL IS-MASTER-DOWN-BY-ADDR<ip><port><current-epoch><runid>:如果runid是*,返回由IP和Port指定的Master是否处于主观下线状态。如果runid是某个哨兵的ID,则同时会要求对该runid进行选举投票。

    6)SENTINEL RESET<pattern>:重置所有该哨兵监控的匹配模式(pattern)的Masters(刷新状态,重新建立各类连接)。

    7)SENTINEL GET-MASTER-ADDR-BY-NAME<master-name>:返回指定名称的Master对应的IP和Port。

    8)SENTINEL FAILOVER<master-name>:对指定的Mmaster手动强制执行一次切换。

    9)SENTINEL MONITOR<name><ip><port><quorum>:指定该哨兵监听一个Master。

    10)SENTINEL flushconfig:将配置文件刷新到磁盘。

    11)SENTINEL REMOVE<name>:从监控中去除掉指定名称的Master。

    12)SENTINEL CKQUORUM<name>:根据可用哨兵数量,计算哨兵可用数量是否满足配置数量(认定客观下线的数量);是否满足切换数量(即哨兵数量的一半以上)。

    13)SENTINEL SET<mastername>[<option><value>...]:设置指定名称的Master的各类参数(例如超时时间等)。

    14)SENTINEL SIMULATE-FAILURE<flag><flag>...<flag>:模拟崩溃。flag可以为crash-after-election或者crash-after-promotion,分别代表切换时选举完成主哨兵之后崩溃以及将被选中的从服务器推举为Master之后崩溃。

    小结

    本节通过一个常见的哨兵部署方案介绍哨兵的主要功能,

    然后通过哨兵启动过程的追踪和主从切换的过程介绍了哨兵在Redis中具体的实现逻辑。

    最后介绍了哨兵中sentinel相关的常用命令。

     

    通过本节介绍,我们回答一下之前提出的3个问题。

    1)主从切换完成之后,客户端和其他哨兵如何知道现在提供服务的RedisMaster是哪一个呢?

    回答 :可以通过subscribe__sentinel__:hello频道,知道当前提供服务的Master的IP和Port。

    2)执行切换的哨兵发生了故障,切换操作是否会由其他哨兵继续完成呢?

    回答 :执行切换的哨兵发生故障后,剩余哨兵会重新选主,并且重新开始执行切换流程

    3)故障Master恢复之后,会继续作为Master提供服务还是会作为Slave提供服务?

    回答 :Redis中主从切换完成之后,当故障Master恢复之后,会作为新Master的一个Slave来提供服务。

    22.2 集群

    图22-4 集群部署方式

    图中有3个Redis Master,每个Redis Master挂载一个Redis Slave,共6个Redis实例。

    集群用来提供横向扩展能力,即当数据量增多之后,通过增加服务节点就可以扩展服务能力。背后理论思想是将数据通过某种算法分布到不同的服务节点,这样当节点越多,单台节点所需提供服务的数据就越少。

    集群首先需要解决如下问题。

    1)分槽(slot):即如何决定某条数据应该由哪个节点提供服务;

    2)端如何向集群发起请求(客户端并不知道某个数据应该由哪个节点提供服务,并且如果扩容或者节点发生故障后,不应该影响客户端的访问)?

    3)某个节点发生故障之后,该节点服务的数据该如何处理?

    4)扩容,即向集群中添加新节点该如何操作?

    5)同一条命令需要处理的key分布在不同的节点中(如Redis中集合取并集、交集的相关命令),如何操作?

    22.2.1 集群简介

    Redis将键空间分为了16384个slot,然后通过如下算法:

    HASH_SLOT = CRC16(key) mod 16384

    计算出每个key所属的slot。客户端可以请求任意一个节点,每个节点中都会保存所有16384个slot对应到哪一个节点的信息。如果一个key所属的slot正好由被请求的节点提供服务,则直接处理并返回结果,否则返回MOVED重定向信息,如下:

    GET key

    -MOVED slot IP:PORT

    由-MOVED开头,接着是该key计算出的slot,然后是该slot对应到的节点IPPort。客户端应该处理该重定向信息,并且向拥有该key的节点发起请求。

    实际应用中,Redis客户端可以通过向集群请求slot和节点的映射关系并缓存,然后通过本地计算要操作的key所属的slot,查询映射关系,直接向正确的节点发起请求,这样可以获得几乎等价于单节点部署的性能。

    当集群由于节点故障或者扩容导致重新分片后,客户端先通过重定向获取到数据,每次发生重定向后,客户端可以将新的映射关系进行缓存,下次仍然可以直接向正确的节点发起请求。

    接着考虑图22-4,集群中的数据分片之后由不同的节点提供服务,即每个主节点的数据都不相同,此种情况下,为了确保没有单点故障,主服务必须挂载至少一个从服务。客户端请求时可以向任意一个主节点或者从节点发起,当向从节点发起请求时,从节点会返回MOVED信息重定向到相应的主节点。

     

    注意  Redis集群中,客户端只能在主节点执行读写操作

    如果需要在从节点中进行读操作,需要满足如下条件:

    ①首先在客户端中执行readonly命令;

    ②如果一个key所属的slot由主节点A提供服务,则请求该key时可以向A所属的从节点发起读请求。该请求不会被重定向。当一个主节点发生故障后,其挂载的从节点会切换为主节点继续提供服务。

    最后,当一条命令需要操作的key分属于不同的节点时,Redis会报错。Redis提供了一种称为hash tags的机制,由业务方保证当需要进行多个key的处理时,将所有key分布到同一个节点,

    该机制实现原理如下:

    如果一个key包括{substring}这种模式,则计算slot时只计算"{"和"}"之间的子字符串。即keys{sub}1、keys{sub}2、keys{sub}3计算slot时都会按照sub串进行。这样保证这3个字符串会分布到同一个节点。

    22.2.2 代码流程

    首先看一份典型的Redis集群配置:

    port 7000 //监听端口

    cluster-enabled yes //是否开启集群模式

    cluster-config-file nodes7000.conf //集群中该节点的配置文件

    cluster-node-timeout 5000 //节点超时时间,超过该时间之后会认为处于故障状态

    daemonize yes

    说明:

    7000端口用来处理客户端请求,除了7000端口,

    Redis集群中每个节点会起一个新的端口(默认为监听端口加10000,本例中为17000)用来和集群中其他节点进行通信

    cluster-config-file指定的配置文件需要有可写权限,用来持久化当前节点状态。节点可以直接使用redis-server命令启动,如下:

    redis-server /path/to/redis-cluster.conf

    redis-server启动

    执行该条命令后,redis-server中具体的代码流程如下:

    main(){

    ...

    if (server.cluster_enabled) clusterInit();

    ...

    }

    clusterInit集群初始化

    clusterInit函数会加载配置并且初始化一些状态指标,监听集群通信端口。除此之外,该函数执行一些回调函数的注册

    void clusterInit(void) {

    int saveconf = 0;

    server.cluster = zmalloc(sizeof(clusterState));

    server.cluster->myself = NULL;

    server.cluster->currentEpoch = 0;

    server.cluster->state = CLUSTER_FAIL;

    server.cluster->size = 1;

    server.cluster->todo_before_sleep = 0;

    server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);

    server.cluster->nodes_black_list =

    dictCreate(&clusterNodesBlackListDictType,NULL);

    server.cluster->failover_auth_time = 0;

    server.cluster->failover_auth_count = 0;

    server.cluster->failover_auth_rank = 0;

    server.cluster->failover_auth_epoch = 0;

    server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;

    server.cluster->lastVoteEpoch = 0;

    for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {

    server.cluster->stats_bus_messages_sent[i] = 0;

    server.cluster->stats_bus_messages_received[i] = 0;

    }

    server.cluster->stats_pfail_nodes = 0;

    memset(server.cluster->slots,0, sizeof(server.cluster->slots));

    clusterCloseAllSlots();

     

    /* 锁定集群配置文件,以确保每个节点都使用*自己的nodes.conf. */

    if (clusterLockConfig(server.cluster_configfile) == C_ERR)

    exit(1);

     

    /* 加载或创建新的节点配置. */

    if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {

    myself = server.cluster->myself =

    createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);

    serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",

    myself->name);

    clusterAddNode(myself);

    saveconf = 1;

    }

    if (saveconf) clusterSaveConfigOrDie(1);

    /* We need a listening TCP port for our cluster messaging needs. */

    server.cfd_count = 0;

    /*端口号大小限制 */

    int port = server.tls_cluster ? server.tls_port : server.port;

    if (port > (65535-CLUSTER_PORT_INCR)) {

    exit(1);

    }

    if (listenToPort(port+CLUSTER_PORT_INCR,

    server.cfd,&server.cfd_count) == C_ERR)

    {

    exit(1);

    } else {

    int j;

    // 1)集群通信端口建立监听后,注册回调函数clusterAcceptHandler。当节点之间建立连接时先由该函数进行处理。

    2)当节点之间建立连接后,为新建立的连接注册读事件的回调函数clusterReadHandler。

    3)当有读事件发生时,当clusterReadHandler读取到一个完整的包体后,调用clusterProcessPacket解析具体的包体。22.2.6节介绍的集群之间通信数据包的解析都在该函数内完成。

    for (j = 0; j < server.cfd_count; j++) {

    if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,

    clusterAcceptHandler, NULL) == AE_ERR)

    serverPanic("Unrecoverable error creating Redis Cluster " "file event.");

    }

    }

    /* The slots -> keys map is a radix tree. Initialize it here. */

    server.cluster->slots_to_keys = raxNew();

    memset(server.cluster->slots_keys_count,0,

    sizeof(server.cluster->slots_keys_count));

    /* Set myself->port / cport to my listening ports, we'll just need to

    * discover the IP address via MEET messages. */

    myself->port = port;

    myself->cport = port+CLUSTER_PORT_INCR;

    if (server.cluster_announce_port)

    myself->port = server.cluster_announce_port;

    if (server.cluster_announce_bus_port)

    myself->cport = server.cluster_announce_bus_port;

    server.cluster->mf_end = 0;

    resetManualFailover();

    clusterUpdateMyselfFlags();

    }

    时间任务函数serverCron

    类似哨兵,Redis时间任务函数serverCron中会调度集群的周期性函数,如下:

    serverCron(){

    /* Run the Redis Cluster cron. */

    run_with_period(100) {

    if (server.cluster_enabled) clusterCron();

    }

    }

    clusterCron函数

    执行操作:

    1)向其他节点发送MEET消息,将其加入集群。

    注意 当在一个集群节点A执行CLUSTER MEET ip port命令时,会将"ip:port"指定的节点B加入该集群中。但该命令执行时只是将B的"ip:port"信息保存到A节点中,然后在clusterCron函数中为A节点和"ip:port"指定的B节点建立连接并发送MEET类型的数据包。MEET数据包格式见22.2.6下的第3节。

    2)每1s会随机选择一个节点,发送ping消息(消息内容详情见22.2.6节下的第1节关于ping包的介绍)。

    3)如果一个节点在超时时间之内仍未收到ping包的响应(cluster-node-timeout配置项指定的时间),则将其标记为pfail。

    注意 Redis集群中节点的故障状态有两种。一种为pfail(Possible failure),当一个节点A未在指定时间收到另一个节点B对ping包的响应时,A节点会将B节点标记为pfail。另一种是,当大多数Master节点确认B为pfail之后,就会将B标记为fail。fail状态的节点才会需要执行主从切换。

    4)检查是否需要进行主从切换,如果需要则执行切换(见22.2.3节)。

    5)检查是否需要进行副本漂移,如果需要,执行副本漂移操作(见22.2.4节)。

    /* This is executed 10 times every second */

    void clusterCron(void) {

    dictIterator *di;

    dictEntry *de;

    int update_state = 0;

    int orphaned_masters; /* How many masters there are without ok slaves. */

    int max_slaves; /* Max number of ok slaves for a single master. */

    int this_slaves; /* Number of ok slaves for our master (if we are slave). */

    mstime_t min_pong = 0, now = mstime();

    clusterNode *min_pong_node = NULL;

    static unsigned long long iteration = 0;

    mstime_t handshake_timeout;

    iteration++; /* Number of times this function was called so far. */

    {

    static char *prev_ip = NULL;

    char *curr_ip = server.cluster_announce_ip;

    int changed = 0;

    if (prev_ip == NULL && curr_ip != NULL) changed = 1;

    else if (prev_ip != NULL && curr_ip == NULL) changed = 1;

    else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;

    if (changed) {

    if (prev_ip) zfree(prev_ip);

    prev_ip = curr_ip;

    if (curr_ip) {

    prev_ip = zstrdup(prev_ip);

    strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);

    myself->ip[NET_IP_STR_LEN-1] = '';

    } else {

    myself->ip[0] = ''; /* Force autodetection. */

    }

    }

    }

    handshake_timeout = server.cluster_node_timeout;

    if (handshake_timeout < 1000) handshake_timeout = 1000;

    /* Update myself flags. */

    clusterUpdateMyselfFlags();

    di = dictGetSafeIterator(server.cluster->nodes);

    server.cluster->stats_pfail_nodes = 0;

    while((de = dictNext(di)) != NULL) {

    clusterNode *node = dictGetVal(de);

    /* Not interested in reconnecting the link with myself or nodes

    * for which we have no address. */

    if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;

    if (node->flags & CLUSTER_NODE_PFAIL)

    server.cluster->stats_pfail_nodes++;

    /* A Node in HANDSHAKE state has a limited lifespan equal to the

    * configured node timeout. */

    if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {

    clusterDelNode(node);

    continue;

    }

    if (node->link == NULL) {

    clusterLink *link = createClusterLink(node);

    link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();

    connSetPrivateData(link->conn, link);

    if (connConnect(link->conn, node->ip, node->cport, NET_FIRST_BIND_ADDR,

    clusterLinkConnectHandler) == -1) {

     

    if (node->ping_sent == 0) node->ping_sent = mstime();

    serverLog(LL_DEBUG, "Unable to connect to "

    "Cluster Node [%s]:%d -> %s", node->ip,

    node->cport, server.neterr);

    freeClusterLink(link);

    continue;

    }

    node->link = link;

    }

    }

    dictReleaseIterator(di);

    /* 每1s会随机选择一个节点,发送ping消息(消息内容详情见22.2.6节下的第1节关于ping包的介绍)。. */

    if (!(iteration % 10)) {

    int j;

    /* Check a few random nodes and ping the one with the oldest

    * pong_received time. */

    for (j = 0; j < 5; j++) {

    de = dictGetRandomKey(server.cluster->nodes);

    clusterNode *this = dictGetVal(de);

    /* Don't ping nodes disconnected or with a ping currently active. */

    if (this->link == NULL || this->ping_sent != 0) continue;

    if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))

    continue;

    if (min_pong_node == NULL || min_pong > this->pong_received) {

    min_pong_node = this;

    min_pong = this->pong_received;

    }

    }

    if (min_pong_node) {

    serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);

    clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);

    }

    }

    /* 迭代节点以检查是否需要将某些内容标记为失败。 此循环还负责:

    1) 检查是否存在孤立的主控(无故障的主控从属)。

    2)计算单个主机的最大非故障从机数。

    3)如果我们是slave,为master计算slave的数量。*/

    orphaned_masters = 0;

    max_slaves = 0;

    this_slaves = 0;

    di = dictGetSafeIterator(server.cluster->nodes);

    while((de = dictNext(di)) != NULL) {

    clusterNode *node = dictGetVal(de);

    now = mstime(); /* Use an updated time at every iteration. */

    if (node->flags &

    (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))

    continue;

    /* Orphaned master check, useful only if the current instance

    * is a slave that may migrate to another master. */

    if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {

    int okslaves = clusterCountNonFailingSlaves(node);

    /* A master is orphaned if it is serving a non-zero number of

    * slots, have no working slaves, but used to have at least one

    * slave, or failed over a master that used to have slaves. */

    if (okslaves == 0 && node->numslots > 0 &&

    node->flags & CLUSTER_NODE_MIGRATE_TO)

    {

    orphaned_masters++;

    }

    if (okslaves > max_slaves) max_slaves = okslaves;

    if (nodeIsSlave(myself) && myself->slaveof == node)

    this_slaves = okslaves;

    }

    mstime_t ping_delay = now - node->ping_sent;

    mstime_t data_delay = now - node->data_received;

    if (node->link && /* is connected */

    now - node->link->ctime >

    server.cluster_node_timeout && /* was not already reconnected */

    node->ping_sent && /* we already sent a ping */

    node->pong_received < node->ping_sent && /* still waiting pong */

    /* and we are waiting for the pong more than timeout/2 */

    ping_delay > server.cluster_node_timeout/2 &&

    /* and in such interval we are not seeing any traffic at all. */

    data_delay > server.cluster_node_timeout/2)

    {

    /* Disconnect the link, it will be reconnected automatically. */

    freeClusterLink(node->link);

    }

     

    /* 如果当前我们在此实例中没有活动的ping,并且收到的PONG早于集群超时的一半,立即发送一个新的ping,以确保所有节点都被ping而不造成太大的延迟. */

    if (node->link &&

    node->ping_sent == 0 &&

    (now - node->pong_received) > server.cluster_node_timeout/2)

    {

    clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);

    continue;

    }

     

    /* 我们是主服务器,而其中一个从服务器则请求进行手动故障转移,对其进行连续ping操作*/

    if (server.cluster->mf_end &&

    nodeIsMaster(myself) &&

    server.cluster->mf_slave == node &&

    node->link)

    {

    clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);

    continue;

    }

     

    /* Check only if we have an active ping for this instance. */

    if (node->ping_sent == 0) continue;

     

    /* 检查此节点是否看起来无法访问。 请注意,如果我们已经收到PONG,则node-> ping_sent 为零,因此根本无法到达此代码,因此,如果没有发送PING,我们就没有冒着检查PONG延迟的风险。 。 我们还将所有传入的数据视为活动的证明,因为我们的集群总线链接也用于数据:在繁重的数据下可能会有负载pong延迟。*/

    mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :

    data_delay;

     

    if (node_delay > server.cluster_node_timeout) {

    /* Timeout reached. Set the node as possibly failing if it is

    * not already in this state. */

    if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {

    serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",

    node->name);

    node->flags |= CLUSTER_NODE_PFAIL;

    update_state = 1;

    }

    }

    }

    dictReleaseIterator(di);

    if (nodeIsSlave(myself) &&

    server.masterhost == NULL &&

    myself->slaveof &&

    nodeHasAddr(myself->slaveof))

    {

    replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);

    }

     

    /* Abourt a manual failover if the timeout is reached. */

    manualFailoverCheckTimeout();

     

    if (nodeIsSlave(myself)) { //如果自己是从节点, 集群处理手动故障转移

    clusterHandleManualFailover(); //手动故障转移相关

    if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))

    clusterHandleSlaveFailover(); // 故障转移开始

    //有孤立主节点, 最大从节点大于2 , 这个节点的节点数为最大

    if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)

    clusterHandleSlaveMigration(max_slaves); //集群处理副本漂移

    }

     

    if (update_state || server.cluster->state == CLUSTER_FAIL)

    clusterUpdateState();

    }

    beforeSleep函数

    Redis除了在serverCron函数中进行调度之外,在每次进入事件循环之前,会在beforeSleep函数中执行一些操作,如下:

    void beforeSleep(struct aeEventLoop *eventLoop) {

    ...

    if (server.cluster_enabled) clusterBeforeSleep();

    ...

    }

    void clusterBeforeSleep(void) {

    /* 1)检查主从切换状态,如果需要,执行主从切换相关操作 */

    if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)

    clusterHandleSlaveFailover();

    /*2)更新集群状态,通过检查是否所有slot都有相应的节点提供服务以及是否大部分主服务都是可用状态,来决定集群处于正常状态还是失败状态。 */

    if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)

    clusterUpdateState();

    /* 3)刷新集群状态到配置文件。. */

    if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {

    int fsync = server.cluster->todo_before_sleep &

    CLUSTER_TODO_FSYNC_CONFIG;

    clusterSaveConfigOrDie(fsync);

    }

    /* 清除标志 */

    server.cluster->todo_before_sleep = 0;

    }

    小结

    clusterCron和clusterBeforeSleep函数中都会进行主从切换相关状态的判断,如果需要进行主从切换,还会进行切换相关的操作。

    22.2.3 主从切换

    集群中节点有两种失败状态:pfail和fail。当集群中节点通过错误检测机制发现某个节点处于fail状态时,会自动执行主从切换。

    Redis中还提供一种手动执行切换的方法,即通过执行clusterfailover命令。

    注意  通过手动切换方式能实现Redis主节点的平滑升级,

    具体步骤是:先将主节点切换为一个从节点,然后进行版本的升级,再将升级后的版本切换回主节点。该过程不会有任务服务的中断,具体参见22.2.3节的第2节的介绍

    1.自动切换

    集群之间会互相发送心跳包,心跳包中会包括从发送方视角所记录的关于其他节点的状态信息。当一个节点收到心跳包之后,如果检测到发送方(假设为A)标记某个节点(假设为B)处于pfail状态,则接收节点(假设为C)会检测B是否已经被大多数主节点标记为pfail状态。如果是,则C节点会向集群中所有节点发送一个fail包(见22.2.6节的第4节),通知其他节点B已经处于fail状态。

    判断步骤

    当一个主节点(假设为B)被标记为fail状态后

    该主节点的所有Slave执行周期性函数clusterCron时,

    会从所有的Slave中选择一个复制偏移量最大的Slave节点(即数据最新的从节点,假设为D),

    然后D节点首先将其当前纪元(currentEpoch)加1,

    然后向所有的主节点发送failover授权请求包(见22.2.6节的第6节),

    当获得大多数主节点的授权后,开始执行主从切换。

    注意  

    ①currentEpoch:集群当前纪元,类似Raft算法中的term,是一个递增的版本号。正常状态下集群中所有节点的currentEpoch相同。每次选举时从节点首先将currentEpoch加1,然后进行选举。投票时同一对主从的同一个currentEpoch只能投一次,防止多个Slave同时发起选举后难以获得票的大多数。注意currentEpoch为所有Master节点中配置纪元的最大值。

    ②configEpoch:每个主节点的配置纪元。当因为网络分区导致多个节点提供冲突的信息时,通过configEpoch能够知道哪个节点的信息最新。

    切换流程

    (假设被切换的主节点为M,执行切换的从节点为S)。

    1)S先更新自己的状态,将自己声明为主节点。并且将S从M中移除。

    2)由于S需要切换为主节点,所以将S的同步数据相关信息清除(即不再从M同步数据)。

    3)将M提供服务的slot都声明到S中。

    4)发送一个PONG包,通知集群中其他节点更新状态。

     

    2.手动切换

    流程

    当一个从节点接收到cluster failover命令之后,执行手动切换,流程如下。

    1)该从节点首先向对应的主节点发送一个mfstart包(见22.2.6的第8节)。通知主节点从节点要开始进行手动切换。

    2)主节点会阻塞所有客户端命令的执行。之后主节点在周期性函数clusterCron中发送ping包时会在包头部分做特殊标记。

    提示  redisServer结构体中有两个字段,clients_paused和clients_pause_end_time,当需要阻塞所有客户端命令的执行时,首先将clients_paused置为1,然后将clients_pause_end_time设置为当前时间加2倍的CLUSTER_MF_TIMEOUT(默认值为5s)。客户端发起命令请求时会调用processInputBuffer,该函数会检测当前是否处于客户端阻塞状态,如果是,则不会继续执行命令。

    3)当从节点收到主节点的ping包并且检测到特殊标记之后,会从包头中获取主节点的复制偏移量。

    4)从节点在周期性函数clusterCron中检测当前处理的偏移量与主节点复制偏移量是否相等,当相等时开始执行切换流程

    5)切换完成后,主节点会将阻塞的所有客户端命令通过发送+MOVED指令重定向到新的主节点。

    手动执行主从切换时不会丢失任何数据,也不会丢失任何执行命令,只在切换过程中会有暂时的停顿。具体切换流程同自动切换。

     

    3.从节点发起故障转移,开始拉票

    /* 在集群定时器函数clusterCron中调用。本函数用于处理从节点进行故障转移的整个流程,

    包括:判断是否可以发起选举;发起选举;判断选举是否超时;判断自己是否拉到了足够的选票;使自己升级为新的主节点这些所有流程。 */

    void clusterHandleSlaveFailover(void) {

    mstime_t data_age;

    // server.cluster->failover_auth_time属性,表示从节点可以开始进行故障转移的时间。集群初始化时该属性置为0,一旦满足开始故障转移的条件后,该属性就置为未来的某个时间点,在该时间点,从节点才开始进行拉票。

    //首先计算auth_age,该变量表示距离发起故障转移流程,已经过去了多少时间;

    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;

    //计算needed_quorum,该变量表示当前从节点必须至少获得多少选票,才能成为新的主节点;

    int needed_quorum = (server.cluster->size / 2) + 1;

    //manual_failover表示是否是管理员手动触发的故障转移流程;

    int manual_failover = server.cluster->mf_end != 0 &&

    server.cluster->mf_can_start;

    mstime_t auth_timeout, auth_retry_time;

     

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

     

    //计算auth_timeout,该变量表示故障转移流程(发起投票,等待回应)的超时时间,超过该时间后还没有获得足够的选票,则表示本次故障转移失败;

    auth_timeout = server.cluster_node_timeout*2;

    if (auth_timeout < 2000) auth_timeout = 2000;

    //计算auth_retry_time,该变量表示判断是否可以开始下一次故障转移流程的时间,只有距离上一次发起故障转移时,已经超过auth_retry_time之后,才表示可以开始下一次故障转移了(auth_age > auth_retry_time);

    auth_retry_time = auth_timeout*2;

     

    /*接下来判断当前节点是否可以进行故障转移:

    当前节点是主节点;当前节点是从节点但是没有主节点;当前节点的主节点不处于下线状态并且不是手动强制进行故障转移;当前节点的主节点没有负责的槽位。

    满足以上任一条件,则不能进行故障转移,直接返回即可; */

    if (nodeIsMaster(myself) ||

    myself->slaveof == NULL ||

    (!nodeFailed(myself->slaveof) && !manual_failover) ||

    (server.cluster_slave_no_failover && !manual_failover) ||

    myself->slaveof->numslots == 0)

    {

    server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;

    return;

    }

     

    /* 计算现在距离当前从节点与主节点最后交互的时间data_age, 如果data_age大于server.cluster_node_timeout,则从data_age中减去server.cluster_node_timeout,因为经过server.cluster_node_timeout时间没有收到主节点的PING回复,才会将其标记为PFAIL,因此data_age实际上表示:在主节点下线之前,当前从节点有多长时间没有与其交互过了。data_age主要用于判断当前从节点的数据新鲜度;如果data_age超过了一定时间,表示当前从节点的数据已经太老了,不能替换掉下线主节点,因此在不是手动强制故障转移的情况下,直接返回; */

    if (server.repl_state == REPL_STATE_CONNECTED) {

    data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)

    * 1000;

    } else {

    data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;

    }

    if (data_age > server.cluster_node_timeout)

    data_age -= server.cluster_node_timeout;

     

    if (server.cluster_slave_validity_factor &&

    data_age >

    (((mstime_t)server.repl_ping_slave_period * 1000) +

    (server.cluster_node_timeout * server.cluster_slave_validity_factor)))

    {

    if (!manual_failover) {

    clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);

    return;

    }

    }

     

    /* 如果auth_age大于auth_retry_time,表示可以开始进行下一次故障转移了。如果之前没有进行过故障转移,则auth_age等于mstime,肯定大于auth_retry_time;如果之前进行过故障转移,则只有距离上一次发起故障转移时,已经超过auth_retry_time之后,才表示可以开始下一次故障转移。满足该条件后,设置故障转移流程的开始时间:server.cluster->failover_auth_time为mstime() + 500 +random()%500 + rank*1000, */

    if (auth_age > auth_retry_time) {

    server.cluster->failover_auth_time = mstime() +

    500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */

    random() % 500; /* Random delay between 0 and 500 milliseconds. */

    server.cluster->failover_auth_count = 0;

    server.cluster->failover_auth_sent = 0;

    server.cluster->failover_auth_rank = clusterGetSlaveRank();

    /* We add another delay that is proportional to the slave rank.

    * Specifically 1 second * rank. This way slaves that have a probably

    * less updated replication offset, are penalized. */

    server.cluster->failover_auth_time +=

    server.cluster->failover_auth_rank * 1000;

    /* 手动转移,如有必要不延迟.设置server.cluster->failover_auth_time为当前时间,表示会立即开始故障转移流程; */

    if (server.cluster->mf_end) {

    server.cluster->failover_auth_time = mstime();

    server.cluster->failover_auth_rank = 0;

    clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);

    }

    serverLog(LL_WARNING,

    "Start of election delayed for %lld milliseconds "

    "(rank #%d, offset %lld).",

    server.cluster->failover_auth_time - mstime(),

    server.cluster->failover_auth_rank,

    replicationGetSlaveOffset());

    /* 最后,调用clusterBroadcastPong,向该下线主节点的所有从节点发送PONG包,包头部分带有当前从节点的复制数据量,因此其他从节点收到之后,可以更新自己的排名;最后直接返回; */

    clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);

    return;

    }

     

    /* 如果还没有开始故障转移,则调用clusterGetSlaveRank,取得当前从节点的最新排名。因为在开始故障转移之前,可能会收到其他从节点发来的心跳包,因而可以根据心跳包中的复制偏移量更新本节点的排名,获得新排名newrank,如果newrank比之前的排名靠后,则需要增加故障转移开始时间的延迟,然后将newrank记录到server.cluster->failover_auth_rank中;. */

    if (server.cluster->failover_auth_sent == 0 &&

    server.cluster->mf_end == 0)

    {

    int newrank = clusterGetSlaveRank();

    if (newrank > server.cluster->failover_auth_rank) {

    long long added_delay =

    (newrank - server.cluster->failover_auth_rank) * 1000;

    server.cluster->failover_auth_time += added_delay;

    server.cluster->failover_auth_rank = newrank;

    serverLog(LL_WARNING,

    "Replica rank updated to #%d, added %lld milliseconds of delay.",

    newrank, added_delay);

    }

    }

     

    /* 如果当前时间还不到开始故障转移的时候,则直接返回即可;*/

    if (mstime() < server.cluster->failover_auth_time) {

    clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);

    return;

    }

     

    /* 如果auth_age大于auth_timeout,说明之前的故障转移超时了,因此直接返回;. */

    if (auth_age > auth_timeout) {

    clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);

    return;

    }

     

    /* 走到这里,说明可以开始故障转移了。因此,首先增加当前节点的currentEpoch的值,表示要开始新一轮选举了。此时该从节点的currentEpoch就是所有集群节点中最大的;然后将该currentEpoch记录到server.cluster->failover_auth_epoch中;. */

    if (server.cluster->failover_auth_sent == 0) {

    server.cluster->currentEpoch++;

    server.cluster->failover_auth_epoch = server.cluster->currentEpoch;

    serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",

    (unsigned long long) server.cluster->currentEpoch);

    //调用clusterRequestFailoverAuth,向所有集群节点发送CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST包用于拉票;

    clusterRequestFailoverAuth();

    //置标记,表示已发起了故障转移流程;最后直接返回;

    server.cluster->failover_auth_sent = 1;

    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|

    CLUSTER_TODO_UPDATE_STATE|

    CLUSTER_TODO_FSYNC_CONFIG);

    return; /* Wait for replies. */

    }

     

    /* 检查: 当前从节点已经受到了大部分节点的支持,可以成为新的主节点了 */

    if (server.cluster->failover_auth_count >= needed_quorum) {

    serverLog(LL_WARNING,

    "Failover election won: I'm the new master.");

    /* 首先更新myself->configEpoch为server.cluster->failover_auth_epoch,这样当前节点的configEpoch就成为所有集群节点中最大的了,方便后续更新配置。这种产生新configEpoch的方式是经过协商过的,因为只有从节点赢得大部分主节点投票的时候,才会产生新的configEpoch;. */

    if (myself->configEpoch < server.cluster->failover_auth_epoch) {

    myself->configEpoch = server.cluster->failover_auth_epoch;

    serverLog(LL_WARNING,

    "configEpoch set to %llu after successful failover",

    (unsigned long long) myself->configEpoch);

    }

     

    /* 取代下线主节点,成为新的主节点,并向其他节点广播这种变化。 */

    clusterFailoverReplaceYourMaster();

    } else {

    clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);

    }

    }

     

    22.2.4 副本漂移

    考虑图22-4所示的部署方式,集群中共有6个实例,三主三从,每对主从只能有一个处于故障状态。假设一对主从同时发生故障,则集群中的某些slot会处于不能提供服务的状态,从而导致集群失效。为了提高可靠性,我们可以在每个主服务下边各挂载两个从服务实例。在图22-4所示实例中,共需要增加3个实例。但假设若集群中有100个主服务,为了更高的可靠性,就需要增加100个实例。有什么方法既能提高可靠性,又可以做到不随集群规模线性增加从服务实例的数量呢?

    Redis中提供了一种副本漂移的方法,如图22-5所示。

    图22-5 集群副本漂移

    我们只给其中一个主C增加两个从服务。假设主A发生故障,主A的从A1会执行切换,切换完成之后从A1变为主A1,此时主A1会出现单点问题。当检测到该单点问题后,集群会主动从主C的从服务中漂移一个给有单点问题的主A1做从服务,如图22-6所示。

    我们详细介绍Redis中如何实现副本漂移。

    图22-6 集群副本漂移

    在周期性调度函数clusterCron中会定期检测如下条件:

    1)是否存在单点的主节点,即主节点没有任何一台可用的从节点;(谁少了从节点)

    2)是否存在有两台及以上可用从节点的主节点。(谁有多的从节点)

    如果以上两个条件都满足,则从有最多可用从节点的主节点中选择一台从节点执行副本漂移。选择标准为按节点名称的字母序从小到大,选择最靠前的一台从节点执行漂移。

     

    漂移具体过程如下(按图22-6名称做说明):

    1)从C的记录中将C1移除;

    2)将C1所记录的主节点更改为A1;(A1是原主节点A的从节点)

    3)在A1中添加C1为从节点;

    4)将C1的数据同步源设置为A1。

    漂移过程只更改一些节点所记录的信息,之后会通过心跳包将该信息同步到所有的集群节点。

    clusterHandleSlaveMigration函数中经过一些校验检查,最后调用clusterSetMaster;

    void clusterSetMaster(clusterNode *n) {

    if (nodeIsMaster(myself)) {

    myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);

    myself->flags |= CLUSTER_NODE_SLAVE;

    clusterCloseAllSlots();

    } else {

    if (myself->slaveof) //如果myself有主节点,则从主节点移除myself

    clusterNodeRemoveSlave(myself->slaveof,myself);

    }

    myself->slaveof = n; //myself主节点设置为n

    clusterNodeAddSlave(n,myself); //n从节点添加myself

    replicationSetMaster(n->ip, n->port); //myself同步数据源为n

    resetManualFailover();

    }

    22.2.5 分片(slot)迁移

    有很多情况下需要进行分片的迁移,例如增加一个新节点之后需要把一些分片迁移到新节点,或者当删除一个节点之后,需要将该节点提供服务的分片迁移到其他节点,甚至有些时候需要根据负载重新配置分片的分布。

    Redis集群中分片的迁移,即slot的迁移,需要将一个slot中所有的key从一个节点迁移到另一个节点。

    我们通过如下一些Redis的命令看看具体实现(假设以下命令都在节点A执行)。

    1. CLUSTER ADDSLOTS slot1[slot2]...[slotN]:

      在A节点中增加指定的slot(即指定的slot由A提供服务)。注意,如果指定的slot已经有节点在提供服务,该命令会报错。

    2. CLUSTER DELSLOTS slot1[slot2]...[slotN]:

      在A节点中删除指定的slot(即指定的slot不再由A提供服务)。

    3. CLUSTER SETSLOT slot NODE node:

      将slot指定为由node节点提供服务。

    4. CLUSTER SETSLOT slot MIGRATING node:

      将slot从A节点迁移到指定的节点。注意,slot必须属于A节点,否则会报错。(A迁出)

    5. CLUSTER SETSLOT slot IMPORTING node:

    将slot从指定节点迁移到A节点。 (A迁入)

    执行cluster addslots和cluster delslots之后只会修改A节点的本地视图,之后A节点会通过心跳包将配置同步到集群中其他节点。

    实例

    通过一个实例说明一个slot具体的迁移过程。假设"slot中10000"现在由A提供服务,需要将该slot从A迁移到B。

    cluster setslot 10000 importing A  //在B节点执行

    cluster setslot 10000 migrating B  //在A节点执行

    当客户端请求属于"slot 10000"的key时,仍然会直接向A发送请求(或者通过其他节点通过MOVED重定向到A节点),如果A中找到该key,则直接处理并返回结果。如果A中未找到该key,则返回如下信息:

    GET key

    -ASK 10000 B

    客户端收到该回复后,首先需要向B发送一条asking命令,然后将要执行的命令发送给B。

    生产中使用Redis提供的redis-cli命令来做分片迁移,redis-cli首先在A、B节点执行如上两条命令,然后在A节点执行如下命令:

    cluster getkeysinslot slot count

    这条命令会从节点A的slot,例如10000中取出"count"个key,然后对这些key依次执行迁移命令,如下:

    migrate target_ip target_port key 0 timeout

    target_ip 和 target_port指向B节点,0为数据库ID(集群中的所有节点只能有0号数据库)。

     

    当所有key都迁移完成后,redis-cli会向所有集群中的节点发送如下命令:

    cluster setslot slot node node-id

    其中slot本例中为10000,node-id为B。这个消息会通过心跳包传遍整个集群

    至此,一个slot迁移完毕。当此时再向A节点发送slot为10000的请求时,A节点会直接返回MOVED重定向到B节点。

     

    重定向: MOVED与ASK

    因为在redis集群中16384个槽中的数据是分别存储于不同的数据库中的,而我们的客户端可以向任意节点发送请求,都可以得到回复,这就是我们平时所说的重定向,而这里的重定向有两种情况,即MOVED重定向与ASK重定向,

    1.某个集群节点收到客户端发来的命令后,会判断命令中的key是否由本节点负责,若是,则直接处理命令,若不是,则反馈给客户端MOVED重定向错误,错误中指明了该key真正的负责节点.客户端收到MOVED重定向错误之后.需要重新向真正的负责节点再次发送命令,而这个错误对于客户端来说是透明的,客户端只会把请求成功后的消息返回,并打印Redirected to slot [XXX] located at IP:port.

    2.ASK错误只是两个节点在迁移槽的过程中使用的一种临时措施:客户端收到关于槽位i的ASK错误之后,客户端只会在接下来的一次命令请求中将关于槽位i的命令请求发送至ASK错误所指示的节点,但在这之前会发送一个ASKING命令,它会打开对端中的REDIS_ASKING标识,这个标识在一次请求后会消耗掉.这种重定向不会对客户端今后发送关于槽位i的命令请求产生任何影响,客户端之后仍然会将关于槽位i的命令请求发送至目前负责处理该槽位的节点,除非ASK错误再次出现.

    在redis集群中实际执行命令处理函数之前,需要判断当前节点是否能处理该命令中的key,若本节点不能处理该命令,则回复给客户端重定向错误,处理逻辑在processCommand中.

    集群中一次请求数据涉及多个机器的时候没有办法处理,只会返回错误,如果请求的多个数据在同一个机器上还是可以正常运转的.且在一个ASKING请求中可以请求多个键值对,也就是一个ASKING是在一次命令以后销毁,

    区别:

    MOVED错误代表槽位的负责权已经从一个节点转移到了另一个节点:在客户端收到关于槽位i的MOVED错误之后,会更新槽位i及其负责节点的对应关系,这样下次遇到关于槽位i的命令请求时,就可以直接将命令请求发送新的负责节点.

    ASK错误只是两个节点在迁移槽的过程中使用的一种临时措施:客户端收到关于槽位i的ASK错误之后,客户端只会在接下来的一次命令请求中将关于槽位i的命令请求发送至ASK错误所指示的节点,但这种重定向不会对客户端今后发送关于槽位i的命令请求产生任何影响,客户端之后仍然会将关于槽位i的命令请求发送至目前负责处理该槽位的节点,除非ASK错误再次出现.

    22.2.6 通信数据包类型

    Redis集群中的消息包分为如下10种,最后一种是包类型计数边界,代码中做判断使用。

    #define CLUSTERMSG_TYPE_PING 0 /*ping */

    #define CLUSTERMSG_TYPE_PONG 1 /* pong */

    #define CLUSTERMSG_TYPE_MEET 2 /* meet */

    #define CLUSTERMSG_TYPE_FAIL 3 /*fail*/

    #define CLUSTERMSG_TYPE_PUBLISH 4 /* publish */

    #define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* failover授权请求包 */

    #define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* failover授权确认包 */

    #define CLUSTERMSG_TYPE_UPDATE 7 /* update */

    #define CLUSTERMSG_TYPE_MFSTART 8 /* 手动failover包*/

    #define CLUSTERMSG_TYPE_MODULE 9 /* 模块相关包*/

    #define CLUSTERMSG_TYPE_COUNT 10 /* 计数边界 */

    5、6和8三种包只有包头没有包体,剩余所有的包都由包头和包体两部分组成。包头格式相同,包体内容根据具体的类型填充。

    包头格式

    结构体定义

    typedef struct {

    char sig[4]; /* 固定为RCmb(Redis cluster message bus). */

    uint32_t totlen; /* 消息总长度 */

    uint16_t ver; /* 协议版本,当前设值为1 */

    uint16_t port; /* 发送方监听的端口 */

    uint16_t type; /* 包类型 */

    uint16_t count; /* data中的gossip section个数(供ping,pong,meet包使用). */

    uint64_t currentEpoch; /* 发送方节点记录的集群当前纪元. */

    uint64_t configEpoch; /* 发送方节点对应的配置纪元(如果为从,则为该从所对应的主

    服务的配置纪元)*/

    uint64_t offset; /* 如果为主,该值表示复制偏移量;如果为从,该值表示从已处

    理的偏移量 */

    char sender[CLUSTER_NAMELEN]; /* 发送方名称,40字节 */

    unsigned char myslots[CLUSTER_SLOTS/8]; // 发送方提供服务的slot映射表((如果为从,则为该从所对应的主提供服务的slot映射表)

    char slaveof[CLUSTER_NAMELEN]; //发送方如果为从,则该字段为对应的主的名称

    char myip[NET_IP_STR_LEN]; /* 发送方IP*/

    char notused1[34]; /*预留字段. */

    uint16_t cport; /* 发送方监听的cluster bus端口 */

    uint16_t flags; /* 发送方节点的flags */

    unsigned char state; /* 发送方节点所记录的集群状态*/

    unsigned char mflags[3]; /* 目前只有mflags[0]会在手动failover时使用*/

    union clusterMsgData data; //包体内容

    } clusterMsg;

     

    包体格式

    接收到包之后需要根据包头取出type字段,来决定如何解析包体。首先介绍ping包。

    具体的包体内容,其结构体定义为union clusterMsgData,是一个联合体,根据包的类型决定存储什么内容,其定义如下:

    union clusterMsgData {

    struct {

    /* /ping,pong,meet包内容。是个clusterMsgDataGossip类型的数组,根据数组大小使用时确定和分配。该字段称为gossip section*/

    clusterMsgDataGossip gossip[1];

    } ping;

    struct {

    clusterMsgDataFail about; //fail包内容

    } fail;

    struct {

    clusterMsgDataPublish msg; ///publish包内容

    } publish;

    struct {

    clusterMsgDataUpdate nodecfg; //update包内容

    } update;

    struct {

    clusterMsgModule msg; //module包内容

    } module;

    };

    1.ping包格式

    图22-7 ping包格式

    ping包由一个包头和多个gossip section组成。Redis集群中每个节点通过心跳包可以知道其他节点的当前状态并且保存到本节点状态中。

    每个gossip section就是从发送节点的视角出发,所记录的关于其他节点的状态信息,包括节点名称、IP地址、状态以及监听地址,等等。接收方可以据此发现集群中其他的节点或者进行错误发现。

    注意  flags字段标识一个节点的当前状态,例如是Master还是Slave,是否处于pfail或fail状态等。

    2.pong包格式

    格式同ping包,只是将包头中的type字段写为CLUSTERMSG_TYPE_PONG(1)。注意pong包除了在接收到ping包和meet包之后会作为回复包发送之外,当进行主从切换之后,新的主节点会向集群中所有节点直接发送一个pong包,通知主从切换后节点角色的转换。

    3.meet包格式

    格式同ping包,只是将包头中的type字段写为CLUSTERMSG_TYPE_MEET(2)。当执行cluster meet ip port命令之后,执行端会向ip:port指定的地址发送meet包,连接建立之后,会定期发送ping包。

    4.fail包格式

    图22-8 fail包格式

    fail包用来通知集群中某个节点处于故障状态。fail包格式如图22-8所示。

    包体部分只有一个nodename字段,记录被标记为fail状态的节点。当一个节点被大多数节点标记为pfail状态时,会进入fail状态。此时,发现该节点进入fail状态的节点会向集群中所有节点广播一个fail包,通知某个节点已经进入fail状态。当一个主节点进入fail状态后,该主节点的从节点会要求进行切换。

    5.update包格式

    图22-9 update包格式

    update包用来更新集群中节点的配置信息。update包格式如图22-9所示。

    当一个节点A发送了一个ping包给B,声明A节点给slot 1000提供服务,并且ping包中configEpoch为1。接收节点B收到该ping包后,发现B本地记录的slot 1000是由A1提供服务,并且A1的configEpoch为2,大于A节点的configEpoch。此时B会向A节点发送一个update包。

    包体中会记录A1的配置纪元、节点名称及所提供服务的slot,通知A更新自身的信息。

    Update包适用于一种特殊情况:当一个主节点M发生故障之后,其从节点S做了主从切换并且成功升级为主节点,此时S会先将其配置纪元加1,之后将所有M提供服务的slot更新为由S提供服务。之后,当M故障恢复进入集群后就会发生上述情况,此时需要向M发送update包。

    6.failover授权请求包格式

    图22-10 auth request包

    当需要执行主从切换时请求集群中的其他节点向发送该包的节点授权。包格式如图22-10所示。

    授权请求包只有包头,没有包体。当需要执行主从切换时,从节点会向集群中的主节点发送auth request授权请求包。当集群中大部分主节点授权给某个从节点之后,该从节点就可以开始进行主从切换。

    7.failover授权包格式

    格式同failover请求授权包,只是包头中的type字段为CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK(6)。当一个主节点收到请求授权包后,会根据一定的条件决定是否给发送节点发送一个failover授权包,条件如下。

    1)请求授权包中的currentEpoch小于当前节点记录的currentEpoch,则不授权。

    2)已经授权过相同currentEpoch的其他节点,则不再授权。

    3)对同一个主从切换,上次授权时间距离现在小于2倍的nodetimeout(节点超时时间,在配置文件中指定),则不再进行授权。

    4)从所声明的所有slots的configEpoch必须大于等于所有当前节点记录的slot对应的configEpoch,否则不授权。

    当通过上述所有条件后,接收节点才会发送failover授权包给发送节点。

    8.mfstart包格式

    格式同failover请求授权包,只有包头,包头中的type字段为CLUSTERMSG_TYPE_MFSTART(8)。当在一个从节点输入cluster failover命令之后,该从节点会向对应的主节点发送mfstart包,提示主节点开始进行手动切换

    9.publish包格式

    图22-11 publish包格式

    用来广播publish信息。publish包格式如图22-11所示。当向集群中任意一个节点发送publish信息后,该节点会向集群中所有节点广播一条publish包。包中包含有publish信息的渠道信息和消息体信息。

    22.3 本章小结

    集群需要解决的几个问题,

    Redis集群如何解决这些问题。

    Redis集群中如何实现主从切换,

    副本漂移的背景及原理,

    分片迁移的具体思路。

    Redis集群间通信的9种数据包格式。

  • 相关阅读:
    Less34-Less37 (宽字节注入)
    宽字节注入 Less32-Less33
    Less29(jspstudy)-Less31
    Less26-Less28a
    Less23-Less25a(24--二次注入)
    Less 11-22
    sqli-labs 1-10
    sqli-labs环境搭建与安装
    OSPF与ACL综合实例
    用分治法完成比赛操作
  • 原文地址:https://www.cnblogs.com/coloz/p/13812862.html
Copyright © 2011-2022 走看看