zoukankan      html  css  js  c++  java
  • zookeeper之Watcher的基本流程

    Watcher 的基本流程
    ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理 Watcher 和客户端回调 Watcher客户端注册 watcher 有 3 种方式,getData、exists、getChildren;以如下代码为例来分析整个触发机制的原理。
     
    基于 zkclient 客户端发起一个数据操作
    <dependency>
         <groupId>com.101tec</groupId>
         <artifactId>zkclient</artifactId>
         <version>0.10</version>
    </dependency>
    
    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
    		ZooKeeper zookeeper = new ZooKeeper("192.168.13.102:2181", 4000, new Watcher() {
    			@Override
    			public void process(WatchedEvent event) {
    				System.out.println("event.type" + event.getType());
    			}
    		});
    		zookeeper.create("/watch", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // 创建节点
    		zookeeper.exists("/watch", true); // 注册监听
    		Thread.sleep(1000);
    		zookeeper.setData("/watch", "1".getBytes(), -1); // 修改节点的值触发监听
    		System.in.read();
    	}
     
    ZooKeeper API 的初始化过程
    ZooKeeper zookeeper=new ZooKeeper(“192.168.11.152:2181”,4000,new Watcher(){
    			public void processor(WatchedEvent event){
    			System.out.println(“event.type”);
    		}
    	});
    在创建一个 ZooKeeper 客户端对象实例时,我们通过 new Watcher()向构造方法中传入一个默认的 Watcher, 这个 Watcher 将作为整个 ZooKeeper 会话期间的默认Watcher,会一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中;代码如下:
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig) throws IOException {
    	LOG.info("Initiating client connection, connectString=" + connectString+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    	if (clientConfig == null) {
    		clientConfig = new ZKClientConfig();
    	}
    	this.clientConfig = clientConfig;
    	watchManager = defaultWatchManager();
    	watchManager.defaultWatcher = watcher; //在这里将 watcher 设置到ZKWatchManager
    	ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    	hostProvider = aHostProvider;
    	//初始化了 ClientCnxn,并且调用 cnxn.start()方法
    	cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
    	hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly);
    	cnxn.start();
    }
    ClientCnxn:是 Zookeeper 客户端和 Zookeeper 服务器端进行通信和事件通知处理的主要类,它内部包含两个类,
      1. SendThread :负责客户端和服务器端的数据通信, 也包括事件信息的传输
      2. EventThread : 主要在客户端回调注册的 Watchers 进行通知处理
    ClientCnxn 初始化
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
    			 ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
    	{
    		this.zooKeeper = zooKeeper;
    		this.watcher = watcher;
    		this.sessionId = sessionId;
    		this.sessionPasswd = sessionPasswd;
    		this.sessionTimeout = sessionTimeout;
    		this.hostProvider = hostProvider;
    		this.chrootPath = chrootPath;
    		connectTimeout = sessionTimeout / hostProvider.size();
    		readTimeout = sessionTimeout * 2 / 3;
    		readOnly = canBeReadOnly;
    		sendThread = new SendThread(clientCnxnSocket); //初始化 sendThread
    		eventThread = new EventThread(); //初始化 eventThread
    		this.clientConfig=zooKeeper.getClientConfig();
    	 }
    
    	public void start() { //启动两个线程
    		sendThread.start();
    		eventThread.start();
    	}
    
  • 相关阅读:
    数据排序
    (一)Spark简介Java&Python版Spark
    醒 了
    祈福
    可以接受失败,但不选择放弃
    烦中偷乐
    Yahoo! UI Library入门
    文章内容的简单优化方法
    Asp.Net网站速度优化
    ASP.NET实现GZIP压缩优化
  • 原文地址:https://www.cnblogs.com/47Gamer/p/13567762.html
Copyright © 2011-2022 走看看