zoukankan      html  css  js  c++  java
  • zookeeper 核心原理

    zookeeper 核心原理

    1、了解zookeeper的设计

    2、zookeeper集群角色

    3、深入分析ZAB协议

    4、从源码层面分析leader选举的实现过程

    5、关于zookeeper的数据存储

    6、关于zookeeper数据存储

    zookeeper 的由来

    zookeeper的设计

    1. 防止单点故障

      集群方案(leader、follower)、还能分担请求

    2. 每个节点的数据是一致的

      leader、master、redis-cluster

    3. leader挂了怎么办?数据如何恢复

      选举机制

    4. 如何去保证数据一致性?(分布式事务)

      2PC

    结论:

    1. zab来实现选举: 集群内选举leader来调度简化集群的复杂度,

    2. 为什么要做集群: 保证zookeeper协调工具的高性能和高可用(热备,同步)

    3. 2pc做数据一致性: 引入了协调者(leader)和参与者(follower)的概念。

    zookeeper 集群

    1. follower:处理读请求,转发写请求给leader
    2. leader接收到事务请求后会转发提议给集群中的每一个节点(observer除外)
    3. follwer节点收到提议后响应,返回ack
    4. leader收到过半节点响应ack,便会提交事务(commit),给客户端一个response。反之会执行回滚。
    5. 事务提交后会同步给Observer

    3种角色特性:

    •  leader:集群的核心,起到了主导整个集群的作用,事务请求的调度和处理。
    •  follower:处理客户端的非事务请求,转发事务请求,参与事务的投票过程,参与leader选举投票
    •  observer:观察者角色,了解集群中的状态变化,进行状态同步。可以响应非事务请求。

    备注:observer与follwer工作原理一致,区别是不参与事务请求的投票,投票会影响性能。
    当引入更多节点提升性能时候,多投票,多网络请求,但observer可以在不投票不增加网络请求的情况下提升性能,所以引入了observer。

    Zookeeper集群中服务器数量的增加,会影响集群中写数据的性能,因为集群中是使用2PC协议,索引当更新节点的时候,需要半数已经的机器的ack才会执行commit操作。机器的增加,势必会增加收集ack的时间。Observer在不影响集群中事务处理能力的前提下,扩展Zookeeper提高集群中的非事务的处理能力。

    •  1、观察zk集群的服务器的状态,并将状态同步到observer服务器上。
    •  2、处理客户端的非事务请求,转发事务请求给leader
    •  3、不参与任何投票(与follow的区别)

    节点数:2n+1节点,至少n+1个可用,满足投票机制过半机制的需要,所以是最少三个,奇数节点。

    server.1=192.168.1.103:2888:3181
    server.2=192.168.1.104:2888:3181
    server.3=192.168.1.106:2888:3181
    server.4=192.168.1.102:2888:3181

    ZAB 协议

    ZAB(zookeeper atomic Broadcast)协议是为分布式协调服务zookeeper专门设计的一种支持崩溃恢复的原子广播协议,主要用于实现分布式数据一致性,通过主备模式的系统架构来保持集群中各个副本之间的数据一致性。

    支持崩溃恢复的原子广播协议、主要用于实现数据一致性,

    ZAB协议的两个基本模式,也是zab核心:

    • 崩溃恢复
    • 原子广播

    注意:
    投票是所有节点参与的,leader自己也不例外
    但所有投票过程不需要observer ,但observer必须要和leader节点保持数据同步,保证正确的处理非事务请求。

    消息广播

    改进版2PC

    崩溃恢复(对数据层来说)

    1、当leader失去了过半的follower节点的联系

    2、当leader服务挂了

    ​ 集群就会进入崩溃恢复阶段对于数据恢复来说

    1、已经被处理的消息不能丢失

    当leader收到合法数量的follower的ack以后,就会向各个follower广播消息(commit命令),同时自己也会commit这条事务消息,如果follower节点收到commit命令之前。leader挂了,会导致部分节点收到commit,部分节点没有收到,那么**zab协议需要保证已经被处理的消息不能丢失。**

     

    2、被丢弃的消息不能再次出现

    当leader收到事务请求,并且还未发起事务投票之前,leader挂了;怎么办?

    旧的leader带领的上个朝代没有提交的事务会被全部丢弃。
    **此时zab协议要保证被丢弃的消息不能再出现。**

    **zab 的设计思想**

    为了满足上面的两个原则,zab做了如下的设计:

    1. zxid(消息id)是最大的。(新选举的leader的zxid是最大的,保证当前节点的消息是最新的)。
    比如leader挂了之后,follwer1收到了commit请求,follwer1的zxid就是最新的,最大的,follwer2没有收到commit请求,zxid不是最大的,选举时候依旧选举zxid是最大的那个节点作为leader,follwer1的提交之前的commit请求可以保证数据时最新的,不丢失,由此满足了上面的第一条原则。
    2. epoch的概念,每产生一个leader,那么新的leader的epoch会+1,zxid是64位的数据,低32位表示消息计数器(自增),高32位(存储epoch编号)。tips:epoch概念可以联想各个朝代皇帝的年号。
    tips:
    新选举的leader的epoch会比上一轮leader的epoch高,这样保证上一轮leader再起来之后本一轮不会被选举成为leader,而变成了一个follwer,而且旧的leader的zxid会小于新leader的zxid,新的leader继任之后会把旧的leader所有没提交的事务清除,由此满足了上面的第二条原则。

    疑问如下:

    2. 临时节点使用场景:分布式锁;既然有持久化节点;
    3. 为什么需要有临时节点存在; 提升集群性能、客户端断开连接临时节点会自动删除,减少网络开销

    leader选举

    基于fastleader选举:

    1、选举指标

    • zxid 最大设置为leader 64位,

    • myid(服务器id,sid)【myid越大,在leader选举机制中权重越大】

    2、选举阶段

    • 启动时

    • 运行时崩溃后

    epoch (每一轮投票,epoch都会递增)

     

    选举状态

    graph LR
    LOKING[LOKING] --> LEADING[LEADING] --> FOLLOWING[FOLLOWING] --> OBSERVING[OBSERVING]

    启动的时候初始化

    (myid,zxid,epoch)

    1. 检查zxid

    2. myid

    3. 统计投票

      1. 判断epoch

      2. zxid

      3. 再判断myid

    QuorumPeer

    Leader选举源码分析

    看QuormPeerMain的 main.initializeAndRun(args)

     

     1 protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
     2     // 用来保存全局配置
     3     QuorumPeerConfig config = new QuorumPeerConfig();
     4     if (args.length == 1) {
     5         // args[0] -> zoo.cfg, 解析配置文件并保存到 QuorumPeerConfig
     6         config.parse(args[0]);
     7     }
     8     // Start and schedule the the purge task
     9     // 启动一个定时任务清理日志
    10     DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
    11         config.getDataDir(),
    12         config.getDataLogDir(),
    13         config.getSnapRetainCount(),
    14         config.getPurgeInterval());
    15     purgeMgr.start();
    16     // 判断是否standalone模式,或是集群
    17     if (args.length == 1 && config.isDistributed()) {
    18         runFromConfig(config);
    19     } else {
    20         LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
    21         // there is only server in the quorum -- run as standalone
    22         ZooKeeperServerMain.main(args);
    23     }
    24 }

    看runFromConfig 是怎么处理的

     1 protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
     2     // 用来保存全局配置
     3     QuorumPeerConfig config = new QuorumPeerConfig();
     4     if (args.length == 1) {
     5         // args[0] -> zoo.cfg, 解析配置文件并保存到 QuorumPeerConfig
     6         config.parse(args[0]);
     7     }
     8     // Start and schedule the the purge task
     9     // 启动一个定时任务清理日志
    10     DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
    11         config.getDataDir(),
    12         config.getDataLogDir(),
    13         config.getSnapRetainCount(),
    14         config.getPurgeInterval());
    15     purgeMgr.start();
    16     // 判断是否standalone模式,或是集群
    17     if (args.length == 1 && config.isDistributed()) {
    18         runFromConfig(config);
    19     } else {
    20         LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
    21         // there is only server in the quorum -- run as standalone
    22         ZooKeeperServerMain.main(args);
    23     }
    24 }

    看runFromConfig 是怎么处理的

      1 public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
      2     try {
      3         ManagedUtil.registerLog4jMBeans();
      4     } catch (JMException e) {
      5         LOG.warn("Unable to register log4j JMX control", e);
      6     }
      7 
      8     LOG.info("Starting quorum peer, myid=" + config.getServerId());
      9     MetricsProvider metricsProvider; // 指标数据
     10     try {
     11         metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
     12             config.getMetricsProviderClassName(),
     13             config.getMetricsProviderConfiguration());
     14     } catch (MetricsProviderLifeCycleException error) {
     15         throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
     16     }
     17     try {
     18         ServerMetrics.metricsProviderInitialized(metricsProvider);
     19         // 这个和2181端口监听有关系
     20         // ClientCnxn(客户端和服务端进行网络交互的类)
     21         // ServerCnxn(服务端网络通信处理类)
     22         ProviderRegistry.initialize();
     23         ServerCnxnFactory cnxnFactory = null;
     24         ServerCnxnFactory secureCnxnFactory = null;
     25         // 为客户端提供读写的server,也就是2181这个端口的访问功能
     26         if (config.getClientPortAddress() != null) {
     27             cnxnFactory = ServerCnxnFactory.createFactory();
     28             cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
     29         }
     30 
     31         if (config.getSecureClientPortAddress() != null) {
     32             secureCnxnFactory = ServerCnxnFactory.createFactory();
     33             secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
     34         }
     35         // zk逻辑主线程、负责选举、投票
     36         quorumPeer = getQuorumPeer();
     37         quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
     38         quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
     39         quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
     40         //quorumPeer.setQuorumPeers(config.getAllMembers());
     41         quorumPeer.setElectionType(config.getElectionAlg());            // 采用什么选举算法
     42         quorumPeer.setMyid(config.getServerId());                       // myId(sid,myid)
     43         quorumPeer.setTickTime(config.getTickTime());                   // 心跳时间间隔(2000)
     44         quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
     45         quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
     46         quorumPeer.setInitLimit(config.getInitLimit());                 // 数据初始化的时长
     47         quorumPeer.setSyncLimit(config.getSyncLimit());                 // 数据同步时长
     48         quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
     49         quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
     50         quorumPeer.setConfigFileName(config.getConfigFilename());
     51         quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
     52         quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
     53         quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
     54         if (config.getLastSeenQuorumVerifier() != null) {
     55             quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
     56         }
     57         quorumPeer.initConfigInZKDatabase();                 // 初始化内存数据库 ->磁盘持久化
     58         quorumPeer.setCnxnFactory(cnxnFactory);
     59         quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
     60         quorumPeer.setSslQuorum(config.isSslQuorum());
     61         quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
     62         quorumPeer.setLearnerType(config.getPeerType());
     63         quorumPeer.setSyncEnabled(config.getSyncEnabled());
     64         quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
     65         if (config.sslQuorumReloadCertFiles) {
     66             quorumPeer.getX509Util().enableCertFileReloading();
     67         }
     68         quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled());
     69         quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled());
     70         quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs());
     71 
     72         // sets quorum sasl authentication configurations
     73         quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
     74         if (quorumPeer.isQuorumSaslAuthEnabled()) {
     75             quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
     76             quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
     77             quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
     78             quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
     79             quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
     80         }
     81         quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
     82         quorumPeer.initialize();
     83 
     84         if (config.jvmPauseMonitorToRun) {
     85             quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
     86         }
     87         // 启动主线程
     88         quorumPeer.start();
     89         ZKAuditProvider.addZKStartStopAuditLog();
     90         quorumPeer.join();
     91     } catch (InterruptedException e) {
     92         // warn, but generally this is ok
     93         LOG.warn("Quorum Peer interrupted", e);
     94     } finally {
     95         if (metricsProvider != null) {
     96             try {
     97                 metricsProvider.stop();
     98             } catch (Throwable error) {
     99                 LOG.warn("Error while stopping metrics", error);
    100             }
    101         }
    102     }
    103 }

    启动主线程,QuorumPeer 重写了 Thread.start 方法,我们接下来看start方法里面发生了什么?
    调用 QUORUMPEER 的 START 方法

     1 @Override
     2 public synchronized void start() {     // 重写线程start方法
     3     if (!getView().containsKey(myid)) {
     4         throw new RuntimeException("My id " + myid + " not in the peer list");
     5     }
     6     loadDataBase();             // 从磁盘加载数据
     7     startServerCnxnFactory();   // 这里来启动2181端口监听,ServerSocketChannel
     8     try {
     9         adminServer.start();
    10     } catch (AdminServerException e) {
    11         LOG.warn("Problem starting AdminServer", e);
    12         System.out.println(e);
    13     }
    14     startLeaderElection();      // 开启leader选举
    15     startJvmPauseMonitor();     // 启动监控
    16     super.start();              // 启动线程
    17 }

    loaddatabase, 主要是从本地文件中恢复数据,以及获取最新的 zxid

     1 private void loadDataBase() {
     2     try {
     3         zkDb.loadDataBase();    // 从本地文件加载数据
     4 
     5         // load the epochs
     6         // 从最新的zxid恢复epoch变量、zxid64位,前32位是epoch的值,后32位是zxid
     7         long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
     8         long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
     9         try {
    10             // 从文件中读取当前的epoch
    11             currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
    12         } catch (FileNotFoundException e) {
    13             // pick a reasonable epoch number
    14             // this should only happen once when moving to a
    15             // new code version
    16             currentEpoch = epochOfZxid;
    17             LOG.info(
    18                 "{} not found! Creating with a reasonable default of {}. "
    19                     + "This should only happen when you are upgrading your installation",
    20                 CURRENT_EPOCH_FILENAME,
    21                 currentEpoch);
    22             writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
    23         }
    24         if (epochOfZxid > currentEpoch) {
    25             throw new IOException("The current epoch, "
    26                                   + ZxidUtils.zxidToString(currentEpoch)
    27                                   + ", is older than the last zxid, "
    28                                   + lastProcessedZxid);
    29         }
    30         try {
    31             //从文件中读取接收的epoch
    32             acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
    33         } catch (FileNotFoundException e) {
    34             // pick a reasonable epoch number
    35             // this should only happen once when moving to a
    36             // new code version
    37             acceptedEpoch = epochOfZxid;
    38             LOG.info(
    39                 "{} not found! Creating with a reasonable default of {}. "
    40                     + "This should only happen when you are upgrading your installation",
    41                 ACCEPTED_EPOCH_FILENAME,
    42                 acceptedEpoch);
    43             writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
    44         }
    45         if (acceptedEpoch < currentEpoch) {
    46             throw new IOException("The accepted epoch, "
    47                                   + ZxidUtils.zxidToString(acceptedEpoch)
    48                                   + " is less than the current epoch, "
    49                                   + ZxidUtils.zxidToString(currentEpoch));
    50         }
    51     } catch (IOException ie) {
    52         LOG.error("Unable to load database on disk", ie);
    53         throw new RuntimeException("Unable to run quorum server ", ie);
    54     }
    55 }

    退出loaddatabase,我们看初始化 LEADERELECTION

     1 public synchronized void startLeaderElection() {
     2     try {
     3         // 得到当前节点的状态,如果是 LOOKING
     4         if (getPeerState() == ServerState.LOOKING) {
     5             // 构建一个Vote(myid、zxid、epoch)
     6             currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
     7         }
     8     } catch (IOException e) {
     9         RuntimeException re = new RuntimeException(e.getMessage());
    10         re.setStackTrace(e.getStackTrace());
    11         throw re;
    12     }
    13     // 根据electionType来创建选举算法
    14     this.electionAlg = createElectionAlgorithm(electionType);
    15 }

    配置选举算法,选举算法有 3 种,可以通过在 zoo.cfg 里面进行配置,默认是 fast 选举

     1 protected Election createElectionAlgorithm(int electionAlgorithm) {
     2         Election le = null;
     3 
     4         //TODO: use a factory rather than a switch
     5         switch (electionAlgorithm) {
     6         case 1:
     7             throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
     8         case 2:
     9             throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    10         case 3:
    11             // cnxn(和网络有关的一个类,ServerCnxn、ClientCnxn)
    12             // QuorumCnxManager 管理集群选举和投票相关的操作
    13             QuorumCnxManager qcm = createCnxnManager();
    14             QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
    15             if (oldQcm != null) {   // 判断是否已经开启选举
    16                 LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
    17                 oldQcm.halt();      // 种植掉当前的选举
    18             }
    19             // 监听集群中的票据
    20             QuorumCnxManager.Listener listener = qcm.listener;
    21             if (listener != null) {
    22                 listener.start();
    23                 // 初始化了FastLeaderElection
    24                 FastLeaderElection fle = new FastLeaderElection(this, qcm);
    25                 fle.start();    // 启动leader选举
    26                 le = fle;
    27             } else {
    28                 LOG.error("Null listener when initializing cnx manager");
    29             }
    30             break;
    31         default:
    32             assert false;
    33         }
    34         return le;
    35     }

    启动leader选举

    1 void start() {
    2     /**
    3              * 启动两个线程
    4              * wsThread 业务层发送线程,将消息发送给IO负责类 QuorumCnxManger
    5              * 启动业务层接受线程,从IO负责类 QuorumCnxManger 接收消息
    6              */
    7     this.wsThread.start();
    8     this.wrThread.start();
    9 }

    投票过程

    1、检查节点状态是looking时投票给自己、投票逻辑源码在FastLeaderElection类的lookforleader(),可以自己再细看源码

    • 各个节点互相广播vode信息(myid,zxid,epoch)

    • 先判断epoch,再判断zxid,再判断myid

    • 胜出的投票会更新到当前的结果中。

    • 继续广播,让其他节点知道自己现在的票据(告诉别人胜出的那个票据信息)。

    • epoch更新,进行下一轮选举

    • 如果收到的消息epoch小于当前节点的epoch,则忽略这条消息(忽略旧的投票参数)

    • epoch相同时比较zxid,myid,如果胜出就更新自己的票据,并发出广播

    • 投票的结果都蠢到本机的投票集合中,用来判断是不是超过半数

    // 1、判断消息里的epoch是不是比当前的大,如果大则消息中id对应的服务器就是leader
    // 2、如果epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader
    // 3、如果前面两个都相等那就比较服务器id,如果大,则其就是leader
    return ((newEpoch > curEpoch)
    || ((newEpoch == curEpoch)
    && ((newZxid > curZxid)
    || ((newZxid == curZxid)
    && (newId > curId)))));

  • 相关阅读:
    lambda续集——1
    c++之—— lambda表达式(有个未能解决的问题等待大佬解答)——(在stack overflow找到了答案)
    交换两个变量,只使用2个变量——权当面试了解使用
    移位实现正负数原码输出
    算法导论之——插入排序
    类模板的实现与定义相分离
    类模板
    当函数模板遇到普通函数
    c++之——template模板函数
    字符转数字,数字转字符
  • 原文地址:https://www.cnblogs.com/lwx57280/p/14319406.html
Copyright © 2011-2022 走看看