  • Flink metric for deserialization failures

    When Kafka is used as a source, Flink pulls the data with KafkaFetcher. The core of its fetch loop is:

        @Override
        public void runFetchLoop() throws Exception {
            try {
                // kick off the actual Kafka consumer
                consumerThread.start();
    
                while (running) {
                    // this blocks until we get the next records
                    // it automatically re-throws exceptions encountered in the consumer thread
                    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
    
                    // get the records for each topic partition
                    for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
    
                        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
    
                        partitionConsumerRecordsHandler(partitionRecords, partition);
                    }
                }
            }
            finally {
                // this signals the consumer thread that no more work is to be done
                consumerThread.shutdown();
            }
    
            // on a clean exit, wait for the runner thread
            try {
                consumerThread.join();
            }
            catch (InterruptedException e) {
                // may be the result of a wake-up interruption after an exception.
                // we ignore this here and only restore the interruption state
                Thread.currentThread().interrupt();
            }
        }
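    The loop above splits work between a dedicated consumer thread and the fetch loop, which blocks in handover.pollNext() until a batch is ready. The Java sketch below illustrates that hand-off idea only. OneSlotHandover and runFetchLoop are hypothetical names, not Flink's Handover class: this version does not forward exceptions or support wakeups, and the "batches" are plain strings instead of ConsumerRecords.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified version of the consumer-thread + handover split in
 * runFetchLoop(). Not Flink's Handover class: exceptions are not forwarded and
 * there is no wakeup, only the one-slot hand-off between the two threads.
 */
public class HandoverSketch {

    /** One-slot exchange: the producer blocks until the previous batch is taken. */
    static final class OneSlotHandover<E> {
        private E next;          // pending batch, or null when the slot is empty
        private boolean closed;  // set once the producer has no more batches

        synchronized void produce(E batch) throws InterruptedException {
            while (next != null) {
                wait();          // wait until the fetch loop took the previous batch
            }
            next = batch;
            notifyAll();
        }

        synchronized void close() {
            closed = true;
            notifyAll();
        }

        /** Blocks until a batch is available; returns null once closed and drained. */
        synchronized E pollNext() throws InterruptedException {
            while (next == null && !closed) {
                wait();
            }
            E batch = next;
            next = null;
            notifyAll();         // let the producer hand over its next batch
            return batch;
        }
    }

    /** Starts a producer thread for the given batches and drains them like the fetch loop. */
    static List<String> runFetchLoop(List<List<String>> batches) {
        OneSlotHandover<List<String>> handover = new OneSlotHandover<>();
        Thread consumerThread = new Thread(() -> {
            try {
                for (List<String> batch : batches) {
                    handover.produce(batch);   // stands in for KafkaConsumer.poll()
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                handover.close();              // tells the fetch loop no more batches will come
            }
        });
        consumerThread.start();

        List<String> emitted = new ArrayList<>();
        try {
            List<String> batch;
            while ((batch = handover.pollNext()) != null) {
                emitted.addAll(batch);         // KafkaFetcher dispatches per partition here
            }
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("fetch loop interrupted", e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> out = runFetchLoop(Arrays.asList(
                Arrays.asList("r1", "r2"), Arrays.asList("r3")));
        System.out.println(out); // [r1, r2, r3]
    }
}
```

    Because the slot holds at most one batch, the consumer thread cannot run arbitrarily far ahead of the fetch loop. This is the "size one blocking queue" behaviour that Flink's Handover is built around.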

    You can register a Counter on the MetricGroup consumerMetricGroup and increment it every time deserialization fails:

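    // At the top of KafkaFetcher.java
    import org.apache.flink.metrics.Counter;

    // New field in KafkaFetcher
    private final Counter counter;

    // In the constructor: register the counter on the consumer metric group
    this.counter = consumerMetricGroup.counter("dirtyDataNum");

    // In partitionConsumerRecordsHandler: catch deserialization exceptions
    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
        try {
            deserializer.deserialize(record, kafkaCollector);
        } catch (Exception e) {
            // count the dirty record and skip it instead of failing the task
            this.counter.inc();
            LOG.warn("Failed to deserialize record at offset {}, skipping it", record.offset(), e);
        }

        // emit the actual records. this also updates offset state atomically and emits
        // watermarks (for a skipped record the collector is empty, so only the offset moves)
        emitRecordsWithTimestamps(
            kafkaCollector.getRecords(),
            partition,
            record.offset(),
            record.timestamp());

        if (kafkaCollector.isEndOfStreamSignalled()) {
            // end of stream signaled
            running = false;
            break;
        }
    }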
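    The key point of the patch is that the emit call stays outside the try/catch. A failed record is counted, produces no output, and still advances the partition offset, so the task neither fails nor re-reads it. The sketch below reproduces only that control flow, without Flink. Record, deserialize() and the AtomicLong counter are hypothetical stand-ins for ConsumerRecord, the job's deserializer and Flink's Counter. It also assumes, as the comment in the original loop says, that emitRecordsWithTimestamps updates the offset even when the collector is empty.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Flink-free simulation of the patched loop. Record, deserialize() and the
 * counter are hypothetical stand-ins for ConsumerRecord, the job's
 * deserializer and Flink's Counter.
 */
public class DirtyDataSketch {

    /** Stand-in for ConsumerRecord<byte[], byte[]>: just an offset and a value. */
    static final class Record {
        final long offset;
        final byte[] value;

        Record(long offset, String value) {
            this.offset = offset;
            this.value = value.getBytes(StandardCharsets.UTF_8);
        }
    }

    final AtomicLong dirtyDataNum = new AtomicLong();  // plays the role of the "dirtyDataNum" Counter
    final List<Integer> emitted = new ArrayList<>();   // records that would go downstream
    long committedOffset = -1;                         // offset state after the last record

    /** Hypothetical deserializer: the payload must be a decimal integer. */
    static int deserialize(Record record) {
        return Integer.parseInt(new String(record.value, StandardCharsets.UTF_8).trim());
    }

    void handlePartitionRecords(List<Record> partitionRecords) {
        for (Record record : partitionRecords) {
            List<Integer> collected = new ArrayList<>(); // plays the role of kafkaCollector
            try {
                collected.add(deserialize(record));
            } catch (Exception e) {
                dirtyDataNum.incrementAndGet();          // count and skip the dirty record
            }
            // Like emitRecordsWithTimestamps: emit whatever was collected (possibly
            // nothing) and advance the offset either way.
            emitted.addAll(collected);
            committedOffset = record.offset;
        }
    }

    public static void main(String[] args) {
        DirtyDataSketch fetcher = new DirtyDataSketch();
        List<Record> batch = new ArrayList<>();
        batch.add(new Record(0, "1"));
        batch.add(new Record(1, "not-a-number"));
        batch.add(new Record(2, "3"));
        fetcher.handlePartitionRecords(batch);
        // prints "[1, 3] dirty=1 offset=2"
        System.out.println(fetcher.emitted + " dirty=" + fetcher.dirtyDataNum.get()
                + " offset=" + fetcher.committedOffset);
    }
}
```

    If the emit call were moved inside the try block, a bad record would leave the offset where it was. After a restart from a checkpoint, the consumer would then read the same record again.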
     

    Resulting metric (one per subtask): flink_taskmanager_job_task_operator_KafkaConsumer_dirtyDataNum
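    The name above follows the naming style of Flink's Prometheus reporter, as far as I know it. The reporter joins "flink", the logical scope of the operator metric group (taskmanager, job, task, operator), the group added by the consumer (KafkaConsumer) and the metric name with "_". It then replaces characters outside [a-zA-Z0-9:_] with "_". The per-subtask identity (subtask index, task name and so on) is carried in labels rather than in the name. The sketch below only re-implements that naming rule for illustration. metricName() is a hypothetical helper, not the reporter's code, and other reporters such as JMX format names differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical re-implementation of a Prometheus-style naming rule:
 * "flink" + scope components + metric name, joined with "_", with
 * characters outside [a-zA-Z0-9:_] replaced by "_".
 */
public class MetricNameSketch {
    private static final Pattern INVALID = Pattern.compile("[^a-zA-Z0-9:_]");

    static String metricName(List<String> scope, String metric) {
        StringBuilder sb = new StringBuilder("flink");
        for (String part : scope) {
            sb.append('_').append(INVALID.matcher(part).replaceAll("_"));
        }
        sb.append('_').append(INVALID.matcher(metric).replaceAll("_"));
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String> scope = Arrays.asList("taskmanager", "job", "task", "operator", "KafkaConsumer");
        // prints flink_taskmanager_job_task_operator_KafkaConsumer_dirtyDataNum
        System.out.println(metricName(scope, "dirtyDataNum"));
    }
}
```

    Every subtask therefore exports a time series with the same name. To get a job-wide count of dirty records, sum those series across their subtask labels.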

  • Original article: https://www.cnblogs.com/vip-nange/p/14182155.html