zoukankan      html  css  js  c++  java
  • 使用Consul做leader选举的方案

    在分布式集群部署模式下,为了维护数据一致性,通常需要选举出一个leader来进行协调,并且在leader挂掉后能从集群中选举出一个新的leader。选举leader的方案有很多种,对Paxos和Raft协议有过了解的同学应该对leader选举有一些认识,一般都是按照少数服从多数的原则来实现,但是因为分布式环境中无法避免的网络不稳定、数据不同步、时间偏差等问题,要想搞好leader选举并不是一件特别容易的事。这篇文章将提供一个使用Consul做leader选举的简单方案。

    原理

    Consul 的leader选举只有两步:

    1、Create Session:参与选举的应用分别去创建Session,Session的存活状态关联到健康检查。

    2、Acquire KV:多个应用带着创建好的Session去锁定同一个KV,只能有一个应用锁定住,锁定成功的应用就是leader。

    如上图所示,这里假设App2用Session锁定住了KV,其实就是KV的Session属性设置为了Session2。

    什么时候会触发重新选举呢?

    • Session失效:Session被删除、Session关联的健康检查失败、Session TTL过期等。
    • KV被删除:这个没什么需要特别说明的。

    那应用们怎么感知这些情况呢?

    应用在选举结束后,应该保持一个到KV的阻塞查询,这个查询会在超时或者KV发生变化的时候返回结果,这时候应用可以根据返回结果判断是否发起新的选举。

    示例

    这里给出一个Java的例子:这是一个控制台程序,程序会创建一个Session,然后尝试使用这个Session锁定key为“program/leader”的Consul KV,同时也会尝试设置KV的值为当前节点Id“007”。不管捕获成功还是失败,程序随后都会启动一个针对“program/leader”的阻塞查询,在阻塞查询返回时会判断KV是否存在或者绑定的Session是否存在,如果有任何一个不存在,则发起选举,否则继续阻塞查询。这个“阻塞查询->选举”的操作是一个无限循环操作。

    package cn.bossma;
    
    import com.ecwid.consul.v1.ConsulClient;
    import com.ecwid.consul.v1.QueryParams;
    import com.ecwid.consul.v1.kv.model.GetValue;
    import com.ecwid.consul.v1.kv.model.PutParams;
    import com.ecwid.consul.v1.session.model.NewSession;
    import com.ecwid.consul.v1.session.model.Session;
    import org.apache.commons.lang3.StringUtils;
    
    /**
     * consul leader 选举演示程序
     *
     * @author: bossma.cn
     */
    public class Main {
    
        private static ConsulClient client = new ConsulClient();
        private static String sesssionId = "";
        private static String nodeId = "007";
        private static String electName = "program/leader";
    
        /**
         * @param args
         */
        public static void main(String[] args) {
            System.out.println("starting");
            watch();
        }
    
        /**
         * 监控选举
         *
         * @param:
         * @return:
         * @author: bossma.cn
         */
        private static void watch() {
    
            System.out.println("start first leader election");
    
            // 上来就先选举一次,看看结果
            ElectResponse electResponse = elect();
            System.out.printf("elect result: %s, current manager: %s" + System.getProperty("line.separator"), electResponse.getElectResult(), electResponse.getLeaderId());
    
            long waitIndex = electResponse.modifyIndex++;
            int waitTime = 30;
    
            do {
                try {
                    System.out.println("start leader watch query");
    
                    // 阻塞查询
                    GetValue kv = getKVValue(electName, waitTime, waitIndex);
    
                    // kv被删除或者kv绑定的session不存在
                    if (null == kv || StringUtils.isEmpty(kv.getSession())) {
                        System.out.println("leader missing, start election right away");
                        electResponse = elect();
                        waitIndex = electResponse.modifyIndex++;
                        System.out.printf("elect result: %s, current manager: %s" + System.getProperty("line.separator"), electResponse.getElectResult(), electResponse.getLeaderId());
                    } else {
                        long kvModifyIndex = kv.getModifyIndex();
                        waitIndex = kvModifyIndex++;
                    }
                } catch (Exception ex) {
                    System.out.print("leader watch异常:" + ex.getMessage());
    
                    try {
                        Thread.sleep(3000);
                    } catch (Exception ex2) {
                        System.out.printf(ex2.getMessage());
                    }
                }
            }
            while (true);
        }
    
        /**
         * 执行选举
         *
         * @param:
         * @return:
         * @author: bossma.cn
         */
        private static ElectResponse elect() {
    
            ElectResponse response = new ElectResponse();
    
            Boolean electResult = false;
    
            // 创建一个关联到当前节点的Session
            if (StringUtils.isNotEmpty(sesssionId)) {
                Session s = getSession(sesssionId);
                if (null == s) {
                    // 这里session关联的健康检查只绑定了consul节点的健康检查
                    // 实际使用的时候建议把当前应用程序的健康检查也绑定上,否则如果只是程序关掉了,session也不会失效
                    sesssionId = createSession(10);
                }
            } else {
                sesssionId = createSession(10);
            }
    
            // 获取选举要锁定的Consul KV对象
            GetValue kv = getKVValue(electName);
            if (null == kv) {
                kv = new GetValue();
            }
    
            // 谁先捕获到KV,谁就是leader
            // 注意:如果程序关闭后很快启动,session关联的健康检查可能不会失败,所以session不会失效
            // 这时候可以把程序创建的sessionId保存起来,重启后首先尝试用上次的sessionId,
            electResult = acquireKV(electName, nodeId, sesssionId);
    
            // 无论参选成功与否,获取当前的Leader
            kv = getKVValue(electName);
            response.setElectResult(electResult);
            response.setLeaderId(kv.getDecodedValue());
            response.setModifyIndex(kv.getModifyIndex());
            return response;
        }
    
        /**
         * 创建Session
         *
         * @param: lockDealy session从kv释放后,kv再次绑定session的延迟时间
         * @return:
         * @author: bossma.cn
         */
        private static String createSession(int lockDelay) {
            NewSession session = new NewSession();
            session.setLockDelay(lockDelay);
            return client.sessionCreate(session, QueryParams.DEFAULT).getValue();
        }
    
        /**
         * 获取指定的session信息
         *
         * @param: sessionId
         * @return: Session对象
         * @author: bossma.cn
         */
        private static Session getSession(String sessionId) {
            return client.getSessionInfo(sessionId, QueryParams.DEFAULT).getValue();
        }
    
    
        /**
         * 使用Session捕获KV
         *
         * @param key
         * @param value
         * @param sessionId
         * @return
         * @author: bossma.cn
         */
        public static Boolean acquireKV(String key, String value, String sessionId) {
            PutParams putParams = new PutParams();
            putParams.setAcquireSession(sessionId);
    
            return client.setKVValue(key, value, putParams).getValue();
        }
    
        /**
         * 获取指定key对应的值
         *
         * @param: key
         * @return:
         * @author: bossma.cn
         */
        private static GetValue getKVValue(String key) {
            return client.getKVValue(key).getValue();
        }
    
        /**
         * block获取指定key对应的值
         *
         * @param: key, waitTime, waitIndex
         * @return:
         * @author: bossma.cn
         */
        private static GetValue getKVValue(String key, int waitTime, long waitIndex) {
            QueryParams paras = QueryParams.Builder.builder()
                    .setWaitTime(waitTime)
                    .setIndex(waitIndex)
                    .build();
            return client.getKVValue(key, paras).getValue();
        }
    
        /**
         * leader选举结果
         *
         * @author: bossma.cn
         */
        private static class ElectResponse {
    
            private Boolean electResult = false;
            private long modifyIndex = 0;
            private String leaderId;
    
            public String getLeaderId() {
                return leaderId;
            }
    
            public void setLeaderId(String leaderId) {
                this.leaderId = leaderId;
            }
    
            public Boolean getElectResult() {
                return electResult;
            }
    
            public void setElectResult(Boolean electResult) {
                this.electResult = electResult;
            }
    
            public long getModifyIndex() {
                return modifyIndex;
            }
    
            public void setModifyIndex(long modifyIndex) {
                this.modifyIndex = modifyIndex;
            }
        }
    }

    1、用于选举的Consul KV必须使用锁定的session进行更新,如果通过其它方式更新,KV绑定的Session不会有影响,也就是说KV还是被原来的程序锁定,但是却被其它的程序修改了,这不符合leader的规则。

    2、Session 关联的健康检查默认只有当前节点的健康检查,如果应用程序停止,Session并不会失效,所以建议将Session 关联的健康检查包含应用的健康检查;但是如果只有应用的健康检查,服务器停止,应用的健康检查仍可能是健康的,所以Session的健康检查应该把应用程序和Consul 节点的健康检查都纳入进来。

    3、如果程序关闭后很快启动,session关联的健康检查可能不会失败,所以session不会失效,程序启动后如果创建一个新的Session去锁定KV,就不能成功锁定KV,这时候建议将SessionId持久化存储,如果Session还存在,就还是用这个Session 去锁定。

    4、lockdelay:这不是一个坑,是一个保护机制,但需要考虑好用不用。它可以在session失效后短暂的不允许其它session来锁定KV,这是因为此时应用程序可能还是正常的,session关联的健康检查误报了,应用程序可能还在处理业务,需要一段时间来结束处理。也可以使用0值把这个机制禁止掉。

    5、可能leader加载的东西比较多,leader切换比较麻烦,考虑到session关联的健康检查误报的问题,希望leader选举优先上次锁定KV的程序,这样可以提高效率。此时可以在选举程序中增加一些逻辑:如果选举的时候发现上次的leader是当前程序,则立即选举;如果发现上次的leader不是当前程序,则等待两个固定的时间周期再提交选举。

    整体上看,Consul提供的Leader选举方案还是比较简单的,无论是集群部署中的leader选举,还是传统主备部署,都可以适用。其中的关键是Session,一定要结合自己的业务考虑周全。最后欢迎加入800人Consul交流群234939415,一起探讨使用Consul的各种场景和问题。

  • 相关阅读:
    topcoder srm 320 div1
    topcoder srm 325 div1
    topcoder srm 330 div1
    topcoder srm 335 div1
    topcoder srm 340 div1
    topcoder srm 300 div1
    topcoder srm 305 div1
    topcoder srm 310 div1
    topcoder srm 315 div1
    如何统计iOS产品不同渠道的下载量?
  • 原文地址:https://www.cnblogs.com/bossma/p/11576288.html
Copyright © 2011-2022 走看看