zoukankan      html  css  js  c++  java
  • kafka消费端提交offset的方式

    Kafka 提供了 3 种提交 offset 的方式

    1. 自动提交
    1
    2
    3
    4
    // 自动提交,默认true
    props.put("enable.auto.commit", "true");
    // 设置自动每1s提交一次
    props.put("auto.commit.interval.ms", "1000");
    1. 手动同步提交 offset
    1
    consumer.commitSync();
    1. 手动异步提交 offset
    1
    consumer.commitAsync();

    上面说了既然异步提交 offset 可能会重复消费, 那么我使用同步提交是否就可以表明这个问题呢?

    1
    2
    3
    4
    5
    6
    7
    while(true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    records.forEach(record -> {
    insertIntoDB(record);
    consumer.commitSync();
    });
    }

    很明显不行, 因为 insertIntoDB 和 commitSync() 做不到原子操作, 如果 insertIntoDB() 成功了,但是提交 offset 的时候 consumer 挂掉了,然后服务器重启,仍然会导致重复消费问题。

    如何做到不重复消费?

    只要保证处理消息和提交 offset 得操作是原子操作,就可以做到不重复消费。我们可以自己管理 committed offset, 而不让 kafka 来进行管理。

    比如如下使用方式:

    1. 如果消费的数据刚好需要存储在数据库,那么可以把 offset 也存在数据库,就可以就可以在一个事物中提交这两个结果,保证原子操作。
    2. 借助搜索引擎,把 offset 和数据一起放到索引里面,比如 Elasticsearch

    每条记录都有自己的 offset, 所以如果要管理自己的 offset 还得要做下面事情

    1. 设置 enable.auto.commit=false
    2. 使用每个 ConsumerRecord 提供的 offset 来保存消费的位置。
    3. 在重新启动时使用 seek(TopicPartition, long) 恢复上次消费的位置。

    通过上面的方式就可以在消费端实现”Exactly Once” 的语义, 即保证只消费一次。但是是否真的需要保证不重复消费呢?这个得看具体业务, 重复消费数据对整体有什么影响在来决定是否需要做到不重复消费。

  • 相关阅读:
    【CSDN博客之星评选】我为什么坚持写博客
    关于纯css布局的概况
    IIS服务器下301跳转是怎么样实现的?
    如何使用数据库保存session的方法简介
    PHP如何通过SQL语句将数据写入MySQL数据库呢?
    PHP中文函数顺序排列一数组且其序数不变
    angular实时显示checkbox被选中的元素
    oracle查询正在执行的语句以及正被锁的对象
    angular中ng-repeat去重
    接口自动化测试框架--http请求的get、post方法的实现
  • 原文地址:https://www.cnblogs.com/doit8791/p/11312979.html
Copyright © 2011-2022 走看看