  • Flink: a metric for deserialization failures

    When Kafka is used as a source, Flink pulls data with KafkaFetcher. The core code is:

    @Override
        public void runFetchLoop() throws Exception {
            try {
                // kick off the actual Kafka consumer
                consumerThread.start();
    
                while (running) {
                    // this blocks until we get the next records
                    // it automatically re-throws exceptions encountered in the consumer thread
                    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
    
                    // get the records for each topic partition
                    for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
    
                        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
    
                        partitionConsumerRecordsHandler(partitionRecords, partition);
                    }
                }
            }
            finally {
                // this signals the consumer thread that no more work is to be done
                consumerThread.shutdown();
            }
    
            // on a clean exit, wait for the runner thread
            try {
                consumerThread.join();
            }
            catch (InterruptedException e) {
                // may be the result of a wake-up interruption after an exception.
                // we ignore this here and only restore the interruption state
                Thread.currentThread().interrupt();
            }
        }

    A Counter can be registered through the MetricGroup consumerMetricGroup:

    private final Counter counter;
    
    // register the counter in the constructor
    this.counter = consumerMetricGroup.counter("dirtyDataNum");
    
    // catch deserialization exceptions so that a dirty record is counted instead of failing the job
    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
        try {
            deserializer.deserialize(record, kafkaCollector);
        } catch (Exception e) {
            this.counter.inc();
            // log where the dirty record sits so it can be inspected later
            LOG.warn("Failed to deserialize record at {}-{}@{}",
                record.topic(), record.partition(), record.offset(), e);
        }

        // emit the actual records. this also updates offset state atomically and emits
        // watermarks
        emitRecordsWithTimestamps(
            kafkaCollector.getRecords(),
            partition,
            record.offset(),
            record.timestamp());

        if (kafkaCollector.isEndOfStreamSignalled()) {
            // end of stream signaled
            running = false;
            break;
        }
    }
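
The count-and-skip pattern used in the loop can be tried outside Flink. The following is a minimal sketch under stated assumptions, not Flink code: `DirtyDataCounterSketch`, `deserialize`, and `deserializeAll` are hypothetical names, a `LongAdder` stands in for Flink's `Counter`, and parsing UTF-8 text as a long stands in for the real deserialization schema.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

final class DirtyDataCounterSketch {

    // Hypothetical stand-in for a deserialization schema: parses UTF-8 bytes as a long.
    // Throws a RuntimeException (NumberFormatException or NullPointerException) on bad input.
    static long deserialize(byte[] value) {
        return Long.parseLong(new String(value, StandardCharsets.UTF_8).trim());
    }

    // Deserializes every record; a record that fails is counted and skipped
    // so that one dirty record does not abort the whole batch.
    static List<Long> deserializeAll(List<byte[]> records, LongAdder dirtyDataNum) {
        List<Long> parsed = new ArrayList<>();
        for (byte[] record : records) {
            try {
                parsed.add(deserialize(record));
            } catch (RuntimeException e) {
                dirtyDataNum.increment(); // plays the role of counter.inc()
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        LongAdder dirtyDataNum = new LongAdder(); // stand-in for the Flink Counter
        List<byte[]> records = Arrays.asList(
            "1".getBytes(StandardCharsets.UTF_8),
            "oops".getBytes(StandardCharsets.UTF_8),
            "3".getBytes(StandardCharsets.UTF_8));

        List<Long> parsed = deserializeAll(records, dirtyDataNum);
        // prints "parsed=[1, 3], dirtyDataNum=1"
        System.out.println("parsed=" + parsed + ", dirtyDataNum=" + dirtyDataNum.sum());
    }
}
```

In the fetcher itself the loop still calls emitRecordsWithTimestamps after a failure, so the offset keeps advancing past the dirty record; the sketch leaves offsets out entirely.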
     

    Retrieve the metric (each subtask reports its own value): flink_taskmanager_job_task_operator_KafkaConsumer_dirtyDataNum
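
The shape of that metric name suggests a Prometheus-style reporter. The sketch below shows how such a name could be assembled, assuming the reporter prefixes `flink_`, joins the logical scope with underscores, and replaces characters outside `[a-zA-Z0-9:_]` with `_`. `MetricNameSketch`, `sanitize`, and `scopedName` are hypothetical helpers written for illustration, not reporter APIs; `KafkaConsumer` is taken to be the name of the group behind consumerMetricGroup.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class MetricNameSketch {

    // Replaces every character that is not a letter, digit, ':' or '_' with '_'.
    static String sanitize(String part) {
        return part.replaceAll("[^a-zA-Z0-9:_]", "_");
    }

    // Joins the scope components and the metric name into one flat name.
    static String scopedName(List<String> logicalScope, String metricName) {
        String scope = logicalScope.stream()
            .map(MetricNameSketch::sanitize)
            .collect(Collectors.joining("_"));
        return "flink_" + scope + "_" + sanitize(metricName);
    }

    public static void main(String[] args) {
        // Assumed scope of an operator-level metric registered under the "KafkaConsumer" group.
        List<String> scope = Arrays.asList("taskmanager", "job", "task", "operator", "KafkaConsumer");
        // prints "flink_taskmanager_job_task_operator_KafkaConsumer_dirtyDataNum"
        System.out.println(scopedName(scope, "dirtyDataNum"));
    }
}
```

Because every subtask exposes its own series under this name, a job-wide count of dirty records is obtained by summing across subtasks on the monitoring side.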

  • Original article: https://www.cnblogs.com/vip-nange/p/14182155.html