zoukankan      html  css  js  c++  java
  • Flink反序列化失败metric

    kafka作为source会用KafkaFetcher去拉取数据核心代码:

    @Override
        public void runFetchLoop() throws Exception {
            try {
                // kick off the actual Kafka consumer
                consumerThread.start();
    
                while (running) {
                    // this blocks until we get the next records
                    // it automatically re-throws exceptions encountered in the consumer thread
                    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();
    
                    // get the records for each topic partition
                    for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
    
                        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());
    
                        partitionConsumerRecordsHandler(partitionRecords, partition);
                    }
                }
            }
            finally {
                // this signals the consumer thread that no more work is to be done
                consumerThread.shutdown();
            }
    
            // on a clean exit, wait for the runner thread
            try {
                consumerThread.join();
            }
            catch (InterruptedException e) {
                // may be the result of a wake-up interruption after an exception.
                // we ignore this here and only restore the interruption state
                Thread.currentThread().interrupt();
            }
        }

    可以通过 MetricGroup consumerMetricGroup 添加Counter

    private final Counter counter;
    
    //构造方法添加
    this.counter = consumerMetricGroup.counter("dirtyDataNum");
    
    //捕获反序列化异常
    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    try {
    deserializer.deserialize(record, kafkaCollector);
    } catch (Exception e) {
    this.counter.inc();
    LOG.info("deserialize exception");
    }


    // emit the actual records. this also updates offset state atomically and emits
    // watermarks
    emitRecordsWithTimestamps(
    kafkaCollector.getRecords(),
    partition,
    record.offset(),
    record.timestamp());

    if (kafkaCollector.isEndOfStreamSignalled()) {
    // end of stream signaled
    running = false;
    break;
    }
    }
     

    获取 Metrics(每个子任务有一个指标) :flink_taskmanager_job_task_operator_KafkaConsumer_dirtyDataNum

  • 相关阅读:
    persistence_timeout ,域名请求登录后一操作即被踢出,,KeepAlive,lvs
    记录因xen而导致lvs,realserver转发activeconn为0
    html5各种页面切换效果和模态对话框
    [设计模式] javascript 之 抽象工厂模式
    jQuery源码分析-构造函数详解
    CSS3动画的回调处理
    jQuery load()方法用法集锦!
    css控制input标签
    分享22款响应式的 jQuery 图片滑块插件
    Jquery取得iframe中元素的几种方法(转载)
  • 原文地址:https://www.cnblogs.com/vip-nange/p/14182155.html
Copyright © 2011-2022 走看看