zoukankan      html  css  js  c++  java
  • 获取管理权

       现在我们有了会话,我们的Master程序需要获得管理权,虽然现在我们只有一个主节点,但我们还是要小心仔细。我们需要运行多个进程,以便在活动主节点发生故障后,可以有进程接替主节点。为了确保同一时间只有一个主节点进程出于活动状态,我们使用ZooKeeper来实现简单的群首选举算法(在2.4.1节中所描述的)。这个算法中,所有潜在的主节点进程尝试创建/master节点,但只有一个成功,这个成功的进程成为主节点

      ZooKeeper通过插件式的认证方法提供了每个节点的ACL策略功能,因此,如果我们需要,就可以限制某个用户对某个znode节点的哪些权限,但对于这个简单的例子,我们继续使用OPEN_ACL_UNSAFE策略。当然,我们希望在主节点死掉后/master节点会消失。正如我们在2.1.2节中所提到的持久性和临时性znode节点,我们可以使用ZooKeeper的临时性znode节点来达到我们的目的。我们将定义一个EPHEMERAL的znode节点,当创建它的会话关闭或无效时,ZooKeeper会自动检测到,并删除这个节点。

    因此,我们将会在我们的程序中添加以下代码:
    String serverId = Integer.toHexString(random.nextInt());
    void runForMaster() {
    zk.create("/master", ①
    serverId.getBytes(), ②
    OPEN_ACL_UNSAFE, ③
    CreateMode.EPHEMERAL); ④
    }

    ①我们试着创建znode节点/master。如果这个znode节点存在,create
    就会失败。同时我们想在/master节点的数据字段保存对应这个服务器的唯
    一ID。
    ②数据字段只能存储字节数组类型的数据,所以我们将int型转换为一
    个字节数组。
    ③如之前所提到的,我们使用开放的ACL策略。
    ④我们创建的节点类型为EPHEMERAL。

    
    

     

      然而,我们这样做还不够,create方法会抛出两种异常:KeeperException和InterruptedException。我们需要确保我们处理了这两种异常,特别是ConnectionLossException(KeeperException异常的子类)和InterruptedException。对于其他异常,我们可以忽略并继续执行,但对于这两种异常,create方法可能已经成功了,所以如果我们作为主节点就需要捕获并处理它们。

      ConnectionLossException异常发生于客户端与ZooKeeper服务端失去连接时。一般常常由于网络原因导致,如网络分区或ZooKeeper服务器故障。当这个异常发生时,客户端并不知道是在ZooKeeper服务器处理前丢失了请求消息,还是在处理后客户端未收到响应消息。如我们之前所描述的,ZooKeeper的客户端库将会为后续请求重新建立连接,但进程必须知道一个·  未决请求是否已经处理了还是需要再次发送请求。InterruptedException异常源于客户端线程调用了Thread.interrupt,通常这是因为应用程序部分关闭,但还在被其他相关应用的方法使用。从字面来看这个异常,进程会中断本地客户端的请求处理的过程,并使该请求处于未知状态。

        这两种请求都会导致正常请求处理过程的中断,开发者不能假设处理过程中的请求的状态。当我们处理这些异常时,开发者在处理前必须知道系统的状态。如果发生群首选举,在我们没有确认情况之前,我们不希望确定主节点。如果create执行成功了,活动主节点死掉以前,没有任何进程能够成为主节点,如果活动主节点还不知道自己已经获得了管理权,不会有任何进程成为主节点进程。当处理ConnectionLossException异常时,我们需要找出那个进程创建的/master节点,如果进程是自己,就开始成为群首角色。我们通过getData方法来处理:

    byte[] getData(
    String path,
    bool watch,
    Stat stat)

     

    其中:
    path
      类似其他ZooKeeper方法一样,第一个参数为我们想要获取数据的
    znode节点路径。

    watch
      表示我们是否想要监听后续的数据变更。如果设置为true,我们就可
    以通过我们创建ZooKeeper句柄时所设置的Watcher对象得到事件,同时另
    一个版本的方法提供了以Watcher对象为入参,通过这个传入的对象来接收
    变更的事件。我们在后续章节再讨论如何监视变更情况,现在我们设置这
    个参数为false,因为我们现在我们只想知道当前的数据是什么。

    stat
    最后一个参数类型Stat结构,getData方法会填充znode节点的元数据信
    息。


    返回值
    方法返回成功(没有抛出异常),就会得到znode节点数据的字节数
    组。
    让我们按以下代码段来修改代码,在runForMaster方法中引入异常处
    理:

     

    String serverId = Integer.toString(Random.nextLong());
    boolean isLeader = false;
    // returns true if there is a master
    boolean checkMaster() {
    while (true) {
    try {
    Stat stat = new Stat();
    byte data[] = zk.getData("/master", false, stat); ①
    isLeader = new String(data).equals(serverId)); ②
    return true;
    } catch (NoNodeException e) {
    // no master, so try create again
    return false;
    } catch (ConnectionLossException e) {
    }
    }
    }
    void runForMaster() throws InterruptedException { ③
    while (true) {
    try { ④
    zk.create("/master", serverId.getBytes(),
    OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); ⑤
    isLeader = true;
    break;
    } catch (NodeExistsException e) {
    isLeader = false;
    break;
    } catch (ConnectionLossException e) { ⑥
    }
    if (checkMaster()) break; ⑦
    }
    }

    ①通过获取/master节点的数据来检查活动主节点。
    ②该行展示了为什么我们需要使用在创建/master节点时保存的数据:
    如果/master存在,我们使用/master中的数据来确定谁是群首。如果一个
    进程捕获到ConnectionLossException,这个进程可能就是主节点,因create
    操作实际上已经处理完,但响应消息却丢失了。
    ③我们将InterruptedException异常简单地传递给调用者。
    ④我们将zk.create方法包在try块之中,以便我们捕获并处理
    ConnectionLossException异常。
    ⑤这里为create请求,如果成功执行将会成为主节点。
    ⑥处理ConnectionLossException异常的catch块的代码为空,因为我们
    并不想中止函数,这样就可以使处理过程继续向下执行。
    ⑦检查活动主节点是否存在,如果不存在就重试。

     

       在这个例子中,我们简单地传递InterruptedException给调用者,即向上传递异常。不过,在Java中没有明确的指导方针告诉我们如何处理线程中断,甚至没有告诉我们这个中断代表什么。有些时候,中断用于通知线程现在要退出了,需要进行清理操作,另外的情况,中断用于获得一个线程的控制权,应用的执行还将继续。InterruptedException异常的处理依赖于程序的上下文环境,如果向上抛出InterruptedException异常,最终关闭zk句柄,我们可以抛出异常到调用栈顶,当句柄关闭时就可以清理所有一切。如果zk句柄未关闭,在重新抛出异常前,我们需要弄清楚自己是不是主节点,或者继续异步执行后续操。后者情况非常棘手,需要我们仔细设计并妥善处理。

    现在,我们看一下Master的main主函数:

    public static void main(String args[]) throws Exception {
    Master m = new Master(args[0]);
    m.startZK();
    m.runForMaster(); ①
    if (isLeader) {
    System.out.println("I'm the leader"); ②
    // wait for a bit
    Thread.sleep(60000);
    } else {
    System.out.println("Someone else is the leader");
    }
    m.stopZK();

     

     

    1.1.1异步获取管理权

    ZooKeeper中,所有同步调用方法都有对应的异步调用方法。通过异步调用,我们可以在单线程中同时进行多个调用,同时也可以简化我们的实

    现方式。让我们回顾管理权的例子,修改为异步调用的方式。

     以下为create方法的异步调用版本:

     

    void create(String path,
    byte[] data,
    List<ACL> acl,
    CreateMode createMode,
    AsyncCallback.StringCallback cb, ①
    Object ctx) ②

     

     

    create方法的异步方法与同步方法非常相似,仅仅多了两个参数:
    ①提供回调方法的对象。
    ②用户指定上下问信息(回调方法调用是传入的对象实例)。

    该方法调用后通常在create请求发送到服务端之前就会立即返回。回调对象通过传入的上下文参数来获取数据,当从服务器接收到create请求的结果时,上下文参数就会通过回调对象提供给应用程序。注意,该create方法不会抛出异常,我们可以简化处理,因为调用返回前并不会等待create命令完成,所以我们只需关心InterruptedException异常;同时因请求的所有错误信息通过回调对象会第一个返回,所以我们也无需关心KeeperException异常。

     

    回调对象实现只有一个方法的StringCallback接口:

    void processResult(int rc, String path, Object ctx, String name)

     

      异步方法调用会简单化队列对ZooKeeper服务器的请求,并在另一个线程中传输请求。当接收到响应信息,这些请求就会在一个专用回调线程中

    被处理。为了保持顺序,只会有一个单独的线程按照接收顺序处理响应包:

    processResult各个参数的含义如下:
    rc
    返回调用的结构,返回OK或与KeeperException异常对应的编码值。
    path
    我们传给create的path参数值。
    ctx
    我们传给create的上下文参数。
    name
    创建的znode节点名称。
    目前,调用成功后,path和name的值一样,但是,如果采用
    CreateMode.SEQUENTIAL模式,这两个参数值就不会相等。

     


    注意:回调函数处理

    因为只有一个单独的线程处理所有回调调用,如果回调函数阻塞,所有后续回调调用都会被阻塞,也就是说,意思是一般不要在回调函数中集中操作或阻塞操作。有时,在回调函数中调用同步方法是合法的,但一般还是避免这样做,以便后续回调调用可以快速被处理。

    让我们继续完成我们的主节点的功能,我们创建了masterCreateCallback对象,对于接收create命令的结果

    static boolean isLeader;
    static StringCallback masterCreateCallback = new StringCallback() {
    void processResult(int rc, String path, Object ctx, String name) {
    switch(Code.get(rc)) { ①
    case CONNECTIONLOSS: ②
    checkMaster();
    return;
    case OK: ③
    isLeader = true;
    break;
    default: ④
    isLeader = false;
    }
    System.out.println("I'm " + (isLeader ? "" : "not ") +
    "the leader");
    }
    };
    void runForMaster() {
    zk.create("/master", serverId.getBytes(), OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL, masterCreateCallback, null); ⑤
    }

    ①我们从rc参数中获得create请求的结果,并将其转换为Code枚举类
    型。rc如果不为0,则对应KeeperException异常。
    ②如果因连接丢失导致create请求失败,我们会得到
    CONNECTIONLOSS编码的结果,⽽不是ConnectionLossException异常。
    当连接丢失时,我们需要检查系统当前的状态,并判断我们需要如何恢
    复,我们将会在我们后面实现的checkMaster方法中进行处理。
    ③我们现在成为群首,我们先简单地赋值isLeader为true。
    ④其他情况,我们并未成为群首。
    ⑤在runForMaster方法中,我们将masterCreateCallback传给create方
    法,传入null作为上下文对象参数,因为在runForMaster方法中,我们现
    在不需要向masterCreateCallback.processResult方法传入任何信息。

     

     

     

    我们现在需要实现checkMaster方法,这个方法与之前的同步情况不太
    一样,我们通过回调方法实现处理逻辑,因此在checkMaster函数中不会看
    到一系列的事件,而只有getData方法。getData调用完成后,后续处理将会
    在DataCallback对象中继续:

    DataCallback masterCheckCallback = new DataCallback() {
    void processResult(int rc, String path, Object ctx, byte[] data,
    Stat stat) {
    switch(Code.get(rc)) {
    case CONNECTIONLOSS:
    checkMaster();
    return;
    case NONODE:
    runForMaster();
    return;
    }
    }
    }
    void checkMaster() {
    zk.getData("/master", false, masterCheckCallback, null);
    }

     

     

    同步方法和异步方法的处理逻辑是一样的,只是异步方法中,我们没
    有使用while循环,而是通过异步操作在回调函数中进行错误处理。
    此时,同步的版本看起来比异步版本实现起来更简单,但在下一章我
    们会看到,应用程序常常由异步变化通知所驱动,因此最终以异步方式构
    建系统,反而使代码更简单。同时,异步调用不会阻塞应用程序,这样其
    他事务可以继续进行,甚至是提交新的ZooKeeper操作。

     

     1.1.2设置元数据

    我们将使用异步API方法来设置元数据路径。我们的主从模型设计依
    赖三个目录:/tasks、/assign和/workers,我们可以在系统启动前通过某些
    系统配置来创建所有目录,或者通过在主节点程序每次启动时都创建这些
    目录。以下代码段会创建这些路径,例子中除了连接丢失错误的处理外没
    有其他错误处理:

    public void bootstrap() {
    createParent("/workers", new byte[0]); ①
    createParent("/assign", new byte[0]);
    createParent("/tasks", new byte[0]);
    createParent("/status", new byte[0]);
    }
    void createParent(String path, byte[] data) {
    zk.create(path,
    data,
    Ids.OPEN_ACL_UNSAFE,
    CreateMode.PERSISTENT,
    createParentCallback,
    data); ②
    }
    StringCallback createParentCallback = new StringCallback() {
    public void processResult(int rc, String path, Object ctx, String name) {
    switch (Code.get(rc)) {
    case CONNECTIONLOSS:
    createParent(path, (byte[]) ctx); ③
    break;
    case OK:
    LOG.info("Parent created");
    break;
    case NODEEXISTS:
    LOG.warn("Parent already registered: " + path);
    break;
    default:
    LOG.error("Something went wrong: ",
    KeeperException.create(Code.get(rc), path));
    }
    }
    };

    ①我们没有数据存入这些znode节点,所以只传入空的字节数组。
    ②因为如此,我们不用关心去跟踪每个znode节点对应的数据,但是往
    往每个路径都具有独特的数据,所以我们通过回调上下文参数对create操
    作进行跟踪数据。在create函数的第一个和第四个参数均传入的data对象,
    也许看起来有些奇怪,但第一个参数传入的data表示要保存到znode节点的
    数据,⽽第四个参数传⼊的data,我们可以在createParentCallback回调函
    数中继续使用。
    ③如果回调函数中得到CONNECTIONLOSS返回码,我们通过调用
    createPath方法来对create操作进行重试,然后调用createPath我们需要知
    道之前的create调用中的data参数,因此我们通过create的第四个参数传入
    data,就可以将数据通过ctx对象传给回调函数。因为上下文对象与回调对
    象不同,我们可以使所有create操作使用同一个回调对象。

     

    从本例中,你会注意到znode节点与问件(一个包含数据的znode节
    点)和目录(含有子节点的znode节点)没有什么区别,每个znode节点可
    以具备以上两个特点。

     

  • 相关阅读:
    Java数据类型转换
    Java数据类型
    Revisiting Network Support for RDMA
    FBOSS: Building Switch Software at Scale
    Edge-assisted Traffic Engineering and applications in the IoT
    Edge Intelligence: On-Demand Deep Learning Model Co-Inference with Device-Edge Synergy
    ARVE: Augmented Reality Applications in Vehicle to Edge Networks
    Deployment Characteristics of "The Edge" in Mobile Edge Computing
    CABaRet: Leveraging Recommendation Systems for Mobile Edge Caching
    Anveshak: Placing Edge Servers In The Wild
  • 原文地址:https://www.cnblogs.com/duan2/p/9022390.html
Copyright © 2011-2022 走看看