package com.zkdemo.zkdemo.client; import com.zkdemo.zkdemo.client.MyZkWatcher; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; /** * 描述:zookeeper增删改查 * 作者:七脉 * 重点:删除、修改、查询所用的version版本号,分布式事务锁的实现方式,乐观锁。ACL权限:所有人、限定人、限定IP等 */ public class Test { private static Logger log = LogManager.getLogger(Test.class); //集群节点 public static final String zkServerClusterConnect = "192.168.159.129:2181,192.168.159.129:2182,192.168.159.129:2183"; //单一节点 public static final String zkServerSingleConnect = "192.168.1.90:2181"; //超时毫秒数 public static final int timeout = 3000; public static void main(String[] args) throws InterruptedException, IOException, KeeperException { //建立连接 ZooKeeper zk = connect(); //zk.close();//关闭后不支持重连 log.info("zk 状态:" + zk.getState()); /**恢复会话连接**/ //long sessionId = zk.getSessionId(); //byte[] sessionPasswd = zk.getSessionPasswd(); //zk2会话重连后,zk会话将失效,不再支持做增删改查操作。 //ZooKeeper zk2 = reconnect(sessionId, sessionPasswd); /**创建节点**/ //create(zk, "/myzk", "myzk"); /**查询节点Data**/ queryData(zk, "/myzk"); /**修改节点data**/ //update(zk, "/myzk", "myzk-update"); /**删除节点**/ //delete(zk, "/myzk"); } /** * 描述:建立连接 * 作者:七脉 * * @return * @throws IOException * @throws InterruptedException */ public static ZooKeeper connect() throws IOException, InterruptedException { //可重入锁 CountDownLatch cdl = new CountDownLatch(1); log.info("准备建立zk服务"); ZooKeeper zk = new ZooKeeper(zkServerSingleConnect, timeout, new MyZkWatcher(cdl, "建立连接")); log.info("完成建立zk服务"); //这里为了等待wather监听事件结束 cdl.await(); return zk; } /** * 描述:重新连接服务 * 作者:七脉 * * @param sessionId 现有会话ID * @param sessionPasswd 现有会话密码 * @return * @throws IOException * @throws InterruptedException 重点:关闭后的会话连接,不支持重连。重连后,前会话连接将会失效。 */ public static ZooKeeper reconnect(long sessionId, byte[] sessionPasswd) throws IOException, InterruptedException { //可重入锁 CountDownLatch cdl = new CountDownLatch(1); log.info("准备重新连接zk服务"); ZooKeeper zk = new ZooKeeper(zkServerSingleConnect, timeout, new MyZkWatcher(cdl, "重新连接"), sessionId, sessionPasswd); log.info("完成重新连接zk服务"); //这里为了等待wather监听事件结束 cdl.await(); return zk; } /** * 描述:创建节点 * 作者:七脉 * * @param zk * @param nodePath * @param nodeData * @throws KeeperException * @throws InterruptedException */ public static void create(ZooKeeper zk, String nodePath, String nodeData) throws KeeperException, InterruptedException { log.info("开始创建节点:{}, 数据:{}", nodePath, nodeData); List<ACL> acl = Ids.OPEN_ACL_UNSAFE; //创建持久节点 CreateMode createMode = CreateMode.PERSISTENT; String result = zk.create(nodePath, nodeData.getBytes(), acl, createMode); //创建节点有两种,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。 log.info("创建节点返回结果:{}", result); log.info("完成创建节点:{}, 数据:{}", nodePath, nodeData); } /** * 描述:查询节点结构信息 * 作者:七脉 * * @param zk * @param nodePath * @return * @throws KeeperException * @throws InterruptedException */ public static Stat queryStat(ZooKeeper zk, String nodePath) throws KeeperException, InterruptedException { log.info("准备查询节点Stat,path:{}", nodePath); Stat stat = zk.exists(nodePath, false); log.info("结束查询节点Stat,path:{},version:{}", nodePath, stat.getVersion()); return stat; } /** * 描述:查询节点Data值信息 * 作者:七脉 * * @param zk * @param nodePath * @return * @throws KeeperException * @throws InterruptedException */ public static String queryData(ZooKeeper zk, String nodePath) throws KeeperException, InterruptedException { log.info("准备查询节点Data,path:{}", nodePath); String data = new String(zk.getData(nodePath, false, queryStat(zk, nodePath))); log.info("结束查询节点Data,path:{},Data:{}", nodePath, data); return data; } /** * 描述:修改节点 * 作者:七脉 * * @param zk * @param nodePath * @param nodeData * @throws KeeperException * @throws InterruptedException 重点:每次修改节点的version版本号都会变更,所以每次修改都需要传递节点原版本号,以确保数据的安全性。 */ public static Stat update(ZooKeeper zk, String nodePath, String nodeData) throws KeeperException, InterruptedException { //修改节点前先查询该节点信息 Stat stat = queryStat(zk, nodePath); log.info("准备修改节点,path:{},data:{},原version:{}", nodePath, nodeData, stat.getVersion()); Stat newStat = zk.setData(nodePath, nodeData.getBytes(), stat.getVersion()); //修改节点值有两种方法,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。 //zk.setData(path, data, version, cb, ctx); log.info("完成修改节点,path:{},data:{},现version:{}", nodePath, nodeData, newStat.getVersion()); return stat; } /** * 描述:删除节点 * 作者:七脉 * * @param zk * @param nodePath * @throws InterruptedException * @throws KeeperException */ public static void delete(ZooKeeper zk, String nodePath) throws InterruptedException, KeeperException { //删除节点前先查询该节点信息 Stat stat = queryStat(zk, nodePath); log.info("准备删除节点,path:{},原version:{}", nodePath, stat.getVersion()); zk.delete(nodePath, stat.getVersion()); //修改节点值有两种方法,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。 //zk.delete(path, version, cb, ctx); log.info("完成删除节点,path:{}", nodePath); } }
package com.zkdemo.zkdemo.client; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 描述:ZK服务观察者事件 * 作者:七脉 */ public class MyZkWatcher implements Watcher{ private static final Logger log = LoggerFactory.getLogger(MyZkWatcher.class); //异步锁 private CountDownLatch cdl; //标记 private String mark; public MyZkWatcher(CountDownLatch cdl,String mark) { this.cdl = cdl; this.mark = mark; } //监听事件处理方法 public void process(WatchedEvent event) { log.info(mark+" watcher监听事件:{}",event); cdl.countDown(); } }
学习参考地址:https://www.cnblogs.com/liuwei6/p/6736335.html
Apache Curator https://www.cnblogs.com/erbing/p/9799098.html