zoukankan      html  css  js  c++  java
  • Zookeeper--集群管理

    https://luangeng.space

    Zookeeper--集群管理

    在多台服务器组成的集群中,需要监控每台服务器的状态,一旦某台服务器挂掉了或有新的机器加入集群,集群都要感知到,从而采取相应的措施。一个主动的集群可以自动感知节点的死亡和新节点的加入,它才对更高效的提供服务。通常的做法是有台主机器定时的去获取其他机器的心跳,或其他机器定时主动汇报自己的状态,这种方式存在一定的延时,并且主机器成为单点,一旦挂掉便影响整个集群。

    使用Zookeeper可以方便的实现集群管理的功能。思路如下,每个服务器启动时都向zk服务器提出创建临时节点的请求,并且使用getChildren设置父节点的观察,当该服务器挂掉之后,它创建的临时节点也被Zookeeper服务器删除,然后会触发监视器,其他服务器便得到通知。创建新节点也是同理。

    并且利用Zookeeper的Leader选举功能可以选出服务中的一台作为Leader,在比如任务调度类似的场景中有用。

    下面是一个简单的模拟:

    ServerUnit模拟在不同机器上启动的服务,启动时向Zookeeper服务器注册自己,并保存自己的IP和端口;实现CallBack接口:在其他节点发生变化时执行的逻辑

    public class ServerUnit {
    
        public static final String SER_NAME = "ServerUnit";
    
        public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
    
            System.out.println("begin register to Zookeeper..");
    
            String address = IPUtil.getLoaclIP() + ":" + new Random().nextInt(255);
    
            ServiceMng mng = new ServiceMng(SER_NAME);
    
            String serverId = mng.register(address, new CallBack<ServiceMng.ChildrenChangedResult>() {
                @Override
                public void callback(ServiceMng.ChildrenChangedResult cn) throws KeeperException, InterruptedException {
                    for (String str : cn.getUp()) {
                        System.out.println("检测到服务加入: " + mng.queryAddress(str));
                    }
                    for (String str : cn.getDown()) {
                        System.out.println("检测到服务退出: " + mng.queryAddress(str));
                    }
                }
            });
    
            System.out.println("ServerUnit started at: " + address);
            TimeUnit.HOURS.sleep(1);
        }
    }

    ---

    CallBack接口:

    public interface CallBack<T> {
    
        void callback(T t) throws Exception;
    
    }

    ---

    ServerMng 提供向Zookeeper注册服务和获取服务等方法,被服务单元依赖

    public class ServiceMng {
    
        private static final String APPS_PATH = "/__apps__";
        private String serviceName;
        private ZooKeeper zk;
        private CountDownLatch latch = new CountDownLatch(1);
        private List<String> serList;
        private Map<String, String> serMap = new HashMap<>();
    
        ServiceMng(String serviceName) {
            this.serviceName = serviceName;
        }
    
        public String register(String address, CallBack callback) throws KeeperException, InterruptedException, IOException {
            if (zk != null) {
                throw new IllegalArgumentException("method should not invoke twice.");
            }
    
            zk = new ZooKeeper("localhost", 30000, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                    if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                        try {
                            List list = zk.getChildren(APPS_PATH + "/" + serviceName, true);
                            refresh(list);
                            callback.callback(new ChildrenChangedResult(list, serList));
                            serList = list;
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
    
            latch.await();
            if (zk.exists(APPS_PATH, false) == null) {
                zk.create(APPS_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            if (zk.exists(APPS_PATH + "/" + serviceName, false) == null) {
                zk.create(APPS_PATH + "/" + serviceName, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
    
            String path = zk.create(APPS_PATH + "/" + serviceName + "/", address.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            List list = zk.getChildren(APPS_PATH + "/" + serviceName, true);
            refresh(list);
            serList = list;
            return path;
        }
    
        private void refresh(List<String> paths) throws KeeperException, InterruptedException {
            for (String path : paths) {
                byte[] b = zk.getData(APPS_PATH + "/" + serviceName + "/" + path, false, null);
                serMap.put(path, new String(b));
            }
        }
    
        public String queryLeaderIp(String serviceName) throws KeeperException, InterruptedException {
            List<String> apps = zk.getChildren(APPS_PATH + "/" + serviceName, false);
            if (apps.isEmpty()) {
                return null;
            }
            Collections.sort(apps);
            byte[] data = zk.getData(apps.get(0), false, null);
            return new String(data);
        }
    
        public String queryRandomServerIp(String serviceName) throws KeeperException, InterruptedException {
            List<String> apps = zk.getChildren(APPS_PATH + "/" + serviceName, false);
            if (apps.isEmpty()) {
                return null;
            }
            Random r = new Random();
            byte[] data = zk.getData(apps.get(r.nextInt(apps.size())), false, null);
            return new String(data);
        }
    
        public String queryAddress(String path) {
            return serMap.get(path);
        }
    
        public static class ChildrenChangedResult {
            List<String> up = null;
            List<String> down = null;
    
            ChildrenChangedResult(List now, List last) {
                up = new LinkedList(now);
                up.removeAll(last);
                down = new LinkedList(last);
                down.removeAll(now);
            }
    
            public List<String> getUp() {
                return up;
            }
    
            public List<String> getDown() {
                return down;
            }
        }
    
    }

    ---

    依次启动3个ServerUnit,查看控制台:

      

    依次关闭2个ServerUnit,查看控制台:

     

    从关闭ServerUnit到控制台打印退出大概延迟8s左右,配置zk的tickTime=1000,比预期的要慢一些。

    end

  • 相关阅读:
    Spring Boot 2.4版本前后的分组配置变化及对多环境配置结构的影响
    Spring Boot 2.4 对多环境配置的支持更改
    Spring Boot 的2020最后一击:2.4.1、2.3.7、2.2.12 发布
    苹果M1芯片各种不支持,但居然可以刷朋友圈!你会买单吗?
    老板居然让我在Java项目中“造假”
    Spring Cloud正式移除Hystrix、Zuul等Netflix OSS组件
    为了Java微信支付V3开发包,我找出了微信支付文档至少六个错误
    IdentityServer4系列 | 支持数据持久化
    IdentityServer4系列 | 混合模式
    Gitlab Runner的分布式缓存实战
  • 原文地址:https://www.cnblogs.com/luangeng/p/7589132.html
Copyright © 2011-2022 走看看