zoukankan      html  css  js  c++  java
  • 【记录】【springboot】【kafka】【KafkaStreams】报错Use a different TimestampExtractor to process this data

    问题:springboot集成kafka,并由KafkaStreams处理,启动报错

    org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = crawler_events, partition = 0, offset = 0, CreateTime = -1, serialized key size = -1, serialized value size = 187, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {XXX}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

    解决:去掉@EnableKafkaStreams,加入bean,和相关类

    @Bean(
            name = {"defaultKafkaStreamsBuilder"}
    )
    public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(@Qualifier("defaultKafkaStreamsConfig") ObjectProvider<KafkaStreamsConfiguration> streamsConfigProvider, ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider) {
        KafkaStreamsConfiguration streamsConfig = (KafkaStreamsConfiguration)streamsConfigProvider.getIfAvailable();
        if (streamsConfig != null) {
            Properties properties = streamsConfig.asProperties();
            properties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class.getName());
            StreamsBuilderFactoryBean fb = new StreamsBuilderFactoryBean(streamsConfig);
            StreamsBuilderFactoryBeanCustomizer customizer = (StreamsBuilderFactoryBeanCustomizer)customizerProvider.getIfUnique();
            if (customizer != null) {
                customizer.configure(fb);
            }
    
            return fb;
        } else {
            throw new UnsatisfiedDependencyException(KafkaStreamsDefaultConfiguration.class.getName(), "defaultKafkaStreamsBuilder", "streamsConfig", "There is no 'defaultKafkaStreamsConfig' " + KafkaStreamsConfiguration.class.getName() + " bean in the application context.
    Consider declaring one or don't use @EnableKafkaStreams.");
        }
    }
    package com.bda.clientportait.strategy.common.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;
    
    /**
     * TODO
     *
     * @author liwei
     * @version 1.0
     * @date 2021/3/15 15:25
     * @className com.bda.clientportait.strategy.common.kafka.MyEventTimeExtractor
     **/
    public class MyEventTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {
            return System.currentTimeMillis();
        }
    }

    参考文章:http://www.voidcn.com/article/p-fepwxwfx-bmq.html

                      https://blog.csdn.net/dcm19920115/article/details/93386750

  • 相关阅读:
    vi使用方法详细介绍
    Jenkins实现Android自动化打包
    JSON知识总结
    React Native中pointerEvent属性
    从零学React Native之06flexbox布局
    Android Http实现文件的上传和下载
    从零学React Native之05混合开发
    React Native声明属性和属性确认
    从零学React Native之04自定义对话框
    Android 在图片的指定位置添加标记
  • 原文地址:https://www.cnblogs.com/xiaostudy/p/14538223.html
Copyright © 2011-2022 走看看