zoukankan      html  css  js  c++  java
  • Flink反序列化失败metric

    kafka作为source会用KafkaFetcher去拉取数据核心代码:

    @Override
        public void runFetchLoop() throws Exception {
            try {
                // kick off the actual Kafka consumer
                consumerThread.start();
    
                while (running) {
                    // this blocks until we get the next records
                    // it automatically re-throws exceptions encountered in the consumer thread
                    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
    
                    // get the records for each topic partition
                    for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
    
                        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
    
                        partitionConsumerRecordsHandler(partitionRecords, partition);
                    }
                }
            }
            finally {
                // this signals the consumer thread that no more work is to be done
                consumerThread.shutdown();
            }
    
            // on a clean exit, wait for the runner thread
            try {
                consumerThread.join();
            }
            catch (InterruptedException e) {
                // may be the result of a wake-up interruption after an exception.
                // we ignore this here and only restore the interruption state
                Thread.currentThread().interrupt();
            }
        }

    可以通过 MetricGroup consumerMetricGroup 添加Counter

    private final Counter counter;
    
    //构造方法添加
    this.counter = consumerMetricGroup.counter("dirtyDataNum");
    
    //捕获反序列化异常
    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    try {
    deserializer.deserialize(record, kafkaCollector);
    } catch (Exception e) {
    this.counter.inc();
    LOG.info("deserialize exception");
    }


    // emit the actual records. this also updates offset state atomically and emits
    // watermarks
    emitRecordsWithTimestamps(
    kafkaCollector.getRecords(),
    partition,
    record.offset(),
    record.timestamp());

    if (kafkaCollector.isEndOfStreamSignalled()) {
    // end of stream signaled
    running = false;
    break;
    }
    }
     

    获取 Metrics(每个子任务有一个指标) :flink_taskmanager_job_task_operator_KafkaConsumer_dirtyDataNum

  • 相关阅读:
    C语言中 指针和数组
    C语言中 指针与结构体
    void指针、NULL指针和未初始化指针
    C语言中 指针、引用和取值

    别--------
    快速开发 jQuery 插件的 10 大技巧(转)
    采用预取(Prefetch)来加速你的网站(转)
    HttpWatch工具简介及使用技巧(转)
    iScroll框架的使用和修改
  • 原文地址:https://www.cnblogs.com/vip-nange/p/14182155.html
Copyright © 2011-2022 走看看