zoukankan      html  css  js  c++  java
  • KafkaStream时间戳问题CreateTime = -1引起的程序中断

    Exception in thread "app-8835188a-e0a0-46da-ac2a-6820ec197628-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = raw_103, partition = 1, offset = 7032668, CreateTime = -1, serialized key size = -1, serialized value size = 111, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = { "key1": [ 103, "4113471085724846255", "--", "2018-04-17 21:33:53" ], "key2": [ [ 213309, "--", 20128, 1 ] ] }) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
            at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:73)
            at org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61)
            at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:48)
            at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:98)
            at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
            at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:560)
            at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:896)
            at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:797)
            at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
            at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
    
    

    之前直接改了源码。后来从度娘中找到解决方法:
    新增时间异常捕获类MyEventTimeExtractor.class, 直接返回0

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;
    
    public class MyEventTimeExtractor implements TimestampExtractor{
    	@Override
    	public long extract(ConsumerRecord<Object, Object> record,
    			long previousTimestamp) {
    		return 0;
    	}
    }
    

    然后在属性添加下面配置:

    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);
    

    编译执行,ok

  • 相关阅读:
    vue-cli
    respond.js
    dataTable调用接口渲染数据,没有数据,报错
    jq自定义鼠标右键菜单
    datatables通过ajax调用渲染数据,怎么根据数据给td添加class
    【C++ Primer 第11章 练习答案】2. 关联容器概述
    【Leetcode】1. Two Sum
    【C++】拷贝构造函数(深拷贝,浅拷贝)详解
    【图的遍历】广度优先遍历(DFS)、深度优先遍历(BFS)及其应用
    【C++ Primer 第十三章】4. 拷贝控制示例
  • 原文地址:https://www.cnblogs.com/30go/p/8876877.html
Copyright © 2011-2022 走看看