zoukankan      html  css  js  c++  java
  • ZooKeeper服务-操作(API、集合更新、观察者、ACL)

    操作

    create:创建一个znode(必须要有父节点)
    delete:删除一个znode(该znode不能有任何子节点)
    exists:测试一个znode是否存在并且查询它的元数据
    getACL,setACL:获取/设置一个znode的ACL
    getChildren:获取一个znode的子节点列表
    getData,setData:获取/设置一个znode所保存的数据
    sync:将客户端的znode视图与ZooKeeper同步

    ZooKeeper中的更新操作是有条件的。在使用delete或setData操作时必须提供被更新znode的版本号(可以通过exists操作获得)。如果版本号不匹配,则更新操作会失败。更新操作是非阻塞操作,因此一个更新失败的客户端(由于其他进程同时在更新同一个znode)可以决定是否重试,或执行其他操作,并不会因此而阻塞其他进程的执行。
    虽然ZooKeeper可以被看作是一个文件系统,但出于简单性的需要,有一些文件系统的基本操作被它摒弃了。由于ZooKeeper中的文件较小并且总是被整体读写,因此没有必要提供打开、关闭或查找操作。
    Sync操作与POSIX文件系统中的fsync()操作是不同的。如前所述,ZooKeeper中的写操作具有原子性,一个成功的写操作会保证将数据写到ZooKeeper服务器的持久存储介质中。然而,ZooKeeper允许客户端读到的数据滞后于ZooKeeper服务的最新状态,因此客户端可以使用sync操作来获取数据的最新状态。

    1. 集合更新(Multiupdate)

    ZooKeeper中有一个被称为multi的操作,用于将多个基本操作集合成一个操作单元,并确保这些基本操作同时被成功执行,或者同时失败,不会发生其中部分基本操作被成功执行而其他基本操作失败的情况。
    集合更新可以被用于在ZooKeeper中构建需要保持全局一致性的数据结构,例如构建一个无向图。在ZooKeeper中用一个znode来表示无向图中的一个顶点,为了在两个顶点之间添加或删除一条边,我们需要同时更新两个顶点所分别对应的两个znode,因为每个znode中都有指向对方的引用。如果我们只用ZooKeeper的基本操作来实现边的更新,可能会让其他客户端发现无向图处于不一致的状态,即一个顶点具有指向另一个顶点的引用而对方却没有对应的应用。将针对两个znode的更新操作集合到一个multi操作中可以保证这组更新操作的原子性,也就保证了一对顶点之间不会出现不完整的连接。

    2. 关于API

    对于ZooKeeper客户端来说,主要有两种语言绑定(binding)可以使用:Java和C;当然也可以使用Perl、Python和REST的contrib绑定。对于每一种绑定语言来说,在执行操作时都可以选择同步执行或异步执行。看exists的签名
    public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException
    它返回一个封装有znode元数据的Stat对象(如果znode不存在,则返回null)
    异步执行的签名如下

    public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)

    因为所有操作的结果都是通过回调来传送的,因此在Java API中异步方法的放回类型都是void。调用者传递一个回调的实现,当ZooKeeper相应时,该回调方法被调用。这种情况下,回调采用StatCallback接口,它有以下方法:

    /**
    * 回调
    * @param rc	返回代码,对应于KeeperException的代码。每个非零代码都代表一个异常
    * @param path	对应于客户端传递给exists方法的参数,用于识别这个回调所相应的请求。当path参数不能提供足够的信息时,客户端可以通过ctx参数来区分不同请求。如果path参数提供了足够的信息,ctx可以设为null
    * @param ctx	对应于客户端传递给exists方法的参数,用于识别这个回调所相应的请求。可以是任意对象
    * @param stat	这种情况下,stat参数是null
    */
    public void processResult(int rc, String path, Object ctx, Stat stat) {
    System.out.println("rc:" + rc);
    System.out.println("path:" + path);
    System.out.println("ctx:" + ctx);
    }

    异步API允许以流水线方式处理请求,这在某些情况下可以提供更好的吞吐量。

    3. 观察触发器

    在exists、getChildren和getData这些读操作上可以设置观察,这些观察可以被写操作create、delete和setData触发。ACL相关的操作不参与触发任何观察。当一个观察被触发时会产生一个观察事件,这个观察和触发它的操作共同决定着观察事件的类型。
    当做观察的znode被创建、删除或其数据被更新时,设置在exists操作上的观察将被触发。
    当所观察的znode被删除或其数据被更新时,设置在getData操作上的观察将被触发。创建znode不会触发getData操作上的观察,因为getData操作成功执行的前提是znode必须已经存在。
    当所观察的znode的一个子节点被创建或删除时,或所观察的znode自己被删除时,设置在getChildren操作上的观察将会被触发。可以通过观察时间的类型来判断被删除的是znode还是其子节点:NodeDelete类型代表znode被删除;NodeChildrenChanged类型代表一个子节点被删除。

    一个观察事件中包含涉及该事件的znode的路径,因此对于NodeCreated和NodeDeleted事件来说,可通过路径来判断哪一个节点被创建或删除。为了能够在NodeChildrenChanged事件发生之后判断是哪些子节点被修改,需要重新调用getChildren来获取新的子节点列表。与之类似,为了能够在NodeDataChanged事件之后获取新的数据,需要调用getData。

    测试

    package com.zhen.zookeeper.existsAndWatcher;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.AsyncCallback.StatCallback;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * @author FengZhen
     * @date 2018年10月13日
     * exists与观察者
     *	state=-112 会话超时状态
    	state= -113 认证失败状态
    	state=  1 连接建立中
    	state= 2 (暂时不清楚如何理解这个状态,ZOO_ASSOCIATING_STATE)
    	state=3 连接已建立状态
    	state= 999 无连接状态
    	
    	
    	type=1 创建节点事件
    	type=2 删除节点事件
    	type=3 更改节点事件
    	type=4 子节点列表变化事件
    	type= -1 会话session事件
    	type=-2 监控被移除事件
    
     */
    public class ExistsAndWatcher implements Watcher{
    
    	private static final int SESSION_TIMEOUT = 5000;
    	
    	private ZooKeeper zk;
    	private CountDownLatch connectedSignal = new CountDownLatch(1);
    	
    	public void connect(String hosts) throws IOException, InterruptedException {
    		/**
    		 * hosts:ZooKeeper服务的主机地址(可指定端口,默认是2181)
    		 * SESSION_TIMEOUT:以毫秒为单位的会话超时参数(此处为5秒)
    		 * this:Watcher对象的实例。Watcher对象接收来自于ZooKeeper的回调,以获得各种事件的通知。
    		 */
    		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
    		connectedSignal.await();
    	}
    	
    	/**
    	 * 当客户端已经与ZK建立连接后,Watcher的process方法会被调用
    	 * 参数是一个用于表示该连接的事件。
    	 */
    	public void process(WatchedEvent event) {
    		int type = event.getType().getIntValue();
    		System.out.println("watchedEven--" + event.getState().getIntValue() + " : " + type);
    		//连接事件
    		if (event.getState() == KeeperState.SyncConnected) {
    			/**
    			 * 通过调用CountDownLatch的countDown方法来递减它的计数器。
    			 * 锁存器(latch)被创建时带有一个值为1的计数器,用于表示在它释放所有等待线程之前需要发生的事件数。
    			 * 在调用一次countDown方法之后,计数器的值变为0,则await方法返回。
    			 */
    			connectedSignal.countDown();
    		}
    		//如果为创建或者删除znode的话,需要再添加一个观察者,观察后续操作
    		if (type == 1 || type == 2) {
    			existsAndWatcher("/zoo");
    		}
    	}
    
    	public void create(String groupName) throws KeeperException, InterruptedException {
    		String path = "/" + groupName;
    		/**
    		 * 用ZK的create方法创建一个新的ZK的znode
    		 * path:路径(用字符串表示)
    		 * null:znode的内容(字节数组,此处为空值)
    		 * Ids.OPEN_ACL_UNSAFE:访问控制列表(简称ACL,此处为完全开放的ACL,允许任何客户端对znode进行读写)
    		 * CreateMode.PERSISTENT:znode类型
    		 * znode类型可以分为两种:1.短暂的(ephemeral)	2.持久的(persistent)
    		 * 创建znode的客户端断开连接时,无论客户端是明确断开还是因为任何原因而终止,短暂znode都会被ZK服务删除。持久znode不会被删除。
    		 * create方法的返回值是ZK所创建的节点路径
    		 */
    		String createdPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    		System.out.println("Created " + createdPath);
    	}
    	
    	public void close() throws InterruptedException {
    		zk.close();
    	}
    	
    	public boolean exists(String path) throws KeeperException, InterruptedException {
    		Stat exists = zk.exists(path, true);
    		return null != exists;
    	}
    	
    	public void existsAndWatcher(String path) {
    		zk.exists(path, this, new StatCallback() {
    			/**
    			 * 回调
    			 * @param rc	返回代码,对应于KeeperException的代码。每个非零代码都代表一个异常
    			 * @param path	对应于客户端传递给exists方法的参数,用于识别这个回调所相应的请求。当path参数不能提供足够的信息时,客户端可以通过ctx参数来区分不同请求。如果path参数提供了足够的信息,ctx可以设为null
    			 * @param ctx	对应于客户端传递给exists方法的参数,用于识别这个回调所相应的请求。可以是任意对象
    			 * @param stat	这种情况下,stat参数是null
    			 */
    			public void processResult(int rc, String path, Object ctx, Stat stat) {
    				System.out.println("rc:" + rc);
    				System.out.println("path:" + path);
    				System.out.println("ctx:" + ctx);
    			}
    		}, "标记回调所相应的请求");
    	}
    	
    	public void delete(String groupName) {
    		String path = "/" + groupName;
    		try {
    			List<String> children = zk.getChildren(path, false);
    			for (String child : children) {
    				zk.delete(path + "/" + child, -1);
    			}
    			/**
    			 * delete方法有两个参数
    			 * path:节点路径
    			 * -1:版本号
    			 * 如果所提供的版本号与znode的版本号一致,ZK会删除这个znode。
    			 * 这是一种乐观的加锁机制,使客户端能够检测出对znode的修改冲突。
    			 * 通过将版本号设置为-1,可以绕过这个版本检测机制,不管znode的版本号是什么而直接将其删除。
    			 * ZK不支持递归删除,因此在删除父节点之前必须先删除子节点
    			 */
    			zk.delete(path, -1);
    		} catch (KeeperException e) {
    			System.out.printf("Group %s does not exist
    ", groupName);
    			System.exit(1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    	
    	public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
    		String hosts = "localhost:2181";
    		String groupName = "zoo";
    		ExistsAndWatcher existsAndWatcher = new ExistsAndWatcher();
    		existsAndWatcher.connect(hosts);
    		//同步
    		boolean exists = existsAndWatcher.exists("/zoo");
    		System.out.println("exists zoo:" + exists);
    		//异步
    		existsAndWatcher.existsAndWatcher("/zoo");
    		existsAndWatcher.create(groupName);
    		existsAndWatcher.delete(groupName);
    	
    		existsAndWatcher.close();
    	}
    	
    }
    

      

    4. ACL列表

    每个znode被创建时都会带有一个ACL列表,用于决定谁可以对它执行何种操作。
    ACL依赖于ZooKeeper的客户端身份验证机制。ZooKeeper提供了以下几种身份验证方式
     Digest:通过用户名和密码来识别客户端
     Sasl:通过Kerberos来识别客户端
     Ip:通过客户端的IP地址来识别客户端

    在建立一个ZooKeeper会话之后,客户端可以对自己进行身份验证。虽然znode的ACL列表会要求所有的客户端是经过验证的,但ZooKeeper的身份验证过程却是可选的,客户端必须自己进行身份验证来支持对znode的访问。
    使用digest方式进行身份验证的例子
    zk.addAuthInfo("digest", "fz:secret".getBytes());
    每个ACL都是身份验证方式、符合该方式的一个身份和一组权限的组合。例如,如果打算给IP地址为10.0.0.1的客户端对某个znode的读权限,可以使用IP验证方式、10.0.0.1和READ权限在该znode上设置一个ACL。

    测试

    package com.zhen.zookeeper.ACL;
    
    import java.io.IOException;
    import java.security.NoSuchAlgorithmException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;
    import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Perms;
    
    
    /**
     * @author FengZhen
     * @date 2018年11月25日
     * ACL
     */
    public class ACLTest implements Watcher{
    
    private static final int SESSION_TIMEOUT = 5000;
    	
    	private ZooKeeper zk;
    	private CountDownLatch connectedSignal = new CountDownLatch(1);
    	
    	public void connect(String hosts) throws IOException, InterruptedException {
    		/**
    		 * hosts:ZooKeeper服务的主机地址(可指定端口,默认是2181)
    		 * SESSION_TIMEOUT:以毫秒为单位的会话超时参数(此处为5秒)
    		 * this:Watcher对象的实例。Watcher对象接收来自于ZooKeeper的回调,以获得各种事件的通知。
    		 */
    		zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
    		//根据IP
    		zk.addAuthInfo("ip", "192.168.1.103".getBytes());
    		//根据用户密码
    		zk.addAuthInfo("digest", "fz:123456".getBytes());
    		connectedSignal.await();
    	}
    	
    	/**
    	 * 当客户端已经与ZK建立连接后,Watcher的process方法会被调用
    	 * 参数是一个用于表示该连接的事件。
    	 */
    	public void process(WatchedEvent event) {
    		//连接事件
    		if (event.getState() == KeeperState.SyncConnected) {
    			/**
    			 * 通过调用CountDownLatch的countDown方法来递减它的计数器。
    			 * 锁存器(latch)被创建时带有一个值为1的计数器,用于表示在它释放所有等待线程之前需要发生的事件数。
    			 * 在调用一次countDown方法之后,计数器的值变为0,则await方法返回。
    			 */
    			connectedSignal.countDown();
    		}
    	}
    
    	public void createACLIP(String groupName) throws KeeperException, InterruptedException {
    		String path = "/" + groupName;
    		/**
    		 * 用ZK的create方法创建一个新的ZK的znode
    		 * path:路径(用字符串表示)
    		 * null:znode的内容(字节数组,此处为空值)
    		 * Ids.OPEN_ACL_UNSAFE:访问控制列表(简称ACL,此处为完全开放的ACL,允许任何客户端对znode进行读写)
    		 * CreateMode.PERSISTENT:znode类型
    		 * znode类型可以分为两种:1.短暂的(ephemeral)	2.持久的(persistent)
    		 * 创建znode的客户端断开连接时,无论客户端是明确断开还是因为任何原因而终止,短暂znode都会被ZK服务删除。持久znode不会被删除。
    		 * create方法的返回值是ZK所创建的节点路径
    		 */
    		//添加权限,设置IP
    		ACL aclIP = new ACL(Perms.ALL, new Id("ip",  "192.168.1.103"));
    		System.out.println(aclIP);
    		List<ACL> acls = new ArrayList<ACL>();
    		acls.add(aclIP);
    		String createdPath = zk.create(path, null, acls, CreateMode.PERSISTENT);
    		System.out.println("Created " + createdPath);
    	}
    	
    	public void createACLDigest(String groupName) throws KeeperException, InterruptedException, NoSuchAlgorithmException {
    		String path = "/" + groupName;
    		/**
    		 * 用ZK的create方法创建一个新的ZK的znode
    		 * path:路径(用字符串表示)
    		 * null:znode的内容(字节数组,此处为空值)
    		 * Ids.OPEN_ACL_UNSAFE:访问控制列表(简称ACL,此处为完全开放的ACL,允许任何客户端对znode进行读写)
    		 * CreateMode.PERSISTENT:znode类型
    		 * znode类型可以分为两种:1.短暂的(ephemeral)	2.持久的(persistent)
    		 * 创建znode的客户端断开连接时,无论客户端是明确断开还是因为任何原因而终止,短暂znode都会被ZK服务删除。持久znode不会被删除。
    		 * create方法的返回值是ZK所创建的节点路径
    		 */
    		//添加权限,设置IP
    		ACL aclIP = new ACL(Perms.ALL, new Id("digest", DigestAuthenticationProvider.generateDigest("fz:123456")));
    		System.out.println(aclIP);
    		List<ACL> acls = new ArrayList<ACL>();
    		acls.add(aclIP);
    		String createdPath = zk.create(path, null, acls, CreateMode.PERSISTENT);
    		System.out.println("Created " + createdPath);
    	}
    	
    	public void close() throws InterruptedException {
    		zk.close();
    	}
    	
    	public void writeZnodeACLDigest(String groupName) throws KeeperException, InterruptedException {
    		String path = "/" + groupName;
    		zk.setData(path, "test_digest_data".getBytes(), 0);
    	}
    	
    	public void writeZnodeACLIP(String groupName) throws KeeperException, InterruptedException {
    		String path = "/" + groupName;
    		zk.setData(path, "test_ip_data".getBytes(), 0);
    	}
    	
    	public void readZnode(String groupName) throws KeeperException, InterruptedException {
    		String path = "/" + groupName;
    		String data = zk.getData(path, false, null).toString();
    		System.out.println("data = " + data);
    	}
    	
    	public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
    		String hosts = "192.168.1.103:2181";
    		String groupNameIP = "znode_acl_test_ip";
    		String groupNameDigest = "znode_acl_test_digest";
    		ACLTest aclTest = new ACLTest();
    		aclTest.connect(hosts);
    		//digest
    //		aclTest.createACLDigest(groupNameDigest);
    //		aclTest.writeZnodeACLDigest(groupNameDigest);
    		aclTest.readZnode(groupNameDigest);
    		
    		//IP
    //		aclTest.createACLIP(groupNameIP);
    //		aclTest.writeZnodeACLIP(groupNameIP);
    //		aclTest.readZnode(groupNameIP);
    		
    		aclTest.close();
    		
    		//创建完带有ACL的znode之后,查看ACL
    		//[zk: localhost:2181(CONNECTED) 7] getAcl /znode_acl_test_ip
    		//'ip,'192.168.1.103
    		//: cdrwa
    		
    		//不设置IP直接读取该znode内容,报错如下
    		//Exception in thread "main" org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /znode_acl_test_ip
    		
    	}
    	
    	
    }
    

      

  • 相关阅读:
    NX二次开发-UFUN获取当前主要版本的版本号
    NX二次开发-UFUN创建图纸注释uc5540
    VC++ADO/COM,put_LeftMargin设置纸张上下左右边距的大小
    VC++ADO/COM,put_Orientation组件设置纸张方向0默认1纵向2横向
    常用的数据拼接方法(不断更新中)
    去重对象数组
    将对象数组中指定键值赋给另一个数组并对更改对象数组中的key
    CSS3动态计算公式——calc()的坑
    @PostConstruct注解
    java操作Redis缓存设置过期时间
  • 原文地址:https://www.cnblogs.com/EnzoDin/p/10016614.html
Copyright © 2011-2022 走看看