zoukankan      html  css  js  c++  java
  • zookeeper(3) zookeeper的实践及原理

    一、基于java API初探zookeeper的使用

       (1)建立连接

      

    /**
     * Opens a ZooKeeper session, waits until the client reports that it is
     * connected, prints the client state and closes the session again.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        // Client state lifecycle: NOT_CONNECTED --> CONNECTING --> CONNECTED --> CLOSED
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper("192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181", 4000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (Event.KeeperState.SyncConnected == event.getState()) {
                        // The server answered: the session is established.
                        countDownLatch.countDown();
                    }
                }
            });
            try {
                // Session establishment is asynchronous, so block until the
                // default watcher has seen SyncConnected.
                countDownLatch.await();
                System.out.println(zooKeeper.getState()); // CONNECTED (CONNECTING without the latch)
            } finally {
                // Always release the session, even when waiting was interrupted.
                zooKeeper.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    

      (2)节点数据的增删改查

    /**
     * Connects to ZooKeeper and walks through create / read / update / delete
     * on the node {@code /lf00}, printing the node value and data version after
     * each read.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        // Client state lifecycle: NOT_CONNECTED --> CONNECTING --> CONNECTED --> CLOSED
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper("192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181", 4000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if (Event.KeeperState.SyncConnected == event.getState()) {
                        // The server answered: the session is established.
                        countDownLatch.countDown();
                    }
                }
            });
            try {
                countDownLatch.await();
                System.out.println(zooKeeper.getState()); // CONNECTED (CONNECTING without the latch)

                // Create the node.
                zooKeeper.create("/lf00", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                Thread.sleep(1000);
                Stat stat = new Stat();
                // Read the current value; getData also fills in stat.
                byte[] data = zooKeeper.getData("/lf00", null, stat);
                // Decode the bytes: byte[].toString() would only print the array identity.
                System.out.println(new String(data) + "---" + stat.getVersion());
                // Update the value, guarded by the version that was just read.
                zooKeeper.setData("/lf00", "124".getBytes(), stat.getVersion());
                // Read again so that stat carries the new version.
                byte[] data2 = zooKeeper.getData("/lf00", null, stat);
                System.out.println(new String(data2) + "---" + stat.getVersion());
                // Delete the node, guarded by the latest version.
                zooKeeper.delete("/lf00", stat.getVersion());
            } finally {
                // Always release the session, even when one of the operations failed.
                zooKeeper.close();
            }
            System.in.read(); // keep the process alive until a key is pressed
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    

      

     (3)事件特性

      Watcher特性:当数据发生变化的时候,zookeeper会产生一个watcher事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置watcher的客户端不会再次收到通知(watcher是一次性的操作),可以通过循环监听达到永久监听的效果。

      如何注册事件机制:getData、exists、getChildren

      

    /**
     * Demonstrates that a ZooKeeper watcher is one-shot: a watcher registered
     * through {@code exists} is notified for the following {@code setData}, but
     * not for the later {@code delete}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper = new ZooKeeper("192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181", 4000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("全局默认事件:"+event.getType()+"->"+event.getPath());
                    if (Event.KeeperState.SyncConnected == event.getState()) {
                        // The server answered: the session is established.
                        countDownLatch.countDown();
                    }
                }
            });
            try {
                countDownLatch.await();
                System.out.println(zooKeeper.getState()); // CONNECTED (CONNECTING without the latch)
                zooKeeper.create("/lf", "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                // Watchers are bound through exists, getData or getChildren.
                // Alternative that reuses the default watcher:
                //Stat stat = zooKeeper.exists("/lf", true);
                Stat stat = zooKeeper.exists("/lf", new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        System.out.println(event.getType()+"->"+event.getPath());
                    }
                });
                // A write operation triggers the watcher ...
                stat = zooKeeper.setData("/lf", "222".getBytes(), stat.getVersion());
                Thread.sleep(1000);
                // ... but it has been consumed by then, so this delete is not reported.
                zooKeeper.delete("/lf", stat.getVersion());
            } finally {
                // Close the session instead of leaving it to expire on the server.
                zooKeeper.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    运行结果:
    全局默认事件:None->null
    CONNECTED
    NodeDataChanged->/lf  只会有一次事务监听,删除节点的事务监听没通知
    

      循环监听:

      

    /**
     * Re-arms a watcher from inside its own callback: the first watcher on
     * {@code /lf} registers a second one when it fires, so two consecutive
     * changes of the node are reported instead of one.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            final CountDownLatch connectedSignal = new CountDownLatch(1);

            // Default (session-level) watcher: logs every event and releases the
            // latch once the session is established.
            final Watcher sessionWatcher = sessionEvent -> {
                System.out.println("全局默认事件:" + sessionEvent.getType() + "->" + sessionEvent.getPath());
                if (sessionEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connectedSignal.countDown();
                }
            };

            final ZooKeeper zooKeeper = new ZooKeeper(
                    "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181", 4000, sessionWatcher);
            connectedSignal.await();
            System.out.println(zooKeeper.getState()); // CONNECTED

            zooKeeper.create("/lf", "111".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            // Second-level watcher: only logs, it does not register anything further.
            final Watcher followUpWatcher =
                    followUp -> System.out.println(followUp.getType() + "->" + followUp.getPath());

            // First-level watcher: logs, then registers the follow-up watcher.
            // Alternative that re-arms with the default watcher instead:
            //zooKeeper.exists("/lf", true);
            final Watcher firstWatcher = first -> {
                System.out.println(first.getType() + "->" + first.getPath());
                try {
                    zooKeeper.exists("/lf", followUpWatcher);
                } catch (Exception rearmFailure) {
                    rearmFailure.printStackTrace();
                }
            };

            // Watchers are bound through exists, getData or getChildren.
            // Alternative that reuses the default watcher:
            //Stat stat = zooKeeper.exists("/lf", true);
            Stat stat = zooKeeper.exists("/lf", firstWatcher);

            // A write operation triggers the first watcher.
            stat = zooKeeper.setData("/lf", "222".getBytes(), stat.getVersion());
            // Enabling a second write still yields only two notifications in total;
            // a further change would need yet another nested registration.
            //stat = zooKeeper.setData("/lf", "322".getBytes(), stat.getVersion());
            Thread.sleep(1000);
            zooKeeper.delete("/lf", stat.getVersion());
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    结果:
    全局默认事件:None->null
    CONNECTED
    NodeDataChanged->/lf
    NodeDeleted->/lf
    放开上面被注释掉的第二次 setData(原文中标绿的部分),运行结果:

    全局默认事件:None->null
    CONNECTED
    NodeDataChanged->/lf
    NodeDataChanged->/lf 再通过修改事务操作来触发监听,发现还是两次,还得添加在上面嵌套//zooKeeper.exists("/lf", true);监听

      Watcher事件类型:

             None (-1), // 客户端连接状态发生变化的时候,会收到None事件
             NodeCreated (1), // 节点创建事件
             NodeDeleted (2), // 节点删除事件
             NodeDataChanged (3), // 节点数据变化
             NodeChildrenChanged (4); // 子节点被创建 删除触发该事件
    

      什么样的操作会产生什么样的事件的?

      

    Watcher事件机制原理:

    client 端连接后会注册一个事件,客户端通过 ZKWatchManager 保存该事件注册,并通知服务端 watcher 为 true,然后服务端通过 WatchManager 绑定 path 对应的事件。如下图:

     Curator的使用

      

    /**
     * Walks through create / read / update / delete with Curator, using a
     * client whose paths are all resolved inside the {@code curator} namespace.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("192.168.25.129:2181," + "192.168.25.130:2181," + "192.168.25.131:2181")
                .sessionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).namespace("curator").build();

        curatorFramework.start();
        try {
            // Create the node together with any missing parents.
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/lf001/node002",
                    "111".getBytes());

            Stat stat = new Stat();
            byte[] created = curatorFramework.getData().storingStatIn(stat).forPath("/lf001/node002");
            System.out.println("创建节点" + new String(created));

            // Update guarded by the version that was just read.
            stat = curatorFramework.setData().withVersion(stat.getVersion()).forPath("/lf001/node002", "222".getBytes());
            byte[] updated = curatorFramework.getData().storingStatIn(stat).forPath("/lf001/node002");
            System.out.println("创建节点" + new String(updated));

            // Paths are relative to the "curator" namespace, so "/curator" would
            // resolve to /curator/curator, which this demo never creates.
            // Remove the subtree that was actually created instead.
            curatorFramework.delete().deletingChildrenIfNeeded().forPath("/lf001");
            System.out.println("执行结束!");
        } finally {
            // Release the connection even when one of the operations failed.
            curatorFramework.close();
        }

    }
    

      Curator的Watcher机制

    package com.lf.zookeeper;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.NodeCache;
    import org.apache.curator.framework.recipes.cache.NodeCacheListener;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.curator.framework.recipes.cache.TreeCache;
    import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
    import org.apache.curator.framework.recipes.cache.TreeCacheListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    public class CuratorWatcherDemo {
    	
    	public static void main(String[] args) throws Exception {
    		CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
    				.connectString("192.168.25.129:2181," + "192.168.25.130:2181," + "192.168.25.131:2181")
    				.sessionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).namespace("curator").build();
    
    		curatorFramework.start();
    		 //当前节点的创建和删除事件监听 ---永久的
    //      addListenerWithNodeCache(curatorFramework,"/lf");
          //子节点的增加、修改、删除的事件监听
          addListenerWithPathChildCache(curatorFramework,"/lf");
          //综合节点监听事件
    //      addListenerWithTreeCache(curatorFramework,"/lf");
          System.in.read();
      }
    
      public static void addListenerWithTreeCache(CuratorFramework curatorFramework,String path) throws Exception {
          TreeCache treeCache=new TreeCache(curatorFramework,path);
          TreeCacheListener treeCacheListener=new TreeCacheListener() {
              @Override
              public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                  System.out.println(event.getType()+"->"+event.getData().getPath());
              }
          };
    
          treeCache.getListenable().addListener(treeCacheListener);
          treeCache.start();
      }
    
      /**
       * PathChildrenCache 监听一个节点下子节点的创建、删除、更新
       * NodeCache  监听一个节点的更新和创建事件
       * TreeCache  综合PathChildrenCache和NodeCache的特性
       */
    
      public static void addListenerWithPathChildCache(CuratorFramework curatorFramework,String path) throws Exception {
          PathChildrenCache pathChildrenCache=new PathChildrenCache(curatorFramework,path,true);
    
          PathChildrenCacheListener pathChildrenCacheListener=new PathChildrenCacheListener() {
              @Override
              public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                  System.out.println("Receive Event2:"+event.getType());
              }
          };
    
          pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
          pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
    
      }
    
      // 监听一个节点的更新,创建/lf节点事件
      public static void addListenerWithNodeCache(CuratorFramework curatorFramework,String path) throws Exception {
          final NodeCache nodeCache=new NodeCache(curatorFramework,path,false);
          NodeCacheListener nodeCacheListener=new NodeCacheListener() {
              @Override
              public void nodeChanged() throws Exception {
                  System.out.println("Receive Event1:"+nodeCache.getCurrentData().getPath());
              }
          };
          nodeCache.getListenable().addListener(nodeCacheListener);
    	}
    }
    

      

  • 相关阅读:
    约瑟夫问题
    再谈Bellman-Ford
    Uva 11478 Halum操作
    Uva 11090 在环中
    Bellman-Ford
    Uva 10537 过路费
    Uva 10917
    LA 3713 宇航员分组
    2-SAT
    LA 3211 飞机调度
  • 原文地址:https://www.cnblogs.com/flgb/p/10533325.html
Copyright © 2011-2022 走看看