  • zookeeper 回调和Watcher




     1 protected void connectToZK(String newHost) throws InterruptedException, IOException {
     2         if (zk != null && zk.getState().isAlive()) {
     3             zk.close();
     4         }
     5         host = newHost;
     6         zk = new ZooKeeper(host,
     7                  Integer.parseInt(cl.getOption("timeout")),
     8                  new MyWatcher());
     9  }
    11 private class MyWatcher implements Watcher {
    12         public void process(WatchedEvent event) {
    13             if (getPrintWatches()) {
    14                 ZooKeeperMain.printMessage("WATCHER::");
    15                 ZooKeeperMain.printMessage(event.toString());
    16             }
    17         }
    18  }
     1 public List<String> getChildren(final String path, Watcher watcher)
     2         throws KeeperException, InterruptedException
     3     {
     4         //将watcher封装成childwatcher
     5         WatchRegistration wcb = null;
     6         if (watcher != null) {
     7             wcb = new ChildWatchRegistration(watcher, clientPath);
     8         }
    10         final String serverPath = prependChroot(path);
    11         RequestHeader h = new RequestHeader();
    12         h.setType(ZooDefs.OpCode.getChildren);
    13         GetChildrenRequest request = new GetChildrenRequest();
    14         request.setPath(serverPath);
    15         request.setWatch(watcher != null);
    16         GetChildrenResponse response = new GetChildrenResponse();
    17         ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    18         if (r.getErr() != 0) {
    19             throw KeeperException.create(KeeperException.Code.get(r.getErr()),
    20                     clientPath);
    21         }
    22         return response.getChildren();
    23     }
     1 public ReplyHeader submitRequest(RequestHeader h, Record request,
     2             Record response, WatchRegistration watchRegistration)
     3             throws InterruptedException {
     4         ReplyHeader r = new ReplyHeader();
     5 //watchRegistration被封装到Packet中
     6         Packet packet = queuePacket(h, r, request, response, null, null, null,
     7                     null, watchRegistration);
     8         synchronized (packet) {
     9             while (!packet.finished) {
    10                 packet.wait();
    11             }
    12         }
    13         return r;
    14     }
     1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
     2             Record response, AsyncCallback cb, String clientPath,
     3             String serverPath, Object ctx, WatchRegistration watchRegistration)
     4     {
     5         Packet packet = null;
     6         synchronized (outgoingQueue) {
     7             if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
     8                 h.setXid(getXid());
     9             }
    10 //watchRegistration被封装到Packet中
    11             packet = new Packet(h, r, request, response, null,
    12                     watchRegistration);
    13             packet.cb = cb;
    14             packet.ctx = ctx;
    15             packet.clientPath = clientPath;
    16             packet.serverPath = serverPath;
    17             if (!zooKeeper.state.isAlive() || closing) {
    18                 conLossPacket(packet);
    19             } else {
    20                 // If the client is asking to close the session then
    21                 // mark as closing
    22                 if (h.getType() == OpCode.closeSession) {
    23                     closing = true;
    24                 }
    25                 outgoingQueue.add(packet);
    26             }
    27         }
    29         sendThread.wakeup();
    30         return packet;
    31     }
     1 private void finishPacket(Packet p) {
     2         if (p.watchRegistration != null) {
     3             p.watchRegistration.register(p.replyHeader.getErr());
     4         }
     6         if (p.cb == null) {
     7             synchronized (p) {
     8                 p.finished = true;
     9                 p.notifyAll();
    10             }
    11         } else {
    12             p.finished = true;
    13             eventThread.queuePacket(p);
    14         }
    15     }
     1 private void processEvent(Object event) {
     2    if (event instanceof WatcherSetEventPair) {
     3 //执行watcher
     4                   WatcherSetEventPair pair = (WatcherSetEventPair) event;
     5                   for (Watcher watcher : pair.watchers) {
     6                       try {
     7                           watcher.process(pair.event);
     8                       } catch (Throwable t) {
     9                           LOG.error("Error while calling watcher ", t);
    10                       }
    11                   }
    12               }
    13 }
     1 public void getChildren(final String path, Watcher watcher,
     2             ChildrenCallback cb, Object ctx)
     3     {
     4         final String clientPath = path;
     5         WatchRegistration wcb = null;
     6         if (watcher != null) {
     7             wcb = new ChildWatchRegistration(watcher, clientPath);
     8         }
    10         final String serverPath = prependChroot(clientPath);
    12         RequestHeader h = new RequestHeader();
    13         h.setType(ZooDefs.OpCode.getChildren);
    14         GetChildrenRequest request = new GetChildrenRequest();
    15         request.setPath(serverPath);
    16         request.setWatch(watcher != null);
    17         GetChildrenResponse response = new GetChildrenResponse();
    18         cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
    19                 clientPath, serverPath, ctx, wcb);
    20     }
     1 Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
     2             Record response, AsyncCallback cb, String clientPath,
     3             String serverPath, Object ctx, WatchRegistration watchRegistration)
     4     {
     5         Packet packet = null;
     6         synchronized (outgoingQueue) {
     7             if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
     8                 h.setXid(getXid());
     9             }
    10             packet = new Packet(h, r, request, response, null,
    11                     watchRegistration);
    12             packet.cb = cb;
    13             packet.ctx = ctx;
    14             packet.clientPath = clientPath;
    15             packet.serverPath = serverPath;
    16             if (!zooKeeper.state.isAlive() || closing) {
    17                 conLossPacket(packet);
    18             } else {
    19                 // If the client is asking to close the session then
    20                 // mark as closing
    21                 if (h.getType() == OpCode.closeSession) {
    22                     closing = true;
    23                 }
    24                 outgoingQueue.add(packet);
    25             }
    26         }
    28         sendThread.wakeup();
    29         return packet;
    30     }
     1 private void processEvent(Object event) {
     2           try {
     3                   Packet p = (Packet) event;
     4                   int rc = 0;
     5                   String clientPath = p.clientPath;
     6                   if (p.replyHeader.getErr() != 0) {
     7                       rc = p.replyHeader.getErr();
     8                   }
     9                   if (p.response instanceof ExistsResponse
    10                           || p.response instanceof SetDataResponse
    11                           || p.response instanceof SetACLResponse) {
    12                       StatCallback cb = (StatCallback) p.cb;
    13                       if (rc == 0) {
    14                           if (p.response instanceof ExistsResponse) {
    15                               cb.processResult(rc, clientPath, p.ctx,
    16                                       ((ExistsResponse) p.response)
    17                                               .getStat());
    18                           } else if (p.response instanceof SetDataResponse) {
    19                               cb.processResult(rc, clientPath, p.ctx,
    20                                       ((SetDataResponse) p.response)
    21                                               .getStat());
    22                           } else if (p.response instanceof SetACLResponse) {
    23                               cb.processResult(rc, clientPath, p.ctx,
    24                                       ((SetACLResponse) p.response)
    25                                               .getStat());
    26                           }
    27                       } else {
    28                           cb.processResult(rc, clientPath, p.ctx, null);
    29                       }
    30                   } else if (p.response instanceof GetDataResponse) {
    31                       DataCallback cb = (DataCallback) p.cb;
    32                       GetDataResponse rsp = (GetDataResponse) p.response;
    33                       if (rc == 0) {
    34                           cb.processResult(rc, clientPath, p.ctx, rsp
    35                                   .getData(), rsp.getStat());
    36                       } else {
    37                           cb.processResult(rc, clientPath, p.ctx, null,
    38                                   null);
    39                       }
    40                   } else if (p.response instanceof GetACLResponse) {
    41                       ACLCallback cb = (ACLCallback) p.cb;
    42                       GetACLResponse rsp = (GetACLResponse) p.response;
    43                       if (rc == 0) {
    44                           cb.processResult(rc, clientPath, p.ctx, rsp
    45                                   .getAcl(), rsp.getStat());
    46                       } else {
    47                           cb.processResult(rc, clientPath, p.ctx, null,
    48                                   null);
    49                       }
    50                   } else if (p.response instanceof GetChildrenResponse) {
    51                       ChildrenCallback cb = (ChildrenCallback) p.cb;
    52                       GetChildrenResponse rsp = (GetChildrenResponse) p.response;
    53                       if (rc == 0) {
    54                           cb.processResult(rc, clientPath, p.ctx, rsp
    55                                   .getChildren());
    56                       } else {
    57                           cb.processResult(rc, clientPath, p.ctx, null);
    58                       }
    59                   } else if (p.response instanceof GetChildren2Response) {
    60                       Children2Callback cb = (Children2Callback) p.cb;
    61                       GetChildren2Response rsp = (GetChildren2Response) p.response;
    62                       if (rc == 0) {
    63                           cb.processResult(rc, clientPath, p.ctx, rsp
    64                                   .getChildren(), rsp.getStat());
    65                       } else {
    66                           cb.processResult(rc, clientPath, p.ctx, null, null);
    67                       }
    68                   } else if (p.response instanceof CreateResponse) {
    69                       StringCallback cb = (StringCallback) p.cb;
    70                       CreateResponse rsp = (CreateResponse) p.response;
    71                       if (rc == 0) {
    72                           cb.processResult(rc, clientPath, p.ctx,
    73                                   (chrootPath == null
    74                                           ? rsp.getPath()
    75                                           : rsp.getPath()
    76                                     .substring(chrootPath.length())));
    77                       } else {
    78                           cb.processResult(rc, clientPath, p.ctx, null);
    79                       }
    80                   } else if (p.cb instanceof VoidCallback) {
    81                       VoidCallback cb = (VoidCallback) p.cb;
    82                       cb.processResult(rc, clientPath, p.ctx);
    83                   }
    84           } catch (Throwable t) {
    85               LOG.error("Caught unexpected throwable", t);
    86           }
    87        }
    88     }
