zoukankan      html  css  js  c++  java
  • Kafka 如何保证消息的消费顺序一致性

    Kafka 如何保证消息的消费顺序?

    在Kafka中Partition(分区)是真正保存消息的地方,发送的消息都存放在这里。Partition(分区)又存在于Topic(主题)中,并且一个Topic(主题)可以指定多个Partition(分区)。

    在Kafka中,只保证Partition(分区)内有序,不保证Topic所有分区都是有序的。

    所以 Kafka 要保证消息的消费顺序,可以有2种方法:

    一、1个Topic(主题)只创建1个Partition(分区),这样生产者的所有数据都发送到了一个Partition(分区),保证了消息的消费顺序。

    二、生产者在发送消息的时候指定要发送到哪个Partition(分区)。

    我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
    
    (1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
    
    (2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值; 
    
     在Producer往Kafka插入数据时,控制同一Key分发到同一Partition,并且设置参数max.in.flight.requests.per.connection=1,也即同一个链接只能发送一条消息,如此便可严格保证Kafka消息的顺序
    
    (3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后
    
    面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition
    
    值,也就是常说的 round-robin 算法。

    三、以下所有的分析都是基于同一个partition下的场景细化,
    多partition下无法保障消息的顺序性,单一partition若碰到如下场景还是需要调整参数。

    场景一:设置了retries>0,并且max.in.flight.requests.per.connection>1

    1、retries

    生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries参数的值决定了生产者可以重发消息的次数,
    如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。

    2、max.in.flight.requests.per.connection

    该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。
    把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。这种场景下无法保障单一partition的有序,一般来说要保障消息的有序性,对于消息的可靠性也是有要求的,
    所以一般retries可以设置为大于0,但是max.in.flight.requests.per.connection设置为1即可,不过这样就有一个问题,导致了消息的吞吐量大大降低。


    场景二:需要提升吞吐量max.in.flight.requests.per.connection设置大于1

    此场景下业务要保障消息的吞吐量,那么max.in.flight.requests.per.connection必然就会选择更大的一个阈值,但是此场景还能保障消息有序性吗?答案是肯定的,
    可以设置enable.idempotence=true,开启生产者的幂等生产,可以解决顺序性问题,并且允许max.in.flight.requests.per.connection设置大于1

    四、参考:Kafka如何保证单partition有序?

    1.producer发消息到队列时,通过加锁保证有序现在假设两个问题broker leader在给producer发送ack时,因网络原因超时,那么Producer将重试,造成消息重复。

    先后两条消息发送。t1时刻msg1发送失败,msg2发送成功,t2时刻msg1重试后发送成功。造成乱序。

    2.解决重试机制引起的消息乱序为实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
    对于每个PID,该Producer发送消息的每个<Topic, Partition>都对应一个单调递增的Sequence Number。
    同样,Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比Broker维护的序号)大一,
    则Broker会接受它,否则将其丢弃:如果消息序号比Broker维护的序号差值比一大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息,
    Producer抛出InvalidSequenceNumber如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,
    Producer抛出DuplicateSequenceNumberSender发送失败后会重试,这样可以保证每个消息都被发送到broker

    参考:

    https://www.zhihu.com/question/266390197

    https://zhuanlan.zhihu.com/p/262156222

  • 相关阅读:
    Awesome Ubuntu中文
    OpenCV 2.4.0 + IPP + TBB, checked
    (MSys+MinGW )FFmpeg工程编译 FFplay Gary's Blog A C++ programmer 博客频道 CSDN.NET
    OpenCV installation for Ubuntu 12.04 | Raben Systems, Inc.
    安装Ubuntu 12.04后没有休眠(hibernate)选项
    Entity Framework 4.1
    三行代码实现阿拉伯数字转中文大小写<转>
    DataGridViewCell赋值
    数据库事务嵌套用法
    WCF传递DataTable时需要填写表名
  • 原文地址:https://www.cnblogs.com/-courage/p/15252760.html
Copyright © 2011-2022 走看看