zoukankan      html  css  js  c++  java
  • zookeeper学习【4】master选举

    考虑7*24小时向外提供服务的系统,不能有单点故障,于是我们使用集群,采用的是Master+Slave。集群中有一台主机和多台备机,由主机向外提 供服务,备机监听主机状态,一旦主机宕机,备机必需迅速接管主机继续向外提供服务。在这个过程中,从备机选出一台机作为主机的过程,就是Master选 举。

    架构图:

    左边是ZooKeeper集群,右边是3台工作服务器。工作服务器启动时,会去ZooKeeper的Servers节点下创建临时节点,并把基本信息写入 临时节点。这个过程叫服务注册,系统中的其他服务可以通过获取Servers节点的子节点列表,来了解当前系统哪些服务器可用,这该过程叫做服务发现。接 着这些服务器会尝试创建Master临时节点,谁创建成功谁就是Master,其他的两台就作为Slave。所有的Work Server必需关注Master节点的删除事件。通过监听Master节点的删除事件,来了解Master服务器是否宕机(创建临时节点的服务器一旦宕 机,它所创建的临时节点即会自动删除)。一旦Master服务器宕机,必需开始新一轮的Master选举。

    实现代码:

    /**
     * 调度器
     */
    public class LeaderSelectorZkClient {
    
        //启动的服务个数
        private static final int        CLIENT_QTY = 10;
        //zookeeper服务器的地址
        private static final String     ZOOKEEPER_SERVER = "192.168.1.105:2181";
    
    
        public static void main(String[] args) throws Exception {
            //保存所有zkClient的列表
            List<ZkClient> clients = new ArrayList<ZkClient>();
            //保存所有服务的列表
            List<WorkServer>  workServers = new ArrayList<WorkServer>();
    
            try {
                for ( int i = 0; i < CLIENT_QTY; ++i ) { // 模拟创建10个服务器并启动
                    //创建zkClient
                    ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());
                    clients.add(client);
                    //创建serverData
                    RunningData runningData = new RunningData();
                    runningData.setCid(Long.valueOf(i));
                    runningData.setName("Client #" + i);
                    //创建服务
                    WorkServer  workServer = new WorkServer(runningData);
                    workServer.setZkClient(client);
    
                    workServers.add(workServer);
                    workServer.start();
                }
    
                System.out.println("敲回车键退出!
    ");
                new BufferedReader(new InputStreamReader(System.in)).readLine();
    
            } finally {
    
                System.out.println("Shutting down...");
    
                for ( WorkServer workServer : workServers ) {
                    try {
                        workServer.stop();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
    
                for ( ZkClient client : clients ) {
                    try {
                        client.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    import java.io.Serializable;
    
    /**
     * 工作服务器信息
     */
    public class RunningData implements Serializable {
    
        private static final long serialVersionUID = 4260577459043203630L;
    
    
        private Long cid;
        private String name;
        public Long getCid() {
            return cid;
        }
        public void setCid(Long cid) {
            this.cid = cid;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    
    }
    /**
     * 工作服务器
     */
    public class WorkServer {
    
        // 记录服务器状态
        private volatile boolean running = false;
    
        private ZkClient zkClient;
        // Master节点对应zookeeper中的节点路径
        private static final String MASTER_PATH = "/master";
        // 监听Master节点删除事件
        private IZkDataListener dataListener;
        // 记录当前节点的基本信息
        private RunningData serverData;
        // 记录集群中Master节点的基本信息
        private RunningData masterData;
    
        private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
        private int delayTime = 5;
    
        public WorkServer(RunningData rd) {
            this.serverData = rd; // 记录服务器基本信息
            this.dataListener = new IZkDataListener() {
    
                public void handleDataDeleted(String dataPath) throws Exception {
    
                    //takeMaster();
    
                    if (masterData != null && masterData.getName().equals(serverData.getName())){
                        // 自己就是上一轮的Master服务器,则直接抢
                        takeMaster();
                    } else {
                        // 否则,延迟5秒后再抢。主要是应对网络抖动,给上一轮的Master服务器优先抢占master的权利,避免不必要的数据迁移开销
                        delayExector.schedule(new Runnable(){
                            public void run(){
                                takeMaster();
                            }
                        }, delayTime, TimeUnit.SECONDS);
                    }
    
                }
    
                public void handleDataChange(String dataPath, Object data)
                        throws Exception {
    
                }
            };
        }
    
        public ZkClient getZkClient() {
            return zkClient;
        }
    
        public void setZkClient(ZkClient zkClient) {
            this.zkClient = zkClient;
        }
    
        // 启动服务器
        public void start() throws Exception {
            if (running) {
                throw new Exception("server has startup...");
            }
            running = true;
            // 订阅Master节点删除事件
            zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
            // 争抢Master权利
            takeMaster();
    
        }
    
        // 停止服务器
        public void stop() throws Exception {
            if (!running) {
                throw new Exception("server has stoped");
            }
            running = false;
    
            delayExector.shutdown();
            // 取消Master节点事件订阅
            zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
            // 释放Master权利
            releaseMaster();
    
        }
    
        // 争抢Master
        private void takeMaster() {
            if (!running)
                return;
    
            try {
                // 尝试创建Master临时节点
                zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
                masterData = serverData;
                System.out.println(serverData.getName()+" is master");
    
                // 作为演示,我们让服务器每隔5秒释放一次Master权利
                delayExector.schedule(new Runnable() {
                    public void run() {
                        // TODO Auto-generated method stub
                        if (checkMaster()){
                            releaseMaster();
                        }
                    }
                }, 5, TimeUnit.SECONDS);
    
            } catch (ZkNodeExistsException e) { // 已被其他服务器创建了
                // 读取Master节点信息
                RunningData runningData = zkClient.readData(MASTER_PATH, true);
                if (runningData == null) {
                    takeMaster(); // 没读到,读取瞬间Master节点宕机了,有机会再次争抢
                } else {
                    masterData = runningData;
                }
            } catch (Exception e) {
                // ignore;
            }
    
        }
    
        // 释放Master权利
        private void releaseMaster() {
            if (checkMaster()) {
                zkClient.delete(MASTER_PATH);
            }
        }
    
        // 检测自己是否为Master
        private boolean checkMaster() {
            try {
                RunningData eventData = zkClient.readData(MASTER_PATH);
                masterData = eventData;
                if (masterData.getName().equals(serverData.getName())) {
                    return true;
                }
                return false;
            } catch (ZkNoNodeException e) {
                return false; // 节点不存在,自己肯定不是Master了
            } catch (ZkInterruptedException e) {
                return checkMaster();
            } catch (ZkException e) {
                return false;
            }
        }
    
    }
  • 相关阅读:
    Less-21
    Less-22
    Less-21
    Less-20
    ssrf redis gopher
    Less19
    Less18
    Arm 系统查看、修改系统时间
    通过 grpc 请求标头发送自定义数据
    gRpc 空参数
  • 原文地址:https://www.cnblogs.com/tinyj/p/10029186.html
Copyright © 2011-2022 走看看