zoukankan      html  css  js  c++  java
  • zookeeper 实现注册中心 (关闭心跳日志)

    分布式注册配置中心:

    zookeeper由于拥有watcher机制,使得其拥有发布订阅的功能,而发布与订阅模型,即所谓的配置中心,

    顾名思义就是发布者将数据发布到 ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。

    应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个 Watcher,这样一来,以后每次配置有更新的时候,

    都会实时通知到订阅的客户端,从来达到获取最新配置信息的目的。

    <!-- 添加springboot对 zookeeper 的支持 -->

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.12</version>
    </dependency>

    由于客户端需要与zookeeper建立连接,获取数据,添加监控等等一系列的事情,

    所以这里封装一个Utils工具类。

    然后对于zookeeper连接客户端的地址的后面可以紧跟一个path,作为在根目录下的工作目录。

    该目录就是作为所有操作的根目录,这里使用 /zkRegistry

    监听的是该节点下的  conf  节点,

    由于zookeeper采用的是异步调用,所以这里需要使用一把锁锁住主线程,在连接成功后自动解锁,

    主线程再往下进行。这里使用CountDownLatch实现锁,在主线程创建,传递到DafaultWatcher的回掉函数中。

    package com..zookeeper.zkRegistry;
    
    import org.apache.zookeeper.ZooKeeper;
    import java.util.concurrent.CountDownLatch;
    
    public class ZkClientUtil {
    
        private static ZooKeeper zooKeeper;
    
        private static DefaultWatcher defaultWatcher = new DefaultWatcher();
    
        private static CountDownLatch countDownLatch = new CountDownLatch(1);
    
        private static String address = "localhost:2181,localhost:2182,localhost:2183,localhost:2184/zkRegistry";
    
    
        public static ZooKeeper getZkClient() throws Exception {
    
            zooKeeper = new ZooKeeper(address,1000,defaultWatcher);
            
            //等待 链接 CountDownLatch -1
            defaultWatcher.setCountDownLatch(countDownLatch);
            //继续执行
            countDownLatch.await(); 
    
            return zooKeeper;
    
        }
    }

    同时由于zookeeper基于watch机制实现发布订阅,所有的watcher都采用自定义的方式实现,

    首先是对连接成功的时候的DefaultWatcher。

    package com..zookeeper.zkRegistry;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    
    import java.util.concurrent.CountDownLatch;
    
    public class DefaultWatcher implements Watcher {
    
        private CountDownLatch countDownLatch;
    
        public void setCountDownLatch(CountDownLatch countDownLatch){
    
            this.countDownLatch = countDownLatch;
        }
    
    
    
        @Override
        public void process(WatchedEvent watchedEvent) {
    
            switch (watchedEvent.getState()) {
                case Unknown:
                    break;
                case Disconnected:
                    break;
                case NoSyncConnected:
                    break;
                case SyncConnected:
                    //初始化完成 -1
                    countDownLatch.countDown();
                    break;
                case AuthFailed:
                    break;
                case ConnectedReadOnly:
                    break;
                case SaslAuthenticated:
                    break;
                case Expired:
                    break;
            }
            System.out.println(watchedEvent.toString());
    
        }
    
    }

    创建一个接收数据的实体类对象:

    package com..zookeeper.zkRegistry;
    
    public class MyConfData {
    
        private String data;
    
        public String getData(){
    
            return data;
        }
    
        public void setData(String data){
    
            this.data = data;
        }
    }

    封装好所有的watcher和回调函数::

    package com..zookeeper.zkRegistry;
    
    import org.apache.zookeeper.AsyncCallback;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.concurrent.CountDownLatch;
    
    public class MyWatcherAndCallBack implements Watcher,AsyncCallback.StatCallback,AsyncCallback.DataCallback {
    
        private ZooKeeper zooKeeper;
    
        private MyConfData myConfData;
    
        private CountDownLatch countDownLatch = new CountDownLatch(1);
    
        public void setZooKeeper(ZooKeeper zooKeeper){
    
            this.zooKeeper = zooKeeper;
        }
    
        public void setMyConfData(MyConfData myConfData){
    
            this.myConfData = myConfData;
        }
    
    
    
        //Watcher
        @Override
        public void process(WatchedEvent watchedEvent) {
    
            switch (watchedEvent.getType()) {
                case None:
                    break;
                case NodeCreated:
                    zooKeeper.getData("/conf",this,this,"第二次获取数据");
                    break;
                case NodeDeleted:
                    myConfData.setData("");
                    countDownLatch = new CountDownLatch(1);
                    break;
                case NodeDataChanged:
                    zooKeeper.getData("/conf",this,this,"第二次获取数据");
                    break;
                case NodeChildrenChanged:
                    break;
            }
    
        }
    
    
        //StatCallback
        @Override
        public void processResult(int i, String s, Object o, Stat stat) {
    
            zooKeeper.getData("/conf",this,this,"第一次获取数据");
    
    
        }
    
        //DataCallback
        @Override
        public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) {
    
            if(stat!=null){
                System.out.println("取得数据 : "+new String(bytes));
                myConfData.setData(new String(bytes));
    
                countDownLatch.countDown();
            }
        }
    
        public void await() throws InterruptedException {
    
            zooKeeper.exists("/conf",this,this,"127.0.0.1");
    
            countDownLatch.await();
        }
    }

    测试类:

    (可直接运行 getConfData() 方法,跑项目就放开注调的代码和注解,注@Test)

    package com..zookeeper.zkRegistry;
    
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;
    
    //@Component
    public class ZkClientConfing {
    
        private ZooKeeper zooKeeper;
    
        private MyConfData myConfData = new MyConfData();
    
        private MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();
    
    
        @Before
        public void conn() throws Exception {
    
            zooKeeper = ZkClientUtil.getZkClient();
        }
    
    
        @After
        public void close() throws InterruptedException {
    
            zooKeeper.close();
        }
    
    
        @Test
    //    @PostConstruct
        public void getConfData() throws InterruptedException {
    
    //        try {
    //            zooKeeper = ZkClientUtil.getZkClient();
    //        } catch (Exception e) {
    //            e.printStackTrace();
    //        }
    
            myWatcherAndCallBack.setZooKeeper(zooKeeper);
    
            myWatcherAndCallBack.setMyConfData(myConfData);
    
            myWatcherAndCallBack.await();
    
            while (true){
    
                if(myConfData.getData().equals("")){
                    System.out.println("配置中心数据为空!!!!!");
                    myWatcherAndCallBack.await();//等待数据输入
                }
                //为了观察数据的变化,这里循环打印设置的数据。
                System.out.println("数据 : "+myConfData.getData());
    
                Thread.sleep(30000);
            }
    
        }
    }

    logback 关闭zookeeper的心跳日志:

    <configuration> 
    
      <logger name="org.apache.zookeeper.ClientCnxn" level="info" />  
    
    </configuration> 
    
    name 为包名
    
    level  为日志级别
  • 相关阅读:
    swfupload多文件上传[附源码]
    C#函数式程序设计之泛型(下)
    ASP.NET MVC实现POST方式的Redirect
    使用Windows Azure的VM安装和配置CDH搭建Hadoop集群
    Asp.Net MVC 上传图片到数据库
    ASP.NET Web API标准的“管道式”设计
    如何捕获和分析 JavaScript Error
    快学Scala习题解答—第一章 基础
    职场人生
    合伙人的重要性超过了商业模式和行业选择(转)
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14765386.html
Copyright © 2011-2022 走看看