zoukankan      html  css  js  c++  java
  • Giraph源代码分析(九)—— Aggregators 原理解析

    HamaWhite 原创。转载请注明出处!欢迎大家增加Giraph 技术交流群: 228591158

    Giraph中Aggregator的基本使用方法请參考官方文档:http://giraph.apache.org/aggregators.html 。本文重点在解析Giraph怎样实现Aggregators后文用图示的方法描写叙述了Aggregator的运行过程。

    基本原理:在每一个超级步中,每一个Worker计算本地的聚集值。

    超级步计算完毕后,把本地的聚集值发送给Master汇总。在MasterCompute()运行后,把全局的聚集值回发给全部的Workers。

    缺点:当某个应用(或算法)使用了多个聚集器(Aggregators),Master要完毕全部聚集器的计算。由于Master要接受、处理、发送大量的数据,不管是在计算方面还是网络通信层次,都会导致Master成为系统瓶颈。

    改进:採用分片聚集 (sharded aggregators) . 在每一个超级步的最后。每一个聚集器被派发给一个Worker。该Worker接受和聚集其它Workers发送给该聚集器的值。

    然后Workers把自己的全部的聚集器发送给Master。这样Master就无需运行不论什么聚集,仅仅是接收每一个聚集器的终于值。在MasterCompute.compute运行后,Master不是直接把全部的聚集器发送给全部的Workers,而是发送给聚集器所属的Worker。然后每一个Worker再把其上的聚集器发送给全部的Workers.

    首先给出Master <-- > Worker间, Worker <--> Worker间通信协议,在每一个类中的doRequest(ServerData serverData)方法中会解析并存储收到的消息。
    1).  org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest 类 . Worker --> Worker Owner
    功能:每一个worker把当前超步的局部 aggregated values 发送到该Aggregator的拥有者。
    2).  org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest 类. Worker Owner--> Master
    功能:每一个Worker把自己所拥有的Aggregator的终于 aggregated values 发送给 master。
    3).  org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest 类. Master --> Worker Owner.
    功能:master把终于的 aggregated values 或aggregators 发送给该Aggregator的拥有者。
    4).  org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest 类。 Worker Owner--> Worker
    功能: 发送终于的 aggregated values 到 其它workers。发送者为该Aggregator的拥有者。接受者为除发送者之外的全部workers。



    Aggregator分类和 注冊

        Giraph中把Aggregator分为两类:regular aggregators和persistent aggregators。

    regular aggregators的值在每一个超级步開始会被重置为初始值,然而persistent aggregators的值在整个应用(算法)中一直保持。

    举例来说。若LongSumAggregator在每一个顶点的compute()方法中加1。假设使用regular aggregators,在每一个超级步中就能够读取前一个超级步的參与计算的顶点总数;假设使用persistent aggregators,就能够获取前面全部超级步中參与计算的顶点总和。

        在使用aggregator之前,必需要在mastes上Registering aggregators。做法:继承org.apache.giraph.master.DefaultMasterCompute类,重写 void initalize() 方法。

    在该方法中注冊aggregators。语法例如以下:

        registerAggregator(aggregatorName, aggregatorClass)
        registerPersistentAggregator(aggregatorName, aggregatorClass)

       说明:MasterCompute.initalize()方法仅仅在第 INPUT_SUPERSTEP (-1) 超级步中运行一次。详细在 BSPServiceMaster.runMasterCompute(long superstep)方法中。在MasterCompute.compute()方法中,能够使用下述方法读取或改动聚集器的值。

         getAggregatedValue(aggregatorName) //获取前一个超级步的聚集器值
         setAggregatedValue(aggregatorName, aggregatedValue) //改动聚集器的值

         MasterCompute.compute()总是在Vertex.compute()前运行。 因为第 INPUT_SUPERSTEP ( -1)个超级步进行的是数据的载入和重分布过程,不计算Vertex.compute()。第0个超级步Vertex.compute()又是在MasterCompute.compute()方法后运行。故对第 -1 、 0个超级步MasterCompute.compute()方法中获得的聚集器值均为其初始值。从第1个超级步開始。MasterCompute.compute()方法才获得了全部Vertex.compute()在第0个超级步聚集的值。

    1. 从第0个超级步開始。BspServiceMaster调用MasterAggregatorHandler类的finishSuperStep(MasterClient masterClient) 方法把聚集器派发给Worker。聚集器的value为上一个超级步的全局聚集值(final aggregated values)。第一次为初始值。先给出MasterAggregatorHandler的类继承关系。例如以下:


    finishSuperStep(MasterClient masterClient) 方法核心内容例如以下:

      /**
       * Finalize aggregators for current superstep and share them with workers
       */
      public void finishSuperstep(MasterClient masterClient) {
        for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
          if (aggregator.isChanged()) {
            // if master compute changed the value, use the one he chose
            aggregator.setPreviousAggregatedValue(
                aggregator.getCurrentAggregatedValue());
            // reset aggregator for the next superstep
            aggregator.resetCurrentAggregator();
          }
        }
        
        /**
         * 把聚集器发送给所属的Worker。发送内容:
         * 1). Name of the aggregator
         * 2). Class of the aggregator
         * 3). Value of the aggretator
         */
        try {
          for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
              aggregatorMap.entrySet()) {
            masterClient.sendAggregator(entry.getKey(),
                entry.getValue().getAggregatorClass(),
                entry.getValue().getPreviousAggregatedValue());
          }
          masterClient.finishSendingAggregatedValues();
        } catch (IOException e) {
          throw new IllegalStateException("finishSuperstep: " +
              "IOException occurred while sending aggregators", e);
        }
      }
    问题1:怎样确定aggregator的Worker Owner ?
    答:依据aggregator的Name来确定它所属的Worker。计算方法例如以下:
    /**
     * 依据aggregatorName和全部的workers列表来计算aggregator所属的Worker
     * 參数aggregatorName:Name of the aggregator
     * 參数workers: Workers的list列表
     * 返回值:Worker which owns the aggregator
     */
    public static WorkerInfo getOwner(String aggregatorName,List<WorkerInfo> workers) {
        //用aggregatorName的HashCode()值模以 Workers的总数目
        int index = Math.abs(aggregatorName.hashCode() % workers.size());
        return workers.get(index);  //返回aggregator所属的Worker
    }
    问题2:Worker 怎样推断自身是否接收完自己所拥有的aggregators?
    答:Master给某个Worker发送aggregators时。同一时候发送到该Worker的aggregators数目。使用的 SendAggregatorsToOwnerRequest类对消息进行封装和解析。

    2. Worker接受Master发送的Aggregator,Worker把接收到的聚集体值发送给其它全部Workers,然后每一个Workers就会得到上一个超级步的全局聚集值。
    由前文知道,每一个Worker都有一个ServerData对象,ServerData类中关于Aggregator的两个成员变量例如以下:

    // 保存Worker在当前超步拥有的aggregators
    private final OwnerAggregatorServerData ownerAggregator;
    // 保存前一个超步的aggregators
    private final AllAggregatorServerData allAggregatorData;
    能够看到,ownerAggregatorData用来存储在当前超步Master发送给Worker的聚集器,allAggregatorData用来保存上一个超级步全局的聚集值。ownerAggregatorData和allAggregatorData值的初始化在SendAggregatorsToOwnerRequest 类中的doRequest(ServerData serverData)方法中,例如以下:

    public void doRequest(ServerData serverData) {
        DataInput input = getDataInput();
        AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
        try {
          //收到的Aggregators数目。在CountingOutputStream类中有计数器counter,
          //每向输出流中加入一个聚集器对象,计数加1. 发送时,在flush方法中把该值插入到输出流最前面。

    int numAggregators = input.readInt(); for (int i = 0; i < numAggregators; i++) { String aggregatorName = input.readUTF(); String aggregatorClassName = input.readUTF(); if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { LongWritable count = new LongWritable(0); //Master发送给该Worker的requests总数目. count.readFields(input); aggregatorData.receivedRequestCountFromMaster(count.get(), getSenderTaskId()); } else { Class<Aggregator<Writable>> aggregatorClass = AggregatorUtils.getAggregatorClass(aggregatorClassName); aggregatorData.registerAggregatorClass(aggregatorName, aggregatorClass); Writable aggregatorValue = aggregatorData.createAggregatorInitialValue(aggregatorName); aggregatorValue.readFields(input); //把收到的上一次全局聚集的值赋值给allAggregatorData aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue); //ownerAggregatorData仅仅接受聚集器 serverData.getOwnerAggregatorData().registerAggregator( aggregatorName, aggregatorClass); } } } catch (IOException e) { throw new IllegalStateException("doRequest: " + "IOException occurred while processing request", e); } //接受一个 request,计数减1。同一时候把收到的Data加入到allAggregatorServerData的List<byte[]> masterData中 aggregatorData.receivedRequestFromMaster(getData()); }

        每一个Worker在開始计算前。会调用BspServiceWorker类的prepareSuperStep()方法来进行聚集器值的派发和接受其它Workers发送的聚集器值。调用关系例如以下:


        BspServiceWorker类的prepareSuperStep()方法例如以下:

    @Override
    public void prepareSuperstep() {
       if (getSuperstep() != INPUT_SUPERSTEP) {
         /*
          * aggregatorHandler为WorkerAggregatorHandler类型,
          * 可參考上文中MasterAggregatorHandler的类继承关系.
          * workerAggregatorRequestProcessor声明为WorkerAggregatorRequestProcessor(接口)
          * 类型,实际为NettyWorkerAggregatorRequestProcessor的实例。
          * 用于Worker间发送聚集器的值。

    */ aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor); } }


    WorkerAggregatorHandler类的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法例如以下:

    public void prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) {
        AllAggregatorServerData allAggregatorData =
            serviceWorker.getServerData().getAllAggregatorData();
        /**
         * 等待直到Master发送给该Worker的聚集器都已接受完,
         * 返回值为Master发送给该Worker的全部Data(聚集器)
         */
        Iterable<byte[]> dataToDistribute =
            allAggregatorData.getDataFromMasterWhenReady(
                serviceWorker.getMasterInfo());
      
        // 把从Master收到的Data(聚集器)发送给其它全部Workers
        requestProcessor.distributeAggregators(dataToDistribute);
    
        // 等待直到接受完其它Workers发送给该Workers的聚集器
        allAggregatorData.fillNextSuperstepMapsWhenReady(
            getOtherWorkerIdsSet(), previousAggregatedValueMap,
            currentAggregatorMap);
        // 仅仅是清空allAggregatorServerData的List<byte[]> masterData对象
        // 为下一个超级步接受Master发送的聚集器做准备
        allAggregatorData.reset();
    }
    以下详述Worker怎样判定已接收全然部Master发送的全部Request ? 主要目的在于描写叙述分布式环境下线程间怎样协作。

    在AllAggregatorServerData类中定义了TaskIdsPermitBarrier类型的变量masterBarrier,用来推断是否接收完Master发送的Request. TaskIdsPermitBarrier类中主要使用wait()、notifyAll()等方法来控制。当获得的aggregatorName等于AggregatorUtils.SPECIAL_COUNT_AGGREGATOR时,会调用requirePermits(long permits,int taskId)来添加接收的arrivedTaskIds和须要等待的request数目waitingOnPermits. 接受一个Request

      /**
       * Require more permits. This will increase the number of times permits
       * were required. Doesn't wait for permits to become available.
       *
       * @param permits Number of permits to require
       * @param taskId Task id which required permits
       */
      public synchronized void requirePermits(long permits, int taskId) {
        arrivedTaskIds.add(taskId);
        waitingOnPermits += permits;
        notifyAll();
      }

    接受一个Request后,会调用releaseOnePermit()方法把waitingOnPermits减1。


    3. 在Vertex.compute()方法中。每一个Worker聚集自身的值。

    计算完毕后。调用WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法,把本地的聚集器的值给句聚集器的aggregatorName发送给该aggregator所属的Worker. Aggregator的属主Worker接受其它全部Workers发送的本地聚集值进行汇总,汇总完毕后发送给Master,供下一次超级步的MasterCompute.compute()方法使用。

    finishSuperstep方法例如以下:

     /**
       * Send aggregators to their owners and in the end to the master
       *
       * @param requestProcessor Request processor for aggregators
       */
      public void finishSuperstep(
          WorkerAggregatorRequestProcessor requestProcessor) {
        OwnerAggregatorServerData ownerAggregatorData =
            serviceWorker.getServerData().getOwnerAggregatorData();
        // First send partial aggregated values to their owners and determine
        // which aggregators belong to this worker
        for (Map.Entry<String, Aggregator<Writable>> entry :
            currentAggregatorMap.entrySet()) {
            boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
                entry.getValue().getAggregatedValue());
            if (!sent) {
              // If it's my aggregator, add it directly
              ownerAggregatorData.aggregate(entry.getKey(),
                  entry.getValue().getAggregatedValue());
            }
        }
        // Flush
        requestProcessor.flush();
        // Wait to receive partial aggregated values from all other workers
        Iterable<Map.Entry<String, Writable>> myAggregators =
            ownerAggregatorData.getMyAggregatorValuesWhenReady(
                getOtherWorkerIdsSet());
    
        // Send final aggregated values to master
        AggregatedValueOutputStream aggregatorOutput =
            new AggregatedValueOutputStream();
        for (Map.Entry<String, Writable> entry : myAggregators) {
            int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
                entry.getValue());
            if (currentSize > maxBytesPerAggregatorRequest) {
              requestProcessor.sendAggregatedValuesToMaster(
                  aggregatorOutput.flush());
            }   
        }
        requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
        // Wait for master to receive aggregated values before proceeding
        serviceWorker.getWorkerClient().waitAllRequests();
        ownerAggregatorData.reset();
      }
    调用关系例如以下:

    4. 大同步后,Master调用MasterAggregatorHandler类的prepareSusperStep(masterClient)方法。收集聚集器的值。方法内容例如以下:

      public void prepareSuperstep(MasterClient masterClient) {
    
        // 收集上次超级步的聚集值,为master compute 做准备
        for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
    	// 假设是 Persistent Aggregator,则累加
    	if (aggregator.isPersistent()) {
            aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
          }
          aggregator.setPreviousAggregatedValue(
              aggregator.getCurrentAggregatedValue());
          aggregator.resetCurrentAggregator();
          progressable.progress();
        }
      }
    然后调用MasterCompute.compute()方法(可能会改动聚集器的值),在该方法内若依据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知全部Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法。则回到步骤1继续进行迭代。

    说明:Job迭代结束条件有三,满足其一即可:
    1) 达到最大迭代次数
    2) 没有活跃顶点且没有消息在传递
    3) 终止MasterCompute计算

    总结:为解决在多个Aggregator条件下,Master成为系统瓶颈的问题。採取了把全部Aggregator派发给某一部分Workers。由这些Workers完毕全局的聚集值的计算与发送,Master仅仅须要与这些Workers进行简单数据通信就可以,大大减少了Master的工作量。

    附加:以下用图示方法说明上述运行过程。

    实验条件:
        1). 一个Master,四个Worker
        2). 两个Aggregators,记为A1和A2。

    1. Master把Aggregators发送给Workers,收到Aggregator的Worker就作为该Aggregator的Owner。

    下图中Master把A1发送给Worker1,A2发送给Worker3.那么Worker1就作为A1的Owner,Worker3就是A2的Owner。该步骤在MasterAggregatorHandler类的finishSuperStep(MasterClient masterClient) 方法中完毕,使用的是SendAggregatorsToOwnerRequest 通信协议。注:每一个Owner Worker 可能有多个聚集器。


    图1 Master分发Aggregator

    2. Workers接受Master发送的Aggregator,然后把Aggregator发送给其它Workers。

    Worker1要把A1分别发送给Worker2、Worker3和Worker4;Worker3要把A2分别发送给Worker1、Worker2和Worker4。该步骤在WorkerAggregatorHandler类的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完毕,使用的是SendAggregatorsToMasterRequest 通信协议。此步骤完毕后,每一个Worker上都有了聚集器A1和A2(详细为上一个超步的全局终于聚集值)。


    3. 每一个Worker调用Vertex.compute()方法開始计算,收集本地的Aggregator聚集值。对聚集体A1来说,Worker1Worker2Worker3Worker4的本地聚集值依次记为:A11 A12 A13A14。对聚集器A2来说,Worker1Worker2Worker3Worker4的本地聚集值依次记为:A21 A22 A23A24。计算完毕后,每一个Worker就要把本地的聚集值发送给聚集器的Owner,聚集器的Owner在接收的时候会合并聚集。

    那么A1A12 A13A14要发送给Worker1进行全局聚集得到A1’A21 A22 A23A24要发送给Worker3进行全局聚集得到A2’。计算公式例如以下:


    此部分採用的是SendWorkerAggregatorsRequest通信协议。Worker1和Worker3要把汇总的A1和A2的新值:A1’ 和A2’发送给Master,供下一次超级步的MasterCompute.compute()方法使用採用的是SendAggregatorsToMasterRequest通信协议。

    此部分在WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完毕。步骤例如以下图所看到的:

    4. Master收到Worker1发送的A1’ 和Woker3发送的A2’后,此步骤在MasterAggregatorHandler类的prepareSusperStep(masterClient)方法中完毕。然后调用MasterCompute.compute()方法,此方法可能会改动聚集器的值,如得到A1’’和A2’’。在masterCompute.compute()方法内若依据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知全部Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法。则回到步骤1继续进行迭代,继续把A1’’发送给Worker1。A2’’发送给Worker3。

    完。

    本人原创,转载请注明出处!

    欢迎大家增加Giraph 技术交流群: 228591158

  • 相关阅读:
    Ubuntu 16 安装redis客户端
    crontab 参数详解
    PHP模拟登录发送闪存
    Nginx配置端口访问的网站
    Linux 增加对外开放的端口
    Linux 实用指令之查看端口开启情况
    无敌的极路由
    不同的域名可以指向同一个项目
    MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error
    Redis 创建多个端口
  • 原文地址:https://www.cnblogs.com/mfmdaoyou/p/7375726.html
Copyright © 2011-2022 走看看