zoukankan      html  css  js  c++  java
  • flink

     

    读accumulator

    JobManager

    在job finish的时候会汇总accumulator的值,

    newJobStatus match {
      case JobStatus.FINISHED =>
      try {
        val accumulatorResults = executionGraph.getAccumulatorsSerialized()
        val result = new SerializedJobExecutionResult(
          jobID,
          jobInfo.duration,
          accumulatorResults)
    
        jobInfo.client ! decorateMessage(JobResultSuccess(result))
      }

     

    在client请求accumulator时,

    public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
        ActorGateway jobManagerGateway = getJobManagerGateway();
        
        Future<Object> response;
        try {
            response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
        } catch (Exception e) {
            throw new Exception("Failed to query the job manager gateway for accumulators.", e);
        }

     

    消息传到job manager

    case message: AccumulatorMessage => handleAccumulatorMessage(message)
    private def handleAccumulatorMessage(message: AccumulatorMessage): Unit = {
    message match {
      case RequestAccumulatorResults(jobID) =>
        try {
          currentJobs.get(jobID) match {
            case Some((graph, jobInfo)) =>
              val accumulatorValues = graph.getAccumulatorsSerialized()
              sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues))
            case None =>
              archive.forward(message)
          }
        }

     

    ExecutionGraph

    获取accumulator的值

    /**
     * Builds a serialized view of the aggregated user accumulators.
     *
     * <p>The accumulators are first merged via {@code aggregateUserAccumulators()};
     * the local value of each one is then wrapped in a {@code SerializedValue}.
     *
     * @return A map from accumulator name to its serialized local value.
     * @throws IOException if wrapping a value in a SerializedValue fails
     */
    public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
        final Map<String, Accumulator<?, ?>> aggregated = aggregateUserAccumulators();
        final Map<String, SerializedValue<Object>> serialized = new HashMap<>();

        for (Map.Entry<String, Accumulator<?, ?>> named : aggregated.entrySet()) {
            final Object localValue = named.getValue().getLocalValue();
            serialized.put(named.getKey(), new SerializedValue<Object>(localValue));
        }

        return serialized;
    }

     

    execution的accumulator聚合,

    /**
     * Combines the user accumulators of all execution vertices into one map.
     *
     * <p>For every vertex, the accumulators of its current execution attempt are
     * merged into the result; attempts that report {@code null} are skipped.
     *
     * @return The merged accumulator map
     */
    public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
        final Map<String, Accumulator<?, ?>> merged = new HashMap<>();

        for (ExecutionVertex vertex : getAllExecutionVertices()) {
            final Map<String, Accumulator<?, ?>> reported =
                    vertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (reported == null) {
                // nothing reported for this attempt
                continue;
            }
            AccumulatorHelper.mergeInto(merged, reported);
        }

        return merged;
    }

    具体merge的逻辑,

    /**
     * Merges the accumulators of {@code toMerge} into {@code target}, matching them by name.
     *
     * <p>A name that is not yet present in {@code target} is added as a clone of the
     * incoming accumulator. For a name that is already present, the accumulator
     * classes are compared and the incoming accumulator is merged into the existing one.
     *
     * @param target  The map that receives the merged accumulators (modified in place)
     * @param toMerge The accumulators to merge into {@code target}
     */
    public static void mergeInto(Map<String, Accumulator<?, ?>> target, Map<String, Accumulator<?, ?>> toMerge) {
        for (Map.Entry<String, Accumulator<?, ?>> incoming : toMerge.entrySet()) {
            final String name = incoming.getKey();
            final Accumulator<?, ?> existing = target.get(name);

            if (existing != null) {
                // Name already known: the types must agree before the values are merged
                AccumulatorHelper.compareAccumulatorTypes(name,
                        existing.getClass(), incoming.getValue().getClass());
                mergeSingle(existing, incoming.getValue());
                continue;
            }

            // First occurrence of this name: store a copy, not the original instance
            target.put(name, incoming.getValue().clone());
        }
    }

     

    更新accumulator

    JobManager

    收到task发来的heartbeat,其中附带accumulators

    case Heartbeat(instanceID, metricsReport, accumulators) =>
      updateAccumulators(accumulators)

    根据jobid,更新到ExecutionGraph

    /**
     * Applies each reported accumulator snapshot to the graph of the job it belongs to.
     *
     * The job is looked up in `currentJobs` by the snapshot's job ID. When it is found,
     * the graph update is wrapped in a `future` that is given `context.dispatcher`;
     * when it is not found, the snapshot is dropped.
     *
     * @param accumulators The accumulator snapshots to apply
     */
    private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
        for (snapshot <- accumulators) {
          currentJobs.get(snapshot.getJobID) match {
            case Some((jobGraph, _)) =>
              future {
                jobGraph.updateAccumulators(snapshot)
              }(context.dispatcher)
            case None =>
              // the job is no longer tracked: drop its accumulator values
          }
        }
    }

    根据ExecutionAttemptID, 更新Execution中

    /**
     * Updates the accumulators during the runtime of a job. Final accumulator results are transferred
     * through the UpdateTaskExecutionState message.
     *
     * <p>Both accumulator maps of the snapshot are deserialized (the user accumulators with
     * {@code userClassLoader}) and handed to the execution registered in
     * {@code currentExecutions} under the snapshot's execution attempt ID. If no execution is
     * registered under that ID, the snapshot is ignored.
     *
     * @param accumulatorSnapshot The serialized flink and user-defined accumulators
     */
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
        Map<String, Accumulator<?, ?>> userAccumulators;
        try {
            // Deserialize both maps before looking up the execution
            flinkAccumulators = accumulatorSnapshot.deserializeFlinkAccumulators();
            userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);
    
            // Find the execution attempt this snapshot was taken from
            ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID();
            Execution execution = currentExecutions.get(execID);
            if (execution != null) {
                execution.setAccumulators(flinkAccumulators, userAccumulators);
            }
        }
        // NOTE(review): no catch/finally clause is visible for this try in the excerpt;
        // it appears to have been elided - confirm against the full source.
    }

    对于execution,只要状态不是结束,就直接更新

    /**
     * Replaces the accumulators held by this execution.
     *
     * <p>The replacement happens while holding {@code accumulatorLock} and only if the
     * execution's state is not terminal; otherwise the given maps are discarded.
     *
     * @param flinkAccumulators the flink internal accumulators
     * @param userAccumulators the user accumulators
     */
    public void setAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
                                Map<String, Accumulator<?, ?>> userAccumulators) {
        synchronized (accumulatorLock) {
            if (state.isTerminal()) {
                // already terminated: keep whatever accumulators are currently set
                return;
            }
            this.flinkAccumulators = flinkAccumulators;
            this.userAccumulators = userAccumulators;
        }
    }

     

    再看TaskManager如何更新accumulator,并发送heartbeat,

     /**
       * Sends a heartbeat message to the JobManager (if connected) with the current
       * metrics report and the accumulator snapshots of all running tasks.
       *
       * A task whose registry returns `null` from `getSnapshot` is left out of the
       * heartbeat, so the receiver is never handed a `null` snapshot to dereference.
       */
      protected def sendHeartbeatToJobManager(): Unit = {
        // NOTE(review): no catch clause is visible for this try in the excerpt;
        // it appears to have been elided - confirm against the full source.
        try {
          val metricsReport: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)
    
          val accumulatorEvents =
            scala.collection.mutable.Buffer[AccumulatorSnapshot]()
    
          runningTasks foreach {
            case (_, task) =>
              // Option(...) turns a null snapshot into None, which is then skipped
              Option(task.getAccumulatorRegistry.getSnapshot) foreach { snapshot =>
                accumulatorEvents.append(snapshot)
              }
          }
    
          currentJobManager foreach {
            jm => jm ! decorateMessage(Heartbeat(instanceID, metricsReport, accumulatorEvents))
          }
        }
      }

    可以看到会把每个running task的accumulators放到accumulatorEvents,然后通过Heartbeat消息发出

     

    而task的accumulators是通过task.getAccumulatorRegistry.getSnapshot得到

    看看
    AccumulatorRegistry
    /**
     * Main accumulator registry which encapsulates internal and user-defined accumulators.
     *
     * <p>NOTE(review): this excerpt shows no constructor although several fields are
     * final; the remaining members appear to have been elided.
     */
    public class AccumulatorRegistry {
    
        protected static final Logger LOG = LoggerFactory.getLogger(AccumulatorRegistry.class);
    
        protected final JobID jobID;  // the job these accumulators belong to
        protected final ExecutionAttemptID taskID; // execution attempt ID of the task
    
        /* Flink's internal Accumulator values stored for the executing task. */
        private final Map<Metric, Accumulator<?, ?>> flinkAccumulators =   // flink-internal accumulators
                new HashMap<Metric, Accumulator<?, ?>>();
    
        /* User-defined Accumulator values stored for the executing task. */
        private final Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>(); // user-defined accumulators
    
        /* The reporter reference that is handed to the reporting tasks. */
        private final ReadWriteReporter reporter; 
        
        /**
         * Creates a snapshot of this accumulator registry.
         *
         * <p>An {@code IOException} raised while building the snapshot is logged as a
         * warning and not propagated.
         *
         * @return a snapshot built from the job ID, task ID and both accumulator maps,
         *         or {@code null} if building it failed with an {@code IOException} -
         *         callers must handle the {@code null} case
         */
        public AccumulatorSnapshot getSnapshot() {
            try {
                return new AccumulatorSnapshot(jobID, taskID, flinkAccumulators, userAccumulators);
            } catch (IOException e) {
                LOG.warn("Failed to serialize accumulators for task.", e);
                return null;
            }
        }
    }

    snapshot的逻辑也很简单,

    /**
     * Creates a snapshot that stores the given IDs and wraps both accumulator maps
     * in {@code SerializedValue}s.
     *
     * @param jobID              ID of the job the accumulators belong to
     * @param executionAttemptID ID of the execution attempt the accumulators belong to
     * @param flinkAccumulators  the flink internal accumulators
     * @param userAccumulators   the user-defined accumulators
     * @throws IOException if wrapping one of the maps in a SerializedValue fails
     */
    public AccumulatorSnapshot(JobID jobID, ExecutionAttemptID executionAttemptID,
                            Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
                            Map<String, Accumulator<?, ?>> userAccumulators) throws IOException {
        // identity of the snapshot
        this.jobID = jobID;
        this.executionAttemptID = executionAttemptID;

        // payload: both maps are stored in serialized form
        this.flinkAccumulators = new SerializedValue<>(flinkAccumulators);
        this.userAccumulators = new SerializedValue<>(userAccumulators);
    }

     

    最后,我们如何将统计数据累加到Accumulator上的?

    直接看看Flink内部的Accumulator是如何更新的,都是通过这个reporter来更新的

    /**
     * Reporter that keeps track of internal metrics (records and bytes, in and out)
     * in {@code LongCounter} accumulators.
     */
    private static class ReadWriteReporter implements Reporter {

        private final LongCounter numRecordsIn = new LongCounter();
        private final LongCounter numRecordsOut = new LongCounter();
        private final LongCounter numBytesIn = new LongCounter();
        private final LongCounter numBytesOut = new LongCounter();

        /**
         * Registers the four counters in the given map under their {@code Metric} keys.
         *
         * @param accumulatorMap the map that receives the counters
         */
        private ReadWriteReporter(Map<Metric, Accumulator<?,?>> accumulatorMap) {
            accumulatorMap.put(Metric.NUM_RECORDS_IN, this.numRecordsIn);
            accumulatorMap.put(Metric.NUM_RECORDS_OUT, this.numRecordsOut);
            accumulatorMap.put(Metric.NUM_BYTES_IN, this.numBytesIn);
            accumulatorMap.put(Metric.NUM_BYTES_OUT, this.numBytesOut);
        }

        /** Adds {@code value} to the records-in counter. */
        @Override
        public void reportNumRecordsIn(long value) {
            this.numRecordsIn.add(value);
        }

        /** Adds {@code value} to the records-out counter. */
        @Override
        public void reportNumRecordsOut(long value) {
            this.numRecordsOut.add(value);
        }

        /** Adds {@code value} to the bytes-in counter. */
        @Override
        public void reportNumBytesIn(long value) {
            this.numBytesIn.add(value);
        }

        /** Adds {@code value} to the bytes-out counter. */
        @Override
        public void reportNumBytesOut(long value) {
            this.numBytesOut.add(value);
        }
    }

     

    何处调用到这个reporter的接口,

    对于in, 在反序列化到record的时候会统计Bytesin和Recordsin

    AdaptiveSpanningRecordDeserializer
    public DeserializationResult getNextRecord(T target) throws IOException {
        // check if we can get a full length;
        if (nonSpanningRemaining >= 4) {
            int len = this.nonSpanningWrapper.readInt();
    
            if (reporter != null) {
                reporter.reportNumBytesIn(len);
            }
            
            if (len <= nonSpanningRemaining - 4) {
                // we can get a full record from here
                target.read(this.nonSpanningWrapper);
    
                if (reporter != null) {
                    reporter.reportNumRecordsIn(1);
                }

     

    对于out,则相反,是在序列化的时候统计BytesOut和RecordsOut

    SpanningRecordSerializer
    @Override
    public SerializationResult addRecord(T record) throws IOException {
        int len = this.serializationBuffer.length();
        this.lengthBuffer.putInt(0, len);
    
        if (reporter != null) {
            reporter.reportNumBytesOut(len);
            reporter.reportNumRecordsOut(1);
        }

     

    使用accumulator时,需要首先extends RichFunction,然后通过调用 getRuntimeContext().addAccumulator 来注册accumulator

  • 相关阅读:
    HDU 5791 Two (DP)
    POJ 1088 滑雪 (DPor记忆化搜索)
    LightOJ 1011
    POJ 1787 Charlie's Change (多重背包 带结果组成)
    HDU 5550 Game Rooms (ccpc2015 K)(dp)
    HDU 5542 The Battle of Chibi (ccpc 南阳 C)(DP 树状数组 离散化)
    HDU 5543 Pick The Sticks (01背包)
    HDU 5546 Ancient Go (ccpc2015南阳G)
    NB-IoT的DRX、eDRX、PSM三个模式 (转载,描述的简单易懂)
    MQTT 嵌入式端通讯协议解析(转)
  • 原文地址:https://www.cnblogs.com/fxjwind/p/5757640.html
Copyright © 2011-2022 走看看