zoukankan      html  css  js  c++  java
  • [hadoop][基本原理]zookeeper简单使用

    代码:https://github.com/xufeng79x/ZkClientTest

    1、简介

      zookeeper的基本原理和使用场景描述可参考:[hadoop][基本原理]zookeeper基本原理

      本文主要讲解zookeeper节点的增删除改查,以及watcher的使用。

    2.工程准备

      除了zookeeper的自身API外,有两个开源的api更加方便的去让开发者使用----ZkClient和Curator。

      上述两个开源API中个人感觉ZkClient使用起来更加直观明了,所以在这里我会使用ZkClient来做讲解。

      新建maven工程,在pom文件按照版本需求增加下列依赖:

        <dependency>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
          <version>3.4.6</version>
        </dependency>
        <dependency>
          <groupId>com.101tec</groupId>
          <artifactId>zkclient</artifactId>
          <version>0.6</version>
        </dependency> 

    3.增删改查操作

        public static void main(String[] args) {
            String znodePath = "/xufeng";
            String znodeChildPath1 = "/xufeng/xufeng";
            String znodeChildPath2 = "/xufeng/xufeng";
            
            // 创建连接(既创建session)
            //param1:zookeeper集群信息
            //param2:session超时时间
            //param3:连接超时时间
            //param4:序列化方法,既我们设定在znode上的数据类的序列与反序列方法
            ZkClient zc = new ZkClient("xufeng-1:2181,xufeng-2:2181,xufeng-3:2181", 1000, 10000, new SerializableSerializer());
            System.out.println("connected osssssssk!");
            
            //由于在创建此节点的时候已经指定了一般pojo类的序列类,所以我们可以直接将一个实例类在创建的时候设定到znode上去
            Server server = new Server("myserver", "12345");
            
            zc.create(znodePath, server, CreateMode.PERSISTENT);
            
            // 检查znode是否创建成功
            boolean isExist = zc.exists(znodePath);
            if (isExist)
            {
                System.out.println(znodePath + " has been created!");
            }
            
            // 勾取znode的状态信息
            Stat stat = new Stat();
            server = zc.readData(znodePath, stat);
            System.out.println("server info : " + server + " state : " + stat);
            
            // 创建子节点
            zc.create(znodeChildPath1, null, CreateMode.EPHEMERAL_SEQUENTIAL);
            zc.create(znodeChildPath2, null, CreateMode.EPHEMERAL_SEQUENTIAL);
            if (zc.exists(znodeChildPath1) && zc.exists(znodeChildPath2))
            {
                System.out.println("znodeChildPaths +  has been created!");
            }
            // 取得子节点列表
            List<String> childs = zc.getChildren(znodePath);
            System.out.println(childs);
            
            // 删除没有子节点的节点
            boolean e1 = zc.delete(znodeChildPath1);
            // 删除有子节点的节点
            boolean e2 = zc.deleteRecursive(znodePath);
            if (e1 && e2)
            {
                System.out.println("All znode have been deleted!");
            }
        }
    }

      其中Server为pojo类,它必须实现序列化接口(方法可以不必去实现,SerializableSerializer对象会做出处理)  

    // 必须实现序列化接口
    public class Server  implements Serializable{
        private String name = null;
        private String id = null;
        
        public Server(String name, String id) {
            super();
            this.name = name;
            this.id = id;
        }
        // 序列化与反序列化中必须要有这个构造方法(默认是有的,但是你一旦创建了其他构造方法,那么就默认没有了)
        public Server() {
            super();
            this.name = null;
            this.id = null;
        }

    运行结果:

    connected osssssssk!
    /xufeng has been created!
    server info : Server [name=myserver, id=12345] state : 73014444062,73014444062,1469712893248,1469712893248,0,0,0,0,108,0,73014444062
    
    [xufeng0000000000, xufeng0000000001]

    结果分析:

    1. 上述/xufeng 为永久节点,如果不删除就不会消失

    2. 设定在/xufeng节点上的数据也被正确的read出来。

    3. 节点状态也可以read出来,但是必须先构造Stat对象然后将数据勾出来。

    4. 创建节点时候可以指定是否SEQUENTIAL,如上的两个临时子节点就是用了SEQUENTIAL他会自动给名称按序不重复,这在队列,锁和主备场景下很有实际用处。

    4.节点状态订阅

      所谓状态订阅有三种,一种为节点数据变化的订阅,一种是节点状态改变和另外一种的节点子节点(或者其本身)创建和删除的变化的订阅,分别通过不同的回调函数来实现这两种订阅处理。

                                     

     

      下面代码执行后会创建如下节点:

    [zk: localhost:2181(CONNECTED) 6] ls /
    [hadoop-ha, yarn-leader-election, zookeeper, xufeng]
    [zk: localhost:2181(CONNECTED) 7] ls /xufeng
    [xufeng0000000000, xufeng0000000001]
    public class ZkClientScribeTest {
    
        public static void main(String[] args) throws InterruptedException {
            String znodePath = "/xufeng";
            String znodeChildPath1 = "/xufeng/xufeng";
            String znodeChildPath2 = "/xufeng/xufeng";
            
            // 创建连接(既创建session)
            //param1:zookeeper集群信息
            //param2:session超时时间
            //param3:连接超时时间
            //param4:序列化方法,既我们设定在znode上的数据类的序列与反序列方法
            ZkClient zc = new ZkClient("xufeng-1:2181,xufeng-2:2181,xufeng-3:2181", 10000, 10000, new BytesPushThroughSerializer());
            System.out.println("connected ok!");
            
            // 创建节点与其子节点,为了易于操作,我们将以永久节点来进行测试
            zc.create(znodePath, "I am /xufeng".getBytes(), CreateMode.PERSISTENT);
            zc.create(znodeChildPath1, "I am /xufeng/xufeng*0".getBytes(), CreateMode.PERSISTENT_SEQUENTIAL);
            zc.create(znodeChildPath2, "I am /xufeng/xufeng*1".getBytes(), CreateMode.PERSISTENT_SEQUENTIAL);
            
            // 订阅节点数据发生变化
            zc.subscribeDataChanges(znodePath, new IZkDataListener() {
                // 当其本身或者子节点的删除或者增加的时候讲触发此操作
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.println(dataPath + " data has been deleted!");
                }
                
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    System.out.println(dataPath + " data has been change to " + String.valueOf(data));
                }
            });
            
            // 订阅节点及其子节点发生变化
            zc.subscribeChildChanges(znodePath, new IZkChildListener() {
                // 当其本身或者子节点的删除或者增加的时候讲触发此操作
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    System.out.println(parentPath + " childs has been change : ");
                    System.out.println(currentChilds);
                    
                }
            });
            
            // 订阅节点状态发生变化
            zc.subscribeStateChanges(new IZkStateListener() {
                public void handleStateChanged(KeeperState state) throws Exception {
                    System.out.println(state);
                    
                }
                
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                    System.out.println(error);
                    
                }
                
                public void handleNewSession() throws Exception {
                    System.out.println("new session!");
                    
                }
            });
            
            
            Thread.sleep(Long.MAX_VALUE);
    
        }
    
    }

      1.修改/xufeng节点数据

    [zk: localhost:2181(CONNECTED) 8] set /xufeng 123
    cZxid = 0x1600000005
    ctime = Thu Jul 28 13:20:14 EDT 2016
    mZxid = 0x1600000008
    mtime = Thu Jul 28 13:21:54 EDT 2016
    pZxid = 0x1600000007
    cversion = 2
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 3
    numChildren = 2
    [zk: localhost:2181(CONNECTED) 9] 

      程序测将会输出:

    /xufeng data has been change to [B@4222023a

       2. 在/xufeng/节点下增加一个节点:

    [zk: localhost:2181(CONNECTED) 11] create -s /xufeng/xufeng 123
    Created /xufeng/xufeng0000000002

      程序侧将会输出:

    /xufeng childs has been change : 
    [xufeng0000000000, xufeng0000000001, xufeng0000000002]

      3.删除/xufeng/xufeng0000000000节点

    [zk: localhost:2181(CONNECTED) 12] delete /xufeng/xufeng0000000000
    [zk: localhost:2181(CONNECTED) 13] 

      程序侧将输出:

    /xufeng childs has been change : 
    [xufeng0000000001, xufeng0000000002]

      4.删除/xufeng节点

    [zk: localhost:2181(CONNECTED) 13] rmr /xufeng
    [zk: localhost:2181(CONNECTED) 14] 

      程序侧将会输出:可以看到先去删除子节点(触发watcher),在删除本身(触发watcher)然后触发数据删除watcher

    /xufeng childs has been change : 
    []
    /xufeng childs has been change : 
    null
    /xufeng data has been deleted!

    5. 思考-----如何保证一个临时节点不被删除(既如何维护这个临时节点)

    public class saveEphemeralNode {
        public static void main(String[] args) throws InterruptedException {
            String znodePath = "/xufeng_EPHEMERAL";
            
            // 创建连接(既创建session)
            //param1:zookeeper集群信息
            //param2:session超时时间
            //param3:连接超时时间
            //param4:序列化方法,既我们设定在znode上的数据类的序列与反序列方法
            ZkClient zc = new ZkClient("xufeng-1:2181,xufeng-2:2181,xufeng-3:2181", 10000, 10000, new SerializableSerializer());
            System.out.println("connected ok!");
            
            zc.create(znodePath, null, CreateMode.EPHEMERAL);
            
            Thread.sleep(Long.MAX_VALUE);
        }
    
    }

    上述代码中Thread.sleep(Long.MAX_VALUE)语句保证了进程不会退出,此时/xufeng_EPHEMERAL一直保持存在状态,而当手动结束此进程后的10秒后这个节点消失。

    有此得出结论,只要保持session长连接的存在,临时节点就不会被删除。

  • 相关阅读:
    肥胖儿筛选标准
    文章索引
    面向对象66原则
    [精]Xpath路径表达式
    [精]XPath入门教程
    孕产期高危因素
    “华而不实”的转盘菜单(pie menu)
    xmind用例导excel用例,然后再用python排版
    NSObject
    [self class]与[super class]
  • 原文地址:https://www.cnblogs.com/ios123/p/5726422.html
Copyright © 2011-2022 走看看