zoukankan      html  css  js  c++  java
  • zookeeper的使用

    1、数据模型

      zk的存储结构和标准的文件系统非常类似,每一个节点称之为ZNode,是zk的最小单元。每个ZNode上都可以保存数据以及添加子节点,形成一个层次化的树形结构。节点类型有以下几种:

      (1)持久节点(PERSISTENT),创建后会一直在zk服务器上,直到主动删除。

      (2)持久有序节点(PERSISTENT_SEQUENTIAL),同一级节点保持有序性。

      (3)临时节点(EPHEMERAL),临时节点的生命周期和客户端的会话绑定在一起,当客户端会话 失效该节点自动清理 。

      (4)临时有序节点

      zk通过版本控制来保证分布式数据原子性:zk中每个数据节点有三个版本信息,对数据节点的任何更新操作都会引起版本号的变化,其原理和乐观锁类似。

        version:当前数据节点内容的版本号

        cversion:当前数据节点子节点的版本号

        aversion:当前数据节点ACL(访问权限)变更版本号

    2、zookeeper常用java客户端

      zk比较常用的java客户端是curator,它封装了zk client和server之间的连接处理,并且封装了很多应用场景(分布式锁、leader选举)。

    <dependency> 
        <groupId>org.apache.curator</groupId> 
        <artifactId>curator-framework</artifactId> 
        <version>4.0.0</version> 
    </dependency>
    
    //建立连接
    CuratorFramework 
    curatorFramework=CuratorFrameworkFactory. builder (). 
            
    connectString( CONNECTION_STR ).sessionTimeoutMs(5000)
    . 
            retryPolicy(new 
    ExponentialBackoffRetry(1000,3)).namespace(“curator
    ”).build();

    重试策略:Curator内部实现的几种重试策略: 

      • ExponentialBackoffRetry:重试指定的次数, 且每一次重试之 间停顿的时间逐渐增加.

      • RetryNTimes:指定最大重试次数的重试策略

      • RetryOneTime:仅重试一次

      • RetryUntilElapsed:一直重试直到达到规定的时间 

    namespace: session会话含有隔离命名空间,即客户端对 Zookeeper 上数据节点的任何操作都是相对/curator 目录进行的,这有利于实现不同的 Zookeeper 的业务之间的隔离。

    zk client与zk server连接状态:

    连接状态状态含义
    KeeperState.Expired 客户端和服务器在ticktime的时间周期内,是要发送心跳通知的。这是租约协议的一个实现。客户端发送request,告诉服务器其上一个租约时间,服务器收到这个请求后,告诉客户端其下一个租约时间是哪个时间点。当客户端时间戳达到最后一个租约时间,而没有收到服务器发来的任何新租约时间,即认为自己下线(此后客户端会废弃这次连接,并试图重新建立连接)。这个过期状态就是Expired状态
    KeeperState.Disconnected 就像上面那个状态所述,当客户端断开一个连接(可能是租约期满,也可能是客户端主动断开)这是客户端和服务器的连接就是Disconnected状态
    KeeperState.SyncConnected 一旦客户端和服务器的某一个节点建立连接(注意,虽然集群有多个节点,但是客户端一次连接到一个节点就行了),并完成一次version、zxid的同步,这时的客户端和服务器的连接状态就是SyncConnected
    KeeperState.AuthFailed zookeeper客户端进行连接认证失败时,发生该状态

    3、zk事件监听机制(Watcher)

      Watcher监听机制是Zookeeper中非常重要的特性,可以绑定监听事件的节点进行监听,比如可以监听节点数据变更、节点删除、子节点状态变更等 事件,通过这个事件机制,可以基于zookeeper实现分布式 锁、集群管理等功能。

    节点事件 事件含义
    EventType.NodeCreated 节点被创建时,该事件被触发
    EventType.NodeChildrenChanged 节点的直接子节点被创建、被删除、子节点数据发生变更时,该事件被触发
    EventType.NodeDataChanged 节点的数据发生变更时,该事件被触发
    EventType.NodeDeleted 节点被删除时,该事件被触发
    EventType.None zookeeper客户端的连接状态发生变更时,该事件被触发

      watcher 机制有一个特性:当数据发生改变的时候,那么 zookeeper会产生一个watch事件并发送到客户端,但是客户端只会收到一次这样的通知,如果以后这个数据再发生变 化,那么之前设置 watch的客户端不会再次收到消息。因为它是一次性的;如果要实现永久监听,可以通过循环注册来实现。

      Curator提供了三种Watcher来监听节点的变化:

        • PathChildCache:监视一个路径下孩子结点的创建、删 除、更新。

        • NodeCache:监视当前结点的创建、更新、删除,并将 结点的数据缓存在本地。

        • TreeCache:PathChildCache和NodeCache的“合体”, 监视路径下的创建、更新、删除事件,并缓存路径下所 有孩子结点的数据。

    4、基于Curator实现分布式锁

    4.1、zookeeper实现分布式锁的原理  

      第一种:利用单节点实现分布式锁

      利用zookeeper同级节点的唯一性来实现排他锁。多个线程向zk指定的节点下创建一个相同名称的节点,只有一个能成功(成功的则获取锁),其他失败的线程通过watcher机制来监听zk这个子节点的变化,一旦监听到这个子节点的删除事件(删除则释放锁),则再次触发所有节点去竞争锁(即创建子节点)。

      这种实现方式很简单,但是会产生“惊群效应”,简单来说就是如果存在许多的客户端在等待获取锁,当成功获取到锁的进程释放该节点后,所有处于等待状态的客户端都会被唤醒,这个时候zookeeper在短时间内发送大量子节点变更事件给所有待获取锁的客户端,然后实际情况是只会有一个客户端获得锁。如果在集群规模比较大的情况下,会 对zookeeper服务器的性能产生比较的影响。

      第二种:利用临时有序节点实现分布式锁

      每个客户端都往指定的节点下注册一个临时有序节点,越早创建的节点,节点的顺序编号就越小,那么可以认为子节点中最小的节点设置为获得锁。如果自己的节点不是所有子节点中最小的,意味着还没有获得锁。这种实现和前面单节点实现的差异性在于,每个节点只需要监听比自己小的节点,当比自己小的节点删除以后,客户端会收到watcher事件,此时再次判断自己的节点是不是所有子节点中最小的,如果是则获得锁,否则就不断重复这个过程,这样就不会导致惊群效应,因为每个客户端只需要监控一个节点。 

    4.2、curator分布式锁的基本使用

      curator对于锁这块做了一些封装,提供了 InterProcessMutex 这样一个api。除了分布式锁之外,还提供了leader选举、分布式队列等常用的功能。

        InterProcessMutex:分布式可重入排它锁

        InterProcessSemaphoreMutex:分布式排它锁

        InterProcessReadWriteLock:分布式读写锁 

    public class Demo { 
     
        public static void main(String[] args) {        
             CuratorFramework curatorFramework=null;         
             curatorFramework=CuratorFrameworkF actory.builder().                 
             connectString(ZkConfig.ZK_CONNECT_ STR).                 
             sessionTimeoutMs(ZkConfig.ZK_SESSI ON_TIMEOUT).                 
             retryPolicy(new ExponentialBackoffRetry(1000,10)). build();         
             curatorFramework.start(); 
     
             final InterProcessMutex lock=new InterProcessMutex(curatorFramework ,"/locks");
             for(int i=0;i<10;i++){             
                 new Thread(()->{                 
                   System.out.println(Thread.currentT hread().getName()+"->尝试获取锁");                 
                   try {                     
                      lock.acquire();                     
                      System.out.println(Thread.currentT hread().getName()+"->获得锁成功");                 
                    } catch (Exception e) {                     
                       e.printStackTrace();                 
                    }                 
                    try {                     
                       Thread.sleep(4000);                    
                       lock.release();                     
                       System.out.println(Thread.currentT hread().getName()+"->释放锁成功"); 
                    } catch (Exception e) {                     e.printStackTrace();                 } 
     
     
                },"t"+i).start();         
    } } }

    5、curator实现leader选举

      curator有两种选举策略:

      第一种:Leader Latch——与分布式锁的实现一样

      参与选举的所有节点,会创建一个顺序节点,其中最小的节点会设置为master节点, 没抢到Leader的节点都监听前一个节点的删除事件,在前一个节点删除后进行重新抢 主,当master节点手动调用close()方法或者master 节点挂了之后,后续的子节点会抢占master。 

      第二种:LeaderSelector

      LeaderSelector和Leader Latch最的差别在于,leader 在释放领导权以后,还可以继续参与竞争。

      案例演示:

    public class SelectorClient2 extends 
    LeaderSelectorListenerAdapter implements Closeable 
    { 
        private final String name; 
        private final LeaderSelector leaderSelector; 
     
        public SelectorClient2(CuratorFramework client, String path, String name) { 
            this.name = name; 
            // 利用一个给定的路径创建一个 leader selector 
            // 执行 leader 选举的所有参与者对应的路径必须一样 
            // 本例中 SelectorClient 也是一个LeaderSelectorListener ,但这不是必须的。 
            leaderSelector = new LeaderSelector(client, path, this); 
            // 在大多数情况下,我们会希望一个 selector放弃 leader 后还要重新参与 leader 选举 
            leaderSelector.autoRequeue(); 
        } 
     
        public void start(){ 
            leaderSelector.start(); 
        } 
     
        @Override 
        public void close() throws IOException { 
            leaderSelector.close(); 
        } 
     
        @Override 
        public void takeLeadership(CuratorFramework curatorFramework) throws Exception { 
            System. out .println(name + " 现在是 leader了,持续成为 leader "); 
            // 选举为 master , 
            System. in .read();//阻塞,让当前获得 leader权限的节点一直持有,直到该进程关闭 
        } 
        private static String CONNECTION_STR ="192.168.13.102:2181,192.168.13.103:2181,192.168.13.104:2181"; 
     
        public static void main(String[] args) throws IOException { 
            CuratorFramework curatorFramework= CuratorFrameworkFactory. builder (). 
                    connectString( CONNECTION_STR ).sessionTimeoutMs(5000). 
                    retryPolicy(new ExponentialBackoffRetry(1000,3)).build(); 
            curatorFramework.start(); 
            SelectorClient2 sc=new SelectorClient2(curatorFramework,"/leader","ClientB"); 
            sc.start(); 
            System. in .read(); 
        }

    EventType.None

    zookeeper客户端的连接状态发生变更时,该事件被触发

  • 相关阅读:
    linux 中rz sz 文件传输
    linux find 命令
    深度学习的博客
    cifar10数据的转换、代码解释
    gflags的使用实例(转载)
    leveldb使用 (转载)
    (转载+整理)Leveldb安装及例子
    2013-09-25-【随笔】-Roy
    2013-09-22 [随笔]-Roy
    2013-08-12【随笔2】-Roy
  • 原文地址:https://www.cnblogs.com/jing-yi/p/12989655.html
Copyright © 2011-2022 走看看