分布式注册配置中心:
zookeeper由于拥有watcher机制,使得其拥有发布订阅的功能,而发布与订阅模型,即所谓的配置中心,
顾名思义就是发布者将数据发布到 ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。
应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个 Watcher,这样一来,以后每次配置有更新的时候,
都会实时通知到订阅的客户端,从来达到获取最新配置信息的目的。
<!-- 添加springboot对 zookeeper 的支持 -->
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency>
由于客户端需要与zookeeper建立连接,获取数据,添加监控等等一系列的事情,
所以这里封装一个Utils工具类。
然后对于zookeeper连接客户端的地址的后面可以紧跟一个path,作为在根目录下的工作目录。
该目录就是作为所有操作的根目录,这里使用 /zkRegistry
监听的是该节点下的 conf 节点,
由于zookeeper采用的是异步调用,所以这里需要使用一把锁锁住主线程,在连接成功后自动解锁,
主线程再往下进行。这里使用CountDownLatch实现锁,在主线程创建,传递到DafaultWatcher的回掉函数中。
package com..zookeeper.zkRegistry; import org.apache.zookeeper.ZooKeeper; import java.util.concurrent.CountDownLatch; public class ZkClientUtil { private static ZooKeeper zooKeeper; private static DefaultWatcher defaultWatcher = new DefaultWatcher(); private static CountDownLatch countDownLatch = new CountDownLatch(1); private static String address = "localhost:2181,localhost:2182,localhost:2183,localhost:2184/zkRegistry"; public static ZooKeeper getZkClient() throws Exception { zooKeeper = new ZooKeeper(address,1000,defaultWatcher); //等待 链接 CountDownLatch -1 defaultWatcher.setCountDownLatch(countDownLatch); //继续执行 countDownLatch.await(); return zooKeeper; } }
同时由于zookeeper基于watch机制实现发布订阅,所有的watcher都采用自定义的方式实现,
首先是对连接成功的时候的DefaultWatcher。
package com..zookeeper.zkRegistry; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.CountDownLatch; public class DefaultWatcher implements Watcher { private CountDownLatch countDownLatch; public void setCountDownLatch(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: //初始化完成 -1 countDownLatch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; } System.out.println(watchedEvent.toString()); } }
创建一个接收数据的实体类对象:
package com..zookeeper.zkRegistry; public class MyConfData { private String data; public String getData(){ return data; } public void setData(String data){ this.data = data; } }
封装好所有的watcher和回调函数::
package com..zookeeper.zkRegistry; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; public class MyWatcherAndCallBack implements Watcher,AsyncCallback.StatCallback,AsyncCallback.DataCallback { private ZooKeeper zooKeeper; private MyConfData myConfData; private CountDownLatch countDownLatch = new CountDownLatch(1); public void setZooKeeper(ZooKeeper zooKeeper){ this.zooKeeper = zooKeeper; } public void setMyConfData(MyConfData myConfData){ this.myConfData = myConfData; } //Watcher @Override public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getType()) { case None: break; case NodeCreated: zooKeeper.getData("/conf",this,this,"第二次获取数据"); break; case NodeDeleted: myConfData.setData(""); countDownLatch = new CountDownLatch(1); break; case NodeDataChanged: zooKeeper.getData("/conf",this,this,"第二次获取数据"); break; case NodeChildrenChanged: break; } } //StatCallback @Override public void processResult(int i, String s, Object o, Stat stat) { zooKeeper.getData("/conf",this,this,"第一次获取数据"); } //DataCallback @Override public void processResult(int i, String s, Object o, byte[] bytes, Stat stat) { if(stat!=null){ System.out.println("取得数据 : "+new String(bytes)); myConfData.setData(new String(bytes)); countDownLatch.countDown(); } } public void await() throws InterruptedException { zooKeeper.exists("/conf",this,this,"127.0.0.1"); countDownLatch.await(); } }
测试类:
(可直接运行 getConfData() 方法,跑项目就放开注调的代码和注解,注@Test)
package com..zookeeper.zkRegistry; import org.apache.zookeeper.ZooKeeper; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; //@Component public class ZkClientConfing { private ZooKeeper zooKeeper; private MyConfData myConfData = new MyConfData(); private MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack(); @Before public void conn() throws Exception { zooKeeper = ZkClientUtil.getZkClient(); } @After public void close() throws InterruptedException { zooKeeper.close(); } @Test // @PostConstruct public void getConfData() throws InterruptedException { // try { // zooKeeper = ZkClientUtil.getZkClient(); // } catch (Exception e) { // e.printStackTrace(); // } myWatcherAndCallBack.setZooKeeper(zooKeeper); myWatcherAndCallBack.setMyConfData(myConfData); myWatcherAndCallBack.await(); while (true){ if(myConfData.getData().equals("")){ System.out.println("配置中心数据为空!!!!!"); myWatcherAndCallBack.await();//等待数据输入 } //为了观察数据的变化,这里循环打印设置的数据。 System.out.println("数据 : "+myConfData.getData()); Thread.sleep(30000); } } }
logback 关闭zookeeper的心跳日志:
<configuration>
<logger name="org.apache.zookeeper.ClientCnxn" level="info" />
</configuration>
name 为包名
level 为日志级别