zoukankan      html  css  js  c++  java
  • zookeeper

    ZKUtils.java

    package test;
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    /**
     * Helper for opening a ZooKeeper client handle that is already connected.
     */
    public class ZKUtils {
    	/** Upper bound, in milliseconds, on how long {@link #openZk()} waits for the session to connect. */
    	private static final long CONNECT_TIMEOUT_MS = 30000L;

    	/**
    	 * Opens a ZooKeeper handle using {@code Test.connectString} and {@code Test.sessionTimeout}
    	 * and blocks until the client reports {@code SyncConnected}.
    	 *
    	 * <p>The caller owns the returned handle and must {@code close()} it when done.
    	 *
    	 * @return a connected ZooKeeper handle
    	 * @throws IOException if the handle cannot be created, or if the session does not connect
    	 *         within {@link #CONNECT_TIMEOUT_MS} milliseconds
    	 * @throws InterruptedException if the calling thread is interrupted while waiting
    	 */
    	public static ZooKeeper openZk() throws IOException, InterruptedException {
    		// Released by the watcher once the session is connected.
    		final CountDownLatch connectedSignal = new CountDownLatch(1);

    		ZooKeeper zk = new ZooKeeper(Test.connectString, Test.sessionTimeout, new Watcher() {
    			@Override
    			public void process(WatchedEvent event) {
    				if (KeeperState.SyncConnected.equals(event.getState())) {
    					// Connected: let the thread blocked in openZk() continue.
    					connectedSignal.countDown();
    				}
    			}
    		});

    		// await() blocks the calling thread until the latch reaches zero. The wait is bounded so
    		// that an unreachable server does not hang the caller forever.
    		boolean connected = false;
    		try {
    			connected = connectedSignal.await(CONNECT_TIMEOUT_MS,
    					java.util.concurrent.TimeUnit.MILLISECONDS);
    		} finally {
    			if (!connected) {
    				// Timed out or interrupted: do not leak the half-open handle.
    				zk.close();
    			}
    		}
    		if (!connected) {
    			throw new IOException("Timed out after " + CONNECT_TIMEOUT_MS
    					+ " ms waiting to connect to ZooKeeper at " + Test.connectString);
    		}
    		return zk;
    	}
    }
    

      

    Test.java

    package test;
    import java.io.IOException;
    import java.security.NoSuchAlgorithmException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.AsyncCallback.VoidCallback;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooDefs.Perms;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;
    import org.apache.zookeeper.data.Stat;
    import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
    public class Test {
    	public static final String connectString = "hadoop1:2181";
    	public static final int sessionTimeout = 5000;
    	/**
    	 * Entry point for the demos. Exactly one demo call is left uncommented at a time;
    	 * currently that is {@code sync()}.
    	 *
    	 * @param args unused
    	 * @throws Exception whatever the selected demo throws
    	 */
    	public static void main(String[] args) throws Exception {
    		// Create a znode
    //		createZnode("/b");
    		
    		// List child znodes
    //		listChildren("/");
    		
    //		delete("/b");
    		
    //		deleteAsynchronous("/b");
    //		watch();
    		
    		// Force the server this client is connected to to synchronize with the leader so that the
    		// state of the given path is up to date; this can only be called asynchronously
    		sync();
    		
    		/*
    		 * Authentication
    		 */
    //		auth1();
    //		auth2();
    	}
    	/**
    	 * Creates a persistent znode with no data and a fully open ACL, then prints the created path.
    	 *
    	 * @param path path of the znode to create
    	 * @throws IOException if the ZooKeeper handle cannot be opened
    	 * @throws InterruptedException if the thread is interrupted while connecting, creating or closing
    	 * @throws KeeperException if the server rejects the create request
    	 */
    	private static void createZnode(String path) throws IOException, InterruptedException,
    			KeeperException {
    		ZooKeeper zk = ZKUtils.openZk();
    		try {
    			String createdPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    			System.out.println("创建" + createdPath + "成功");

    			// Alternative: create a znode whose data is "hello", with a fully open ACL, of the
    			// ephemeral type (the server deletes the znode once the session ends, so it would
    			// disappear as soon as this handle is closed).
    //			String createdPath = zk.create(path, "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    			// Prints the created path.
    //			System.out.println(createdPath);
    			// Ephemeral sequential znode.
    //			createdPath = zk.create(path, "hello".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    			// Prints path000000000N: the path name followed by a 10-digit sequence number.
    //			System.out.println(createdPath);
    		} finally {
    			// Always release the session, including when create() throws.
    			zk.close();
    		}
    	}
    	
    	/**
    	 * Prints the name of every direct child of the given znode, one per line.
    	 * Does nothing if the znode does not exist.
    	 *
    	 * @param parent path of the znode whose children are listed
    	 * @throws IOException if the ZooKeeper handle cannot be opened
    	 * @throws InterruptedException if the thread is interrupted while connecting, querying or closing
    	 * @throws KeeperException if the server rejects a request
    	 */
    	private static void listChildren(String parent) throws IOException, InterruptedException, 
    			KeeperException {
    		ZooKeeper zk = ZKUtils.openZk();
    		try {
    			Stat state = zk.exists(parent, false);
    			if (state == null) {
    				return;
    			}

    			List<String> children = zk.getChildren(parent, false);
    			for (String child : children) {
    				System.out.println(child);
    			}
    		} finally {
    			// Always release the session, including on the early return above.
    			zk.close();
    		}
    	}
    	
    	/**
    	 * Deletes a znode together with all of its descendants, using a single ZooKeeper session.
    	 * Does nothing if the znode does not exist.
    	 *
    	 * @param path path of the znode to delete
    	 * @throws IOException if the ZooKeeper handle cannot be opened
    	 * @throws InterruptedException if the thread is interrupted while connecting, deleting or closing
    	 * @throws KeeperException if the server rejects a request
    	 */
    	private static void delete(String path) throws IOException, InterruptedException, KeeperException {
    		ZooKeeper zk = ZKUtils.openZk();
    		try {
    			deleteRecursively(zk, path);
    		} finally {
    			// One session for the whole subtree, released even when a delete fails.
    			zk.close();
    		}
    	}

    	/**
    	 * Deletes {@code path} and everything below it, children first, on an already open handle.
    	 * Prints the data version of each znode before it is removed.
    	 *
    	 * @param zk open ZooKeeper handle; not closed by this method
    	 * @param path path of the znode to delete
    	 * @throws InterruptedException if the thread is interrupted during a request
    	 * @throws KeeperException if the server rejects a request
    	 */
    	private static void deleteRecursively(ZooKeeper zk, String path) throws InterruptedException,
    			KeeperException {
    		Stat state = zk.exists(path, false);
    		if (state == null) {
    			return;
    		}

    		System.out.println(state.getVersion());

    		// Avoid building "//child" when path is the root "/".
    		String prefix = path.endsWith("/") ? path : path + "/";
    		List<String> children = zk.getChildren(path, false);
    		for (String child : children) {
    			deleteRecursively(zk, prefix + child);
    		}

    		// delete() takes a path and a version; version -1 disables the version check.
    		zk.delete(path, -1);
    	}
    	
    	/**
    	 * Deletes a znode using the asynchronous API.
    	 *
    	 * <p>ZooKeeper offers both synchronous and asynchronous versions of its API; when read
    	 * efficiency does not matter to the application, either may be used. Children are removed
    	 * first through the synchronous {@code delete(String)}, then the znode itself is deleted
    	 * asynchronously and the result is printed from the callback.
    	 * Does nothing if the znode does not exist.
    	 *
    	 * @param path path of the znode to delete
    	 * @throws IOException if the ZooKeeper handle cannot be opened
    	 * @throws InterruptedException if the thread is interrupted while connecting, waiting or closing
    	 * @throws KeeperException if the server rejects a synchronous request
    	 */
    	private static void deleteAsynchronous(String path) throws IOException, InterruptedException, KeeperException {
    		ZooKeeper zk = ZKUtils.openZk();
    		try {
    			Stat state = zk.exists(path, false);
    			if (state == null) {
    				return;
    			}

    			// Avoid building "//child" when path is the root "/".
    			String prefix = path.endsWith("/") ? path : path + "/";
    			List<String> children = zk.getChildren(path, false);
    			for (String child : children) {
    				delete(prefix + child);
    			}

    			// Released by the callback, so we wait exactly as long as the delete takes.
    			final CountDownLatch done = new CountDownLatch(1);

    			// delete() takes a path and a version; version -1 disables the version check.
    			zk.delete(path, -1, new VoidCallback() {
    				@Override
    				public void processResult(int rc, String path, Object ctx) {
    					System.out.println("异步删除操作执行完毕 , rc: " + rc + ", path: " + path);
    					done.countDown();
    				}
    			}, null);

    			// Wait for the callback; otherwise the main thread would finish before its output
    			// appears. The wait is bounded so that a lost response cannot block forever.
    			done.await(sessionTimeout, java.util.concurrent.TimeUnit.MILLISECONDS);
    		} finally {
    			zk.close();
    		}
    	}
    	
    	/**
    	 * Registers a watch on {@code /b} and then blocks, printing every event the client receives.
    	 *
    	 * <p>A dedicated handle is created here (instead of using {@code ZKUtils.openZk()}) so that
    	 * the watcher passed at construction time prints each event.
    	 *
    	 * @throws IOException if the ZooKeeper handle cannot be created
    	 * @throws InterruptedException if the thread is interrupted while waiting or closing
    	 * @throws KeeperException if the server rejects the exists request
    	 */
    	private static void watch() throws IOException, InterruptedException, KeeperException {
    		final CountDownLatch connected = new CountDownLatch(1);
    		// Use the shared connection settings rather than repeating the literals.
    		ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
    			@Override
    			public void process(WatchedEvent event) {
    				System.out.println("接受到了一个事件:" + event);
    				if (Watcher.Event.KeeperState.SyncConnected.equals(event.getState())) {
    					connected.countDown();
    				}
    			}
    		});
    		try {
    			connected.await();

    			// Watches can be set by the exists, getData and getChildren operations.

    			// Check whether "/b" exists and watch it; passing true means the watcher given when
    			// the ZooKeeper handle was created handles the event.
    			zk.exists("/b", true);

    			// Alternatively, handle the event with a specific watcher:
    //			zk.exists("/b", new Watcher());

    			// Keep the process alive so that events can be received.
    			Thread.sleep(Long.MAX_VALUE);
    		} finally {
    			// Reached only on interruption or failure; release the session in that case.
    			zk.close();
    		}
    	}
    	
    	/**
    	 * Forces the server this client is connected to to synchronize with the leader so that the
    	 * state of znode {@code /a} is up to date, then reads and prints the data of {@code /a}.
    	 * {@code sync} can only be called asynchronously.
    	 *
    	 * <p>If {@code /a} does not exist the read is skipped.
    	 *
    	 * @throws IOException if the ZooKeeper handle cannot be opened
    	 * @throws InterruptedException if the thread is interrupted while connecting, waiting or closing
    	 * @throws KeeperException if the server rejects a synchronous request
    	 */
    	private static void sync() throws IOException, InterruptedException, KeeperException {
    		ZooKeeper zk = ZKUtils.openZk();
    		try {
    			// Released once the sync callback has finished.
    			final CountDownLatch synced = new CountDownLatch(1);
    			zk.sync("/a", new VoidCallback() {
    				@Override
    				public void processResult(int rc, String path, Object ctx) {
    					try {
    						// Artificial delay: makes it visible in the output that the caller does
    						// not wait for this callback.
    						Thread.sleep(3000);
    					} catch (InterruptedException e) {
    						// Preserve the interrupt for whoever owns this thread.
    						Thread.currentThread().interrupt();
    					}
    					System.out.println("同步完毕");
    					synced.countDown();
    				}

    			}, null);

    			System.out.println("here");

    			Stat stat = zk.exists("/a", false);
    			if (stat != null) {
    				byte[] data = zk.getData("/a", false, stat);
    				// A znode may carry no data at all (createZnode creates such nodes).
    				System.out.println("data: " + (data == null ? null : new String(data)));
    			}

    			// Give the callback time to print before the session is closed.
    			synced.await(5000, java.util.concurrent.TimeUnit.MILLISECONDS);
    		} finally {
    			zk.close();
    		}
    	}
    	
    	
    	
    	/**
    	 * Creates the persistent znode {@code /c} (data {@code "test"}) with a custom ACL that grants
    	 * only READ permission to the digest identity {@code lisg:123456}.
    	 *
    	 * @throws IOException if the ZooKeeper handle cannot be opened
    	 * @throws InterruptedException if the thread is interrupted while connecting, creating or closing
    	 * @throws KeeperException if the server rejects the create request
    	 * @throws NoSuchAlgorithmException if the digest cannot be computed
    	 */
    	private static void auth1() throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {

    		List<ACL> acls = new ArrayList<ACL>();
    		// User-name/password ("digest") authentication scheme.
    		Id id = new Id("digest", DigestAuthenticationProvider.generateDigest("lisg:123456"));
    		acls.add(new ACL(Perms.READ, id));

    		ZooKeeper zk = ZKUtils.openZk();
    		try {
    			zk.create("/c", "test".getBytes(), acls, CreateMode.PERSISTENT);
    		} finally {
    			// Always release the session, including when create() throws.
    			zk.close();
    		}
    	}
    	
    	/**
    	 * Demonstrates ACL verification: prints the ACL list of {@code /c}, authenticates with the
    	 * digest credentials {@code lisg:123456}, then reads and prints the znode's data.
    	 * Does nothing if {@code /c} does not exist.
    	 *
    	 * @throws IOException if the ZooKeeper handle cannot be opened
    	 * @throws InterruptedException if the thread is interrupted while connecting, querying or closing
    	 * @throws KeeperException if the server rejects a request
    	 */
    	private static void auth2() throws IOException, InterruptedException, KeeperException {
    		ZooKeeper zk = ZKUtils.openZk();
    		try {
    			Stat stat = zk.exists("/c", false);
    			if (stat == null) {
    				return;
    			}

    			List<ACL> cacls = zk.getACL("/c", stat);
    			System.out.println("/c的ACL列表是:" + cacls);

    			// Without these credentials the getData call below fails with
    			// "KeeperErrorCode = NoAuth for /c".
    			zk.addAuthInfo("digest", "lisg:123456".getBytes());
    			byte[] data = zk.getData("/c", false, stat);
    			// A znode may carry no data; avoid a NullPointerException in that case.
    			System.out.println("data: " + (data == null ? null : new String(data)));
    		} finally {
    			// Always release the session, including on the early return above.
    			zk.close();
    		}
    	}
    }
    

      

  • 相关阅读:
    @SerializedName注解
    信号量与互斥锁
    MySQL分配角色权限
    PO、VO、DAO、BO、POJO
    Java工程为什么要加一个biz层
    Java学习之强引用,弱引用,软引用 与 JVM
    深入Activity,Activity启动模式LaunchMode完全解析
    活体检测
    遥感图像数据集
    NNIE(待尝试)
  • 原文地址:https://www.cnblogs.com/lishouguang/p/4558992.html
Copyright © 2011-2022 走看看