zoukankan      html  css  js  c++  java
  • zookeeper master 选举

    原文地址:

    http://www.cnblogs.com/nevermorewang/p/5611807.html

    选主原理介绍:zookeeper的节点有两种类型,持久节点跟临时节点。临时节点有个特性,就是如果注册这个节点的机器失去连接(通常是宕机),那么这个节点会被zookeeper删除。选主过程就是利用这个特性,在服务器启动的时候,去zookeeper特定的一个目录下注册一个临时节点(这个节点作为master,谁注册了这个节点谁就是master),注册的时候,如果发现该节点已经存在,则说明已经有别的服务器注册了(也就是有别的服务器已经抢主成功),那么当前服务器只能放弃抢主,作为从机存在。同时,抢主失败的当前服务器需要订阅该临时节点的删除事件,以便该节点删除时(也就是注册该节点的服务器宕机了或者网络断了之类的)进行再次抢主操作。从机具体需要去哪里注册服务器列表的临时节点,节点保存什么信息,根据具体的业务不同自行约定。选主的过程,其实就是简单的争抢在zookeeper注册临时节点的操作,谁注册了约定的临时节点,谁就是master。

    主要有两个类,WorkServer为主服务类,RunningData用于记录运行数据。因为是简单的demo,我们只做抢master节点的编码,对于从节点应该去哪里注册服务列表信息,不作编码。

      采用zkClient实现,代码如下:

      WorkServer类

    package com.zookeeper.master;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkInterruptedException;
    import org.I0Itec.zkclient.exception.ZkNoNodeException;
    import org.I0Itec.zkclient.exception.ZkNodeExistsException;
    import org.apache.zookeeper.CreateMode;
    
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Created by nevermore on 16/6/22.
     */
    public class WorkServer {
    
        //客户端状态
        private volatile boolean running = false;
    
        private ZkClient zkClient;
    
        //zk主节点路径
        public static final String MASTER_PATH = "/master";
    
        //监听(用于监听主节点删除事件)
        private IZkDataListener dataListener;
    
        //服务器基本信息
        private RunningData serverData;
        //主节点基本信息
        private RunningData masterData;
    
        //调度器
        private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
        //延迟时间5s
        private int delayTime = 5;
    
    
    
        public WorkServer(RunningData runningData){
            this.serverData = runningData;
            this.dataListener = new IZkDataListener() {
                @Override
                public void handleDataChange(String s, Object o) throws Exception {
    
                }
    
                @Override
                public void handleDataDeleted(String s) throws Exception {
                    //takeMaster();
    
                    if(masterData != null && masterData.getName().equals(serverData.getName())){
                        //若之前master为本机,则立即抢主,否则延迟5秒抢主(防止小故障引起的抢主可能导致的网络数据风暴)
                        takeMaster();
                    }else{
                        delayExector.schedule(new Runnable() {
                            @Override
                            public void run() {
                                takeMaster();
                            }
                        },delayTime, TimeUnit.SECONDS);
                    }
    
                }
            };
        }
    
        //启动
        public void start() throws Exception{
            if(running){
                throw new Exception("server has startup....");
            }
            running = true;
            zkClient.subscribeDataChanges(MASTER_PATH,dataListener);
            takeMaster();
        }
    
        //停止
        public void stop() throws Exception{
            if(!running){
                throw new Exception("server has stopped.....");
            }
            running = false;
            delayExector.shutdown();
            zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener);
            releaseMaster();
        }
    
        //抢注主节点
        private void takeMaster(){
            if(!running) return ;
    
            try {
                zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
                masterData = serverData;
                System.out.println(serverData.getName()+" is master");
    
                delayExector.schedule(new Runnable() {//测试抢主用,每5s释放一次主节点
                    @Override
                    public void run() {
                        if(checkMaster()){
                            releaseMaster();
                        }
                    }
                },5,TimeUnit.SECONDS);
    
    
            }catch (ZkNodeExistsException e){//节点已存在
                RunningData runningData = zkClient.readData(MASTER_PATH,true);
                if(runningData == null){//读取主节点时,主节点被释放
                    takeMaster();
                }else{
                    masterData = runningData;
                }
            } catch (Exception e) {
                // ignore;
            }
    
        }
        //释放主节点
        private void releaseMaster(){
            if(checkMaster()){
                zkClient.delete(MASTER_PATH);
            }
        }
        //检验自己是否是主节点
        private boolean checkMaster(){
            try {
                RunningData runningData = zkClient.readData(MASTER_PATH);
                masterData = runningData;
                if (masterData.getName().equals(serverData.getName())) {
                    return true;
                }
                return false;
    
            }catch (ZkNoNodeException e){//节点不存在
                return  false;
            }catch (ZkInterruptedException e){//网络中断
                return checkMaster();
            }catch (Exception e){//其它
                return false;
            }
        }
    
        public void setZkClient(ZkClient zkClient) {
            this.zkClient = zkClient;
        }
    
        public ZkClient getZkClient() {
            return zkClient;
        }
    }
    View Code

    RunningData类:

    package com.zookeeper.master;
    
    import java.io.Serializable;
    
    /**
     * Created by nevermore on 16/6/22.
     */
    public class RunningData implements Serializable {
    
        private static final long serialVersionUID = 4260577459043203630L;
    
    
        //服务器id
        private long cid;
        //服务器名称
        private String name;
    
    
        public long getCid() {
            return cid;
        }
    
        public void setCid(long cid) {
            this.cid = cid;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }
    View Code

    说明:在实际生产环境中,可能会由于插拔网线等导致网络短时的不稳定,也就是网络抖动。由于正式生产环境中可能server在zk上注册的信息是比较多的,而且server的数量也是比较多的,那么每一次切换主机,每台server要同步的数据量(比如要获取谁是master,当前有哪些salve等信息,具体视业务不同而定)也是比较大的。那么我们希望,这种短时间的网络抖动最好不要影响我们的系统稳定,也就是最好选出来的master还是原来的机器,那么就可以避免发现master更换后,各个salve因为要同步数据等导致的zk数据网络风暴。所以在WorkServer中,54-63行,我们抢主的时候,如果之前主机是本机,则立即抢主,否则延迟5s抢主。这样就给原来主机预留出一定时间让其在新一轮选主中占据优势,从而利于环境稳定。

    测试代码:

    package com.zookeeper.master;
    
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.serialize.SerializableSerializer;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Created by nevermore on 16/6/23.
     */
    public class LeaderSelectorZkClient {
    
        //启动的服务个数
        private static final int        CLIENT_QTY = 10;
        //zookeeper服务器的地址
        private static final String     ZOOKEEPER_SERVER = "localhost:2181";
    
    
        public static void main(String[] args) throws Exception{
            //保存所有zkClient的列表
            List<ZkClient> clients = new ArrayList<ZkClient>();
            //保存所有服务的列表
            List<WorkServer>  workServers = new ArrayList<WorkServer>();
    
            try{
                for ( int i = 0; i < CLIENT_QTY; ++i ){
                    //创建zkClient
                    ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());
                    clients.add(client);
                    //创建serverData
                    RunningData runningData = new RunningData();
                    runningData.setCid(Long.valueOf(i));
                    runningData.setName("Client #" + i);
                    //创建服务
                    WorkServer  workServer = new WorkServer(runningData);
                    workServer.setZkClient(client);
    
                    workServers.add(workServer);
                    workServer.start();
                }
    
                System.out.println("敲回车键退出!
    ");
                new BufferedReader(new InputStreamReader(System.in)).readLine();
            }finally{
                System.out.println("Shutting down...");
    
                for ( WorkServer workServer : workServers ){
                    try {
                        workServer.stop();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                for ( ZkClient client : clients ){
                    try {
                        client.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    View Code
  • 相关阅读:
    重大技术需求系统八
    2020年下半年软考真题及答案解析
    周总结五
    重大技术需求系统七
    TextWatcher 编辑框监听器
    Android四大基本组件介绍与生命周期
    JAVA String,StringBuffer与StringBuilder的区别??
    iOS开发:保持程序在后台长时间运行
    宏定义的布局约束
    随便说一些
  • 原文地址:https://www.cnblogs.com/newlangwen/p/10143987.html
Copyright © 2011-2022 走看看