zoukankan      html  css  js  c++  java
  • Flink kafka connector 端到端精确一次测试

    官网博客中: Apache Flink中的端到端精确一次处理概述  对Flink 端到端精确一次处理和两段提交的原理,有详尽的描述

    这里要写的是,关于 Flink  kafka  端到端精确一次的测试

    之前就大概测试过相应内容,应该是测试失败了的,只得到了至少一次的结果(之前的关注点不在这个上面,下面会说明为什么只得到 至少一次 的结果)。

    这一次是要做Flink HA 相关的配置,有个重要的点就是任务在异常恢复的时候,是否能保持精确一次,这个关乎线上的数据和我们代码的写法(如果Flink 不能保证精确一次,就需要在代码里添加对应的内容)。

    测试的前提当然是,开启了 checkpoint,也设置了checkpoint mode 设为精确一次:

    val rock = new RocksDBStateBackend(Common.CHECK_POINT_DATA_DIR)
    env.setStateBackend(rock.getCheckpointBackend)
    // checkpoint interval 10 minute
    env.enableCheckpointing(2 * 60 * 1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    同时 Kafka 的生产者也必须开启精确一次的语义: FlinkKafkaProducer 没有过期的公有构造方法,都需要制定 Kafka 生产者的一致性语义:

    public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic) 
    public FlinkKafkaProducer(String defaultTopic, KafkaSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) 

    特别吐槽下,1.10 版本 KafkaSerializationSchema 没有提供对应的实现类,让我这种菜鸟很尴尬

    看下我的生产者写法(很挫)

    Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"")
        val sink = new FlinkKafkaProducer[String](topic+"_out" // 没用了
          , new MyKafkaSerializationSchema[String](topic + "_out") // 指到这里了
          , Common.getProp
          , FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    
    

    MyKafkaSerializationSchema 的实现如下:

    public class MyKafkaSerializationSchema<T>
            implements KafkaSerializationSchema<T>, KafkaContextAware<T> {
        private String topic;
    
        public MyKafkaSerializationSchema(String topic) {
            this.topic = topic;
        }
    
        @Override
        public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
    
            return new ProducerRecord<>(topic, element.toString().getBytes());
        }
    
        @Override
        public String getTargetTopic(T element) {
            return null;
        }
    }

    由于ProducerRecord 必须要指定 topic,但是又获取不到 FlinkKafkaProducer 中指定的topic,就先这样写了

    测试代码就简单了:

    val topic = "simple_string_kafka"
    val source = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), Common.getProp)
    
    # 开启精确一次语义会报错,必须在kakfa 的 prop 中指定事务过期的时间
    Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"")
    val sink = new FlinkKafkaProducer[String](topic+"_out" // 没用
      , new MyKafkaSerializationSchema[String](topic + "_out") // 指到这里了
      , Common.getProp
      , FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    
    env.addSource(source)
      .name("source")
      .disableChaining()
      .map(str => {
        str
      })
      .disableChaining()
      .name("map1")
      .map(s => {
        s
      })
      .addSink(sink)
      .name("sink")

    代码就这样的,然后就是打包上传服务器。

    跑起来就这样的了:

     往 source 发送的数据如下:

     只有id和create_time 两个字段,id 是个随机的字符串,create_time 是当前的时间戳(每秒一条数据)

    接收到 sink  topic 的数据如下:

     这里的时候,其实是有个疑问的,source 接收到一条数据,直接就写到了sink,我也消费出来了,怎么保证精确一次呢?

    kafka 在 0.11 版本之后,增加了对事物的支持,这就是 kafka 端到端一致性的关键。

    不然来一条数据,直接就写出去了,下游也消费了,这个时候异常恢复,source是后退了,sink 却没办法后退了。

    关键就这 kakfa properties 的这个参数:

    // default read_uncommitted
    prop.put("isolation.level", "read_committed");

    kafka 消费者的隔离级别,默认是  read_uncommitted (读未提交),意思是有消息就读,不管是否提交。

    在消费 sink topic 的消费者中添加这个配置,就可以看到,消息不是有一条就输出一条,而是在完成一个checkpoint 后才会输出一批数据,就对应一个checkpoint 时间,flink 处理过的数据,就这样做到 端到端的一致性。

    这样Flink 在正常执行的时候,source 消费一条数据,处理后,直接写到sink(也就是kafka),在做checkpoint 的时候,再提交事务,如果这个过程中异常恢复了,source 的offset 直接从 checkpoint 中获取,而之前已经消费过的数据,虽然已经写到kafka,但是没有提交,就这样做到了端到端的一致性。

    在 FlinkKafkaProducer 中添加精确一次语义,会到一个异常:

     在生产者的 prop 中添加如下配置即可:

    Common.getProp.setProperty("transaction.timeout.ms", 1000*60*2+"")

    配置合理的超时时间,不能大于kafka 的: transaction.max.timeout.ms 配置(默认 900 秒)

    搞定

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

  • 相关阅读:
    Windows使用SCHTASKS 命令执行定时任务
    window10设置定时任务
    uiautomator2+python自动化测试1-环境准备
    uiautomator2+python自动化测试2-查看app页面元素利器weditor
    APPIUM 自带的webdriveragent
    使用 mitmproxy + python 做拦截代理
    mitmproxy 实战
    深入学习mitmproxy
    将博客搬至CSDN
    CS231N Assignment5 图像分类练习
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/12554040.html
Copyright © 2011-2022 走看看