
     

    Reading the accumulators

    JobManager

    When a job finishes, the JobManager aggregates the accumulator values:

    newJobStatus match {
      case JobStatus.FINISHED =>
        try {
          val accumulatorResults = executionGraph.getAccumulatorsSerialized()
          val result = new SerializedJobExecutionResult(
            jobID,
            jobInfo.duration,
            accumulatorResults)

          jobInfo.client ! decorateMessage(JobResultSuccess(result))
        } // catch branch omitted in this excerpt
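
    On the client side, the SerializedJobExecutionResult carried by JobResultSuccess still holds the values in serialized form. Turning it into a usable JobExecutionResult looks roughly like this (a sketch; jobResultSuccess and userClassLoader are placeholder names, while toJobExecutionResult is the runtime's own conversion helper):

    // sketch of the submitting client's side of JobResultSuccess
    SerializedJobExecutionResult serialized = jobResultSuccess.result();
    // accumulator classes may come from the user jar, so deserialize with the user class loader
    JobExecutionResult jobResult = serialized.toJobExecutionResult(userClassLoader);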

     

    When the client requests the accumulators:

    public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception {
        ActorGateway jobManagerGateway = getJobManagerGateway();
        
        Future<Object> response;
        try {
            response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
        } catch (Exception e) {
            throw new Exception("Failed to query the job manager gateway for accumulators.", e);
        }
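
    The response then has to be unwrapped, and each SerializedValue deserialized with the user class loader. A minimal sketch of that unpacking (AccumulatorResultsFound and AccumulatorHelper.deserializeAccumulators are the pieces the client relies on; treat the exact calls as an approximation of this Flink version's API):

    Object result = Await.result(response, timeout);
    if (result instanceof AccumulatorResultsFound) {
        Map<String, SerializedValue<Object>> serializedAccumulators =
                ((AccumulatorResultsFound) result).result();
        // deserialize every accumulator value with the user code class loader
        return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
    } else {
        throw new Exception("Failed to fetch the accumulators for job " + jobID);
    }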

     

    The message reaches the JobManager:

    case message: AccumulatorMessage => handleAccumulatorMessage(message)

    private def handleAccumulatorMessage(message: AccumulatorMessage): Unit = {
      message match {
        case RequestAccumulatorResults(jobID) =>
          try {
            currentJobs.get(jobID) match {
              case Some((graph, jobInfo)) =>
                // the job is still running: read the values off the live ExecutionGraph
                val accumulatorValues = graph.getAccumulatorsSerialized()
                sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues))
              case None =>
                // the job has finished: let the archive actor answer from its copy
                archive.forward(message)
            }
          } // catch branch omitted in this excerpt

     

    ExecutionGraph

    Fetching the accumulator values:

    /**
     * Gets a serialized accumulator map.
     * @return The accumulator map with serialized accumulator values.
     * @throws IOException
     */
    public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
    
        Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
    
        Map<String, SerializedValue<Object>> result = new HashMap<String, SerializedValue<Object>>();
        for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
            result.put(entry.getKey(), new SerializedValue<Object>(entry.getValue().getLocalValue()));
        }
    
        return result;
    }

     

    Aggregating the accumulators of all executions:

    /**
     * Merges all accumulator results from the tasks previously executed in the Executions.
     * @return The accumulator map
     */
    public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
    
        Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
    
        for (ExecutionVertex vertex : getAllExecutionVertices()) {
            Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (next != null) {
                AccumulatorHelper.mergeInto(userAccumulators, next);
            }
        }
    
        return userAccumulators;
    }

    The merge logic itself:

    public static void mergeInto(Map<String, Accumulator<?, ?>> target, Map<String, Accumulator<?, ?>> toMerge) {
        for (Map.Entry<String, Accumulator<?, ?>> otherEntry : toMerge.entrySet()) {
            Accumulator<?, ?> ownAccumulator = target.get(otherEntry.getKey());
            if (ownAccumulator == null) {
                // Create initial counter (copy!)
                target.put(otherEntry.getKey(), otherEntry.getValue().clone());
            }
            else {
                // Both should have the same type
                AccumulatorHelper.compareAccumulatorTypes(otherEntry.getKey(),
                        ownAccumulator.getClass(), otherEntry.getValue().getClass());
                // Merge target counter with other counter
                mergeSingle(ownAccumulator, otherEntry.getValue());
            }
        }
    }
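
    mergeInto delegates the per-accumulator work to mergeSingle. What that has to do is essentially an unchecked cast followed by Accumulator.merge; a sketch (the generics here are illustrative, not the verbatim Flink source):

    @SuppressWarnings("unchecked")
    private static <V, R extends java.io.Serializable> void mergeSingle(
            Accumulator<?, ?> target, Accumulator<?, ?> toMerge) {
        // compareAccumulatorTypes has already verified both sides are the same class,
        // so the unchecked casts are safe at this point
        Accumulator<V, R> typedTarget = (Accumulator<V, R>) target;
        Accumulator<V, R> typedToMerge = (Accumulator<V, R>) toMerge;
        typedTarget.merge(typedToMerge);
    }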

     

    Updating the accumulators

    JobManager

    It receives heartbeats from the TaskManagers, which carry the accumulator snapshots:

    case Heartbeat(instanceID, metricsReport, accumulators) =>
      updateAccumulators(accumulators)

    Each snapshot is matched to a job by its job ID and applied to that job's ExecutionGraph:

    private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
        accumulators foreach {
          case accumulatorEvent =>
            currentJobs.get(accumulatorEvent.getJobID) match {
              case Some((jobGraph, jobInfo)) =>
                future {
                  jobGraph.updateAccumulators(accumulatorEvent)
                }(context.dispatcher)
              case None =>
              // ignore accumulator values for old job
            }
        }
    }

    Inside the graph, the snapshot is matched to an Execution by its ExecutionAttemptID:

    /**
     * Updates the accumulators during the runtime of a job. Final accumulator results are transferred
     * through the UpdateTaskExecutionState message.
     * @param accumulatorSnapshot The serialized flink and user-defined accumulators
     */
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
        Map<String, Accumulator<?, ?>> userAccumulators;
        try {
            flinkAccumulators = accumulatorSnapshot.deserializeFlinkAccumulators();
            userAccumulators = accumulatorSnapshot.deserializeUserAccumulators(userClassLoader);
    
            ExecutionAttemptID execID = accumulatorSnapshot.getExecutionAttemptID();
            Execution execution = currentExecutions.get(execID);
            if (execution != null) {
                execution.setAccumulators(flinkAccumulators, userAccumulators);
            }
        } // catch branch omitted in this excerpt
    }

    On the Execution side, the new values simply overwrite the old ones as long as the execution has not reached a terminal state:

    /**
     * Update accumulators (discarded when the Execution has already been terminated).
     * @param flinkAccumulators the flink internal accumulators
     * @param userAccumulators the user accumulators
     */
    public void setAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
                                Map<String, Accumulator<?, ?>> userAccumulators) {
        synchronized (accumulatorLock) {
            if (!state.isTerminal()) {
                this.flinkAccumulators = flinkAccumulators;
                this.userAccumulators = userAccumulators;
            }
        }
    }

     

    Now look at how the TaskManager gathers the accumulators and sends them in its heartbeat:

    /**
      * Sends a heartbeat message to the JobManager (if connected) with the current
      * metrics report.
      */
    protected def sendHeartbeatToJobManager(): Unit = {
      try {
        val metricsReport: Array[Byte] = metricRegistryMapper.writeValueAsBytes(metricRegistry)

        val accumulatorEvents =
          scala.collection.mutable.Buffer[AccumulatorSnapshot]()

        // take a serialized snapshot of every running task's accumulators
        runningTasks foreach {
          case (execID, task) =>
            val registry = task.getAccumulatorRegistry
            val accumulators = registry.getSnapshot
            accumulatorEvents.append(accumulators)
        }

        currentJobManager foreach {
          jm => jm ! decorateMessage(Heartbeat(instanceID, metricsReport, accumulatorEvents))
        }
      } // catch branch omitted in this excerpt
    }

    As you can see, the accumulators of every running task are collected into accumulatorEvents and shipped to the JobManager in the Heartbeat message.

     

    A task's accumulators are obtained via task.getAccumulatorRegistry.getSnapshot.

    Let's look at AccumulatorRegistry:
    /**
     * Main accumulator registry which encapsulates internal and user-defined accumulators.
     */
    public class AccumulatorRegistry {
    
        protected static final Logger LOG = LoggerFactory.getLogger(AccumulatorRegistry.class);
    
        protected final JobID jobID;               // the job these accumulators belong to
        protected final ExecutionAttemptID taskID; // the task attempt that produced them
    
        /* Flink's internal Accumulator values stored for the executing task. */
        private final Map<Metric, Accumulator<?, ?>> flinkAccumulators = // internal accumulators
                new HashMap<Metric, Accumulator<?, ?>>();
    
        /* User-defined Accumulator values stored for the executing task. */
        private final Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>(); // user-defined accumulators
    
        /* The reporter reference that is handed to the reporting tasks. */
        private final ReadWriteReporter reporter; 
        
        /**
         * Creates a snapshot of this accumulator registry.
         * @return a serialized accumulator map
         */
        public AccumulatorSnapshot getSnapshot() {
            try {
                return new AccumulatorSnapshot(jobID, taskID, flinkAccumulators, userAccumulators);
            } catch (IOException e) {
                LOG.warn("Failed to serialize accumulators for task.", e);
                return null;
            }
        }
    }

    The snapshot logic is simple as well:

    public AccumulatorSnapshot(JobID jobID, ExecutionAttemptID executionAttemptID,
                            Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators,
                            Map<String, Accumulator<?, ?>> userAccumulators) throws IOException {
        this.jobID = jobID;
        this.executionAttemptID = executionAttemptID;
        this.flinkAccumulators = new SerializedValue<Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>(flinkAccumulators);
        this.userAccumulators = new SerializedValue<Map<String, Accumulator<?, ?>>>(userAccumulators);
    }
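
    The ExecutionGraph later restores these values through the snapshot's deserialize accessors, which we already saw called in updateAccumulators. A plausible sketch of one of them, assuming SerializedValue's standard deserializeValue(ClassLoader) API:

    public Map<String, Accumulator<?, ?>> deserializeUserAccumulators(ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        // user-defined accumulator classes live in the user jar,
        // so the user code class loader is required here
        return userAccumulators.deserializeValue(classLoader);
    }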

     

    Finally, how do the statistics actually get added onto the accumulators?

    Look at how Flink's internal accumulators are updated; it all happens through this reporter:

    /**
     * Accumulator based reporter for keeping track of internal metrics (e.g. bytes and records in/out)
     */
    private static class ReadWriteReporter implements Reporter {
    
        private LongCounter numRecordsIn = new LongCounter();
        private LongCounter numRecordsOut = new LongCounter();
        private LongCounter numBytesIn = new LongCounter();
        private LongCounter numBytesOut = new LongCounter();
    
        private ReadWriteReporter(Map<Metric, Accumulator<?,?>> accumulatorMap) {
            accumulatorMap.put(Metric.NUM_RECORDS_IN, numRecordsIn);
            accumulatorMap.put(Metric.NUM_RECORDS_OUT, numRecordsOut);
            accumulatorMap.put(Metric.NUM_BYTES_IN, numBytesIn);
            accumulatorMap.put(Metric.NUM_BYTES_OUT, numBytesOut);
        }
    
        @Override
        public void reportNumRecordsIn(long value) {
            numRecordsIn.add(value);
        }
    
        @Override
        public void reportNumRecordsOut(long value) {
            numRecordsOut.add(value);
        }
    
        @Override
        public void reportNumBytesIn(long value) {
            numBytesIn.add(value);
        }
    
        @Override
        public void reportNumBytesOut(long value) {
            numBytesOut.add(value);
        }
    }
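
    The reporter is handed to the record serializers and deserializers when the task wires up its readers and writers, along these lines (a hypothetical sketch: the getReadWriteReporter and setReporter names are my assumptions about this version's wiring, not verbatim source):

    // hypothetical wiring inside task setup
    AccumulatorRegistry registry = task.getAccumulatorRegistry();
    AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();

    // the same reporter feeds both directions of the task's I/O
    reader.setReporter(reporter); // drives reportNumBytesIn / reportNumRecordsIn
    writer.setReporter(reporter); // drives reportNumBytesOut / reportNumRecordsOut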

     

    Where is this reporter interface actually invoked?

    On the input side, bytes-in and records-in are counted while a record is deserialized:

    AdaptiveSpanningRecordDeserializer
    public DeserializationResult getNextRecord(T target) throws IOException {
        // check if we can get a full length;
        if (nonSpanningRemaining >= 4) {
            int len = this.nonSpanningWrapper.readInt();
    
            if (reporter != null) {
                reporter.reportNumBytesIn(len);
            }
            
            if (len <= nonSpanningRemaining - 4) {
                // we can get a full record from here
                target.read(this.nonSpanningWrapper);
    
                if (reporter != null) {
                    reporter.reportNumRecordsIn(1);
                }
                // ... rest of the method (the spanning-record path) omitted

     

    Conversely, on the output side, they are reported when a record is serialized:

    SpanningRecordSerializer
    @Override
    public SerializationResult addRecord(T record) throws IOException {
        int len = this.serializationBuffer.length();
        this.lengthBuffer.putInt(0, len);
    
        if (reporter != null) {
            reporter.reportNumBytesOut(len);
            reporter.reportNumRecordsOut(1);
        }
        // ... buffer copying and the return value omitted

     

    To use accumulators in your own job, your function must extend RichFunction, and you register each accumulator by calling getRuntimeContext().addAccumulator.
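
    A minimal end-to-end sketch of the user-facing side (the accumulator name "num-lines" and the surrounding job are made up for illustration):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.accumulators.IntCounter;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class LineCounter extends RichFlatMapFunction<String, String> {

        private final IntCounter numLines = new IntCounter();

        @Override
        public void open(Configuration parameters) {
            // register the accumulator under a job-wide unique name
            getRuntimeContext().addAccumulator("num-lines", numLines);
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            numLines.add(1); // counted locally, merged by the JobManager as shown above
            out.collect(value);
        }
    }

    After env.execute() returns, the merged value comes back with the job result:

    JobExecutionResult result = env.execute("count lines");
    int total = result.getAccumulatorResult("num-lines");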

Original article: https://www.cnblogs.com/fxjwind/p/5757640.html