zoukankan      html  css  js  c++  java
  • Zookeeper系列三:Zookeeper客户端的使用(Zookeeper原生API如何进行调用、ZKClient、Curator)和Zookeeper会话

    一、Zookeeper原生API如何进行调用

    准备工作:

    首先新建一个maven项目ZK-Demo,然后在pom.xml里面引入zk的依赖

        <dependency>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
          <version>3.4.10</version>
        </dependency>

    1. 连接zk并监听事件

    package com.study.demo.zk;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooKeeper;
    
    //连接zk并监听事件
    public class ZKDemo implements Watcher {
        private static final CountDownLatch cdl = new CountDownLatch(1);
    
        public static void main(String[] args) throws IOException {
            ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKDemo());
            System.out.println(zk.getState());
    
            try {
                cdl.await();
            } catch (Exception e) {
                System.out.println("ZK Session established.");
            }
        }
    
    
        //监听到事件时进行处理
        public void process(WatchedEvent event) {
            System.out.println("Receive watched event:" + event);
            if (KeeperState.SyncConnected == event.getState()) {
                cdl.countDown();
            }
        }
    }

    输出结果:

    CONNECTING
    Receive watched event:WatchedEvent state:SyncConnected type:None path:null

    2. 创建znode并监听事件

    package com.study.demo.zk;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    
    //创建znode并监听事件
    public class ZKOperateDemo implements Watcher {
        private static final CountDownLatch cdl = new CountDownLatch(1);
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKOperateDemo());
            cdl.await();
    
            String path1 = zk.create("/zk-test-", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("Success create path: " + path1);
            String path2 = zk.create("/zk-test-", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("Success create path: " + path2);
        }
    
        //监听到事件时进行处理
        public void process(WatchedEvent event) {
            System.out.println("Receive watched event:" + event);
            if (KeeperState.SyncConnected == event.getState()) {
                cdl.countDown();
            }
        }
    }

    输出结果:

    Receive watched event:WatchedEvent state:SyncConnected type:None path:null
    Success create path: /zk-test-
    Success create path: /zk-test-0000000011

    3. 改变znode数据并监听事件

    package com.study.demo.zk;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    //改变znode数据并监听事件
    public class ZKDataDemo implements Watcher {
        private static final CountDownLatch cdl = new CountDownLatch(1);
        private static ZooKeeper zk = null;
        private static Stat stat = new Stat();
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKDataDemo());
            cdl.await();
    
            zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println(new String(zk.getData("/zk-test", true, stat)));
    
            zk.getData("/zk-test", true, stat);
            System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
            zk.setData("/zk-test", "123".getBytes(), -1);
    
            Thread.sleep(Integer.MAX_VALUE);
        }
    
        //监听到事件时进行处理
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected == event.getState()) {
                if (EventType.None == event.getType() && null == event.getPath()) {
                    cdl.countDown();
                } else if (event.getType() == EventType.NodeDataChanged) {
                    try {
                        System.out.println(new String(zk.getData(event.getPath(), true, stat)));
                        System.out.println(stat.getCzxid() + ", " + stat.getMzxid() + ", " + stat.getVersion());
                    } catch (Exception e) {
                    }
                }
            }
        }
    }

    输出结果:

    123
    4294967354, 4294967354, 0
    123
    4294967354, 4294967355, 1

    4. 改变子节点并监听事件

    package com.study.demo.zk;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    
    //改变子节点并监听事件
    public class ZKChildrenDemo implements Watcher {
        private static final CountDownLatch cdl = new CountDownLatch(1);
        private static ZooKeeper zk = null;
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKChildrenDemo());
            cdl.await();
    
            zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
            zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    
            List<String> list = zk.getChildren("/zk-test", true);
            for (String str : list)
                System.out.println(str);
    
            zk.create("/zk-test/c2", "789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    
            Thread.sleep(Integer.MAX_VALUE);
        }
    
        //监听到事件时进行处理
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected == event.getState())
                if (EventType.None == event.getType() && null == event.getPath()) {
                    cdl.countDown();
                } else if (event.getType() == EventType.NodeChildrenChanged) {
                    try {
                        System.out.println("Child: " + zk.getChildren(event.getPath(), true));
                    } catch (Exception e) {
                    }
                }
        }
    }

    输出结果:

    c1
    Child: [c1, c2]

    5. 异步调用并完成回调

    package com.study.demo.zk;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.AsyncCallback;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * Callback for the asynchronous children lookup: prints everything it is handed.
     */
    class ChildrenCallback implements AsyncCallback.Children2Callback {

        /**
         * Prints the result code, path, context object, child names and node metadata on one line.
         */
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
            StringBuilder line = new StringBuilder("Child: ");
            line.append(rc);
            line.append(", path: ").append(path);
            line.append(", ctx: ").append(ctx);
            line.append(", children: ").append(children);
            line.append(", stat: ").append(stat);
            System.out.println(line.toString());
        }
    }
    
    public class ZKChildrenAsyncDemo implements Watcher {
        private static final CountDownLatch cdl = new CountDownLatch(1);
        private static ZooKeeper zk = null;
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKChildrenAsyncDemo());
            cdl.await();
    
            zk.create("/zk-test", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
            zk.create("/zk-test/c1", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    
            zk.getChildren("/zk-test", true, new ChildrenCallback(), "ok");
    
            Thread.sleep(Integer.MAX_VALUE);
        }
    
        
        //监听到事件时进行处理
        public void process(WatchedEvent event) {
            if (KeeperState.SyncConnected == event.getState())
                if (EventType.None == event.getType() && null == event.getPath()) {
                    cdl.countDown();
                } else if (event.getType() == EventType.NodeChildrenChanged) {
                    try {
                        System.out.println("Child: " + zk.getChildren(event.getPath(), true));
                    } catch (Exception e) {
                    }
                }
        }
    }

    输出结果:

    Child: 0, path: /zk-test, ctx: ok, children: [c1], stat: 4294967369,4294967369,1535536716381,1535536716381,0,1,0,0,3,1,4294967370

    6. 连接后创建回调

    package com.study.demo.zk;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.AsyncCallback;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    
    /**
     * Callback for asynchronous create requests: prints the outcome of each request.
     */
    class IStringCallback implements AsyncCallback.StringCallback {
        /**
         * Prints the result code, the requested path, the context object and the name the
         * node actually received.
         */
        public void processResult(int rc, String path, Object ctx, String name) {
            StringBuilder line = new StringBuilder("create path result: [");
            line.append(rc).append(", ").append(path);
            line.append(",").append(ctx);
            line.append(", real path name: ").append(name);
            System.out.println(line.toString());
        }
    }
    
    public class ZKAsyncDemo implements Watcher {
        private static final CountDownLatch cdl = new CountDownLatch(1);
    
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            ZooKeeper zk = new ZooKeeper("192.168.152.130:2181", 5000, new ZKAsyncDemo());
            cdl.await();
    
            zk.create("/zk-test-", "123".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(),
                    new String("I am context"));
    
            zk.create("/zk-test-", "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                    new IStringCallback(), new String("I am context"));
    
            zk.create("/zk-test-", "789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    new IStringCallback(), new String("I am context"));
    
            Thread.sleep(Integer.MAX_VALUE);
        }
    
        //监听到事件时进行处理
        public void process(WatchedEvent event) {
            System.out.println("Receive watched event:" + event);
            if (KeeperState.SyncConnected == event.getState()) {
                cdl.countDown();
            }
        }
    }

    输出结果:

    Receive watched event:WatchedEvent state:SyncConnected type:None path:null
    create path result: [0, /zk-test-,I am context, real path name: /zk-test-
    create path result: [-110, /zk-test-,I am context, real path name: null
    create path result: [0, /zk-test-,I am context, real path name: /zk-test-0000000016

    Chroot命名空间:
    主要是为了对业务进行隔离

    示例:
    ZooKeeper client = new ZooKeeper("192.168.56.101:2181/zk-client", ........)
    /zk-client就是Chroot命名空间。Chroot命名空间可以多级
    后续的操作都只能在/zk-client及它的子节点下进行,由此进行了业务隔离

    二、ZKClient

    ZKClient的优点:

    1)可以递归创建。在zookeeper命令行和zookeeper的原生API里面得先创建父节点才能创建子节点
    2)可以递归删除。在zookeeper命令行和zookeeper的原生API里面得先删除子节点才能删除父节点
    3)避免不存在的异常

    准备工作:

    首先新建一个maven项目ZK-Demo,然后在pom.xml里面引入ZKClient的依赖

            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.10</version>
            </dependency>

    1. ZkClient递归创建持久节点

    package com.study.demo.client;
    
    import org.I0Itec.zkclient.ZkClient;
    
    /**
     * Creates a persistent node with ZkClient, creating missing parent nodes along the way.
     *
     * @author leeSmall
     * @date 2018-09-02
     */
    public class CreateNodeDemo {
        /**
         * Connects, creates {@code /zk-client/c1} and closes the client again.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ZkClient client = new ZkClient("192.168.152.130:2181", 5000);
            try {
                String path = "/zk-client/c1";
                // true: create the parent /zk-client first when it does not exist yet
                client.createPersistent(path, true);
            } finally {
                // End the session explicitly instead of leaving it to expire on the server
                client.close();
            }
        }
    }

     创建成功:

    2. ZkClient获取数据并监听事件

    package com.study.demo.client;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    
    /**
     * Reads, rewrites and deletes a node through ZkClient while a data listener reports
     * every change.
     *
     * @author leeSmall
     * @date 2018-09-02
     */
    public class GetDataDemo {
        public static void main(String[] args) throws InterruptedException {
            final String nodePath = "/zk-client";
            final ZkClient zkClient = new ZkClient("192.168.152.130:2181", 5000);

            // Ephemeral node holding the initial value
            zkClient.createEphemeral(nodePath, "123");

            // Listener that prints data changes and the deletion of the node
            IZkDataListener printer = new IZkDataListener() {

                // Invoked when the data of the node changes
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    System.out.println(dataPath + " changed: " + data);
                }

                // Invoked when the node is deleted
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.println(dataPath + " deleted");
                }
            };
            zkClient.subscribeDataChanges(nodePath, printer);

            Object initialValue = zkClient.readData(nodePath);
            System.out.println(initialValue.toString());

            zkClient.writeData(nodePath, "456");
            Thread.sleep(1000);
            zkClient.delete(nodePath);

            // Stay alive so the listener output can be observed
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

     输出结果:

    123
    /zk-client changed: 456
    /zk-client deleted

    3. ZkClient获取子节点数据并监听事件

    package com.study.demo.client;
    
    import java.util.List;
    
    import org.I0Itec.zkclient.IZkChildListener;
    import org.I0Itec.zkclient.ZkClient;
    
    /**
     * Creates and deletes nodes through ZkClient while a child listener reports every change
     * of the parent's child list.
     *
     * @author leeSmall
     * @date 2018-09-02
     */
    public class GetChildrenDemo {
        public static void main(String[] args) throws InterruptedException {
            final String parent = "/zk-client";
            final String child = parent + "/c1";
            final ZkClient zkClient = new ZkClient("192.168.152.130:2181", 5000);

            // Listener that prints the current child list whenever it changes
            IZkChildListener printer = new IZkChildListener() {

                // Invoked with the parent path and its current children
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    System.out.println(parentPath + "的子发生变化: " + currentChilds);
                }
            };
            zkClient.subscribeChildChanges(parent, printer);

            // Create the persistent parent node
            zkClient.createPersistent(parent);
            Thread.sleep(1000);

            // No child has been created yet, so the list printed here is empty
            List<String> children = zkClient.getChildren(parent);
            System.out.println(children);

            // Create child c1 below the parent
            zkClient.createPersistent(child);
            Thread.sleep(1000);

            // Delete the child again
            zkClient.delete(child);
            Thread.sleep(1000);

            // Delete the parent
            zkClient.delete(parent);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

     输出结果:

    /zk-client的子发生变化: []
    []
    /zk-client的子发生变化: [c1]
    /zk-client的子发生变化: []
    /zk-client的子发生变化: null

    三、Curator

    curator是连接ZK应用最广泛的工具

    原因如下:

    1)zk应用场景(分布式锁,Master选举等等),curator包含了这些场景。
    2)对于应用场景中可能出现的极端情况,curator已经考虑到并做了处理。

    准备工作:

    首先新建一个maven项目ZK-Demo,然后在pom.xml里面引入curator的依赖

            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.0.0</version>
            </dependency>

    1. curator创建连接session

    package com.study.demo.curator;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    /**
    * 
    * @Description: curator创建连接session
    * @author leeSmall
    * @date 2018年9月2日
    *
    */
    public class CreateSessionDemo {
        public static void main(String[] args) throws InterruptedException {
            RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                    .sessionTimeoutMs(5000).retryPolicy(policy).build();
            client.start();
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

    这里介绍一种算法:Backoff退避算法

    有这样一种场景,有多个请求,如果网络出现阻塞,每1分钟重试一次。
    20:25 request1(block)
    20:26 request2(block)
    20:27 request3(block)
    当网络通顺的时候,请求都累在一起来发送
    20:28 request4(通顺)request2、3、4
    那么前面的请求就没有意义了,所以就有了退避算法:按照指数间隔重试,比如第一次间隔1分钟,第二次间隔2分钟......随着重试次数的增加,重试间隔越来越长。

    2. curator递归创建持久节点

    package com.study.demo.curator;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    
    /**
     * Creates a persistent node with Curator, creating missing parent nodes along the way.
     *
     * @author leeSmall
     * @date 2018-09-02
     */
    public class CreateNodeDemo {
        public static void main(String[] args) throws Exception {
            final String target = "/zk-curator/c1";
            final byte[] payload = "test".getBytes();

            CuratorFramework curator = connect();
            curator.start();

            // Parents are created on demand; the node itself is persistent
            curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(target, payload);
        }

        /** Builds a client (not started yet) for the demo server. */
        private static CuratorFramework connect() {
            return CuratorFrameworkFactory.builder()
                    .connectString("192.168.152.130:2181")
                    .sessionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .build();
        }
    }

    创建成功:

    3. curator异步创建临时节点

    package com.study.demo.curator;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    
    /**
    * 
    * @Description: curator异步创建临时节点
    * @author leeSmall
    * @date 2018年9月2日
    *
    */
    public class CreateNodeAsyncDemo {
        static CountDownLatch cdl = new CountDownLatch(2);
        static ExecutorService es = Executors.newFixedThreadPool(2);
    
        public static void main(String[] args) throws Exception {
            String path = "/zk-curator";
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                    .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
            client.start();
            
            //创建临时节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
                //回调事件处理
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType());
                    cdl.countDown();
                }
            }, es).forPath(path, "test".getBytes());
    
            //创建临时节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
                
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("event code: " + event.getResultCode() + ", type: " + event.getType());
                    cdl.countDown();
                }
            }).forPath(path, "test".getBytes());
    
            cdl.await();
            es.shutdown();
        }
    }

     输出结果:

    event code: 0, type: CREATE
    event code: -110, type: CREATE

    4. curator更新节点数据

    package com.study.demo.curator;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * Updates the data of a node with Curator, guarding the write with the version that was
     * read beforehand.
     *
     * @author leeSmall
     * @date 2018-09-02
     */
    public class UpdateDataDemo {
        /**
         * Creates the node, reads its version, writes new data against that version and prints
         * the version before and after the write.
         *
         * @param args unused
         * @throws Exception if any request to the server fails
         */
        public static void main(String[] args) throws Exception {
            String path = "/zk-curator/c1";
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                    .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
            client.start();
            try {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "test".getBytes());

                // The read fills stat; only its version is needed here
                Stat stat = new Stat();
                client.getData().storingStatIn(stat).forPath(path);
                System.out.println("Current data: " + stat.getVersion());

                // The write is bound to the version read above
                Stat updated = client.setData().withVersion(stat.getVersion()).forPath(path, "some".getBytes());
                System.out.println("Update data: " + updated.getVersion());
            } finally {
                // Ending the session removes the ephemeral node right away; without it the node
                // lingers until the session times out and an immediate re-run fails on create.
                client.close();
            }
        }
    }

     输出结果:

    Current data: 0
    Update data: 1

    5. curator删除节点数据

    package com.study.demo.curator;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * Deletes a node with Curator, guarding the delete with the version that was read
     * beforehand.
     *
     * @author leeSmall
     * @date 2018-09-02
     */
    public class DeleteNodeDemo {
        public static void main(String[] args) throws Exception {
            final String target = "/zk-curator/c1";

            CuratorFramework curator = CuratorFrameworkFactory.builder()
                    .connectString("192.168.152.130:2181")
                    .sessionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .build();
            curator.start();

            // Ephemeral node to delete; parents are created on demand
            curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(target, "test".getBytes());

            // Read the node so that its current version is known
            Stat current = new Stat();
            curator.getData().storingStatIn(current).forPath(target);
            int expectedVersion = current.getVersion();

            // Delete bound to the version read above
            curator.delete()
                    .deletingChildrenIfNeeded()
                    .withVersion(expectedVersion)
                    .forPath(target);
        }
    }

    6. curator事件监听

    package com.study.demo.curator;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.NodeCache;
    import org.apache.curator.framework.recipes.cache.NodeCacheListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    
    /**
    * 
    * @Description: curator事件监听
    * @author leeSmall
    * @date 2018年9月2日
    *
    */
    public class NodeCacheDemo {
        public static void main(String[] args) throws Exception {
            String path = "/zk-curator/nodecache";
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.152.130:2181")
                    .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
            client.start();
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "test".getBytes());
    
            final NodeCache nc = new NodeCache(client, path, false);
            nc.start();
            //通过回调函数监听事件
            nc.getListenable().addListener(new NodeCacheListener() {
                
                public void nodeChanged() throws Exception {
                    System.out.println("update--current data: " + new String(nc.getCurrentData().getData()));
                }
            });
            
            client.setData().forPath(path, "test123".getBytes());
            Thread.sleep(1000);
            client.delete().deletingChildrenIfNeeded().forPath(path);
            Thread.sleep(5000);
            nc.close();
        }
    }

     输出结果:

    update--current data: test123

    Curator事件监听:

    NodeCache:节点处理监听(会使用缓存)。回调接口NodeCacheListener

    PathChildrenCache:子节点缓存,处理子节点变化。回调接口PathChildrenCacheListener

    TreeCache:NodeCache和PathChildrenCache的结合体。回调接口TreeCacheListener

    四、zookeeper会话

    1. zookeeper连接的几种状态

    CONNECTING 正在连接
    CONNECTED 已经连接
    RECONNECTING 正在重新连接
    RECONNECTED 重新连接上
    CLOSE 会话关闭

    2. session

    2.1 session主要由几个类控制:

    SessionTracker, LearnerSessionTracker, SessionTrackerImpl

    session初始化的方法:

    org.apache.zookeeper.server.SessionTrackerImpl.initializeNextSession(long)

    /**
     * Builds the initial session id from the current time and the given id.
     *
     * Bit layout of the returned value, as produced by the shifts below:
     *   bits 56..63  the low 8 bits of id (id << 56 discards everything above them)
     *   bits 16..55  the low 40 bits of System.currentTimeMillis()
     *   bits  0..15  zero
     *
     * NOTE(review): the surrounding text calls id the "sid"; presumably the server id - confirm.
     *
     * @param id value placed in the top 8 bits of the result
     * @return the combined value
     */
    public static long initializeNextSession(long id) {
            long nextSid = 0;
            // << 24 drops the top 24 bits of the timestamp; the unsigned >>> 8 then moves the
            // remainder down so that the top 8 bits are guaranteed to be zero
            nextSid = (System.currentTimeMillis() << 24) >>> 8;
            // fill the 8 bits cleared above with id
            nextSid =  nextSid | (id <<56);
            return nextSid;
        }

     说明:

    SessionID的分配(初始化)函数,策略如下:
    1)取时间,并且左移24位得到的结果再右移8位(高8位,低16位都是0)
    2)把传入的id(即sid)左移56位,使其位于最高的8位
    3)和第一步的结果做或运算

    2.2 Session分桶(zookeeper的一个特性)

    按照Session会话过期时间进行分区块保存。
    这样设计的好处:可以快速清理过期的session

    2.3 session激活过程:

    1)检测会话是否过期
    2)计算会话下一次超时时间
    3)定位会话的所在区块
    4)迁移会话

     

     

     

     

     

     

     

     

     

     

  • 相关阅读:
    Pycharm在线/手动离线安装第三方库-以scapy为例(本地离线添加已经安装的第三方库通过添加Path实现)
    python+splinter实现12306网站刷票并自动购票流程
    利用RELK进行日志收集
    web安全之文件上传漏洞攻击与防范方法
    C# 使用 CancellationTokenSource 终止线程
    ASP.NET MVC 下拉框的传值的两种方式
    SQL Server(解决问题)已成功与服务器建立连接,但是在登录过程中发生错误。(provider: Shared Memory Provider, error:0
    C# .net中json字符串和对象之间的转化方法
    asp.net 未能加载文件或程序集“WebApi”或它的某一个依赖项。试图加载格式不正确的程序。
    将博客搬至CSDN
  • 原文地址:https://www.cnblogs.com/leeSmall/p/9576437.html
Copyright © 2011-2022 走看看