zoukankan      html  css  js  c++  java
  • KAFKA基础(四):Kafka架构深入(2)Kafka 生产者

    1 分区策略

    1)分区的原因
    (1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
    (2)可以提高并发,因为可以以 Partition 为单位读写了。
    2)分区的原则
    我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
    (1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
    (2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值;
    (3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition值,也就是常说的 round-robin 算法。

    2 数据可靠性保证

    为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
    1)副本数据同步策略
    Kafka 选择了第二种方案,原因如下:
    1.同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1个副本,而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
    2.虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。 
    2)ISR
      采用第二种方案之后,设想以下情景:leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去,
    直到它完成同步,才能发送 ack。这个问题怎么解决呢?
       Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower长时间 未 向 leader 同 步 数 据 , 则 该 follower 将 被 踢 出 ISR , 该 时 间 阈 值 由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。
    3)ack 应答机制
      对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。
      所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡,选择以下的配置。 
    acks 参数配置:
    acks:
      0:producer 不等待 broker 的 ack,这一操作提供了一个最低的延迟,broker 一接收到还没有写入磁盘就已经返回,当 broker 故障时有可能丢失数据
      1:producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,如果在 follower同步成功之前 leader 故障,那么将会丢失数据
    acks = 1数据丢失案例 
     
      -1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盘成功后才返回 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会
    造成数据重复。 
    acks =-1数据重复案例 
    4)故障处理细节
    Log文件中的HW和LEO
    LEO:指的是每个副本最大的 offset;
    HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO。 
    (1)follower 故障
    follower 发生故障后会被临时踢出 ISR,待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。
    等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
    (2)leader 故障
    leader 发生故障之后,会从 ISR 中选出一个新的 leader,之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader
    同步数据。
      注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。 

    3 Exactly Once 语义

      将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被
    发送一次,即 At Most Once 语义。 
      At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的,At Least Once可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说
    交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义。在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局
    去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。
      0.11 版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。幂等性结合 At Least Once 语
    义,就构成了 Kafka 的 Exactly Once 语义。即: 
    At Least Once + 幂等性 = Exactly Once 
     
      要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。Kafka的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。开启幂等性的 Producer 在
    初始化的时候会被分配一个 PID,发往同一 Partition 的消息会附带 Sequence Number。而Broker 端会对<PID, Partition, SeqNumber>做缓存,当具有相同主键的消息提交时,Broker 只
    会持久化一条。
      但是 PID 重启就会变化,同时不同的 Partition 也具有不同主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。
     
     

    重要参数配置

    acks

    这个参数用老指定分区中必须由多少个副本收到消息,之后生产者才会认为这条消息写入是成功的。acks参数有三种类型的值(都是字符串类型)。

    • acks=1 默认值为1.生产者发送消息之后,只要分区的leader副本成功的写入消息,生产端就会收到来自服务端的成功响应,说明发送成功。如果消息无法写入leader副本,比如在leader副本崩溃、重新选举新的leader副本的过程中,生产者就会收到一个错误的响应,为了避免消息丢失,生产者就会选择重发消息;如果消息写入leader副本并成功响应给生产者,并且在其他follower副本拉取之前leader副本崩溃,此时消息还会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的这种方案。
    • acks=0 生产者发送消息之后,不需要等待任何服务端的响应。如果在消息从发送到写入kafka的过程中出现异常,导致kafka并没有收到消息,此时生产者是不知道的,消息也就丢失了。akcs设置为0时,kafka可以达到最大的吞吐量。
    • acks=-1或acks=all 生产者在消息发送之后,需要等待isr中所有的副本都成功写入消息此案能够收到服务端的成功响应。acks设置为-1,可以达到相对最强的可靠性。但这不一定是最可靠的,因为isr中可能就只有leader副本,这样就退化成了acks=1 的情况。

    注意,acks参数是一个字符串类型,而不是一个整数类型。配置错误会报异常。

    max.request.size

    生产者客户端能发送消息的最大值,默认值为1048576B,1MB。不建议盲目修改,这个参数涉及其他的一些参数的联动,比如broker端的message.max.bytes参数,如果broker的message.max.bytes参数设置为10,而max.request.size设置为20,当发送一条大小为15B的消息时,生产者参数就会报错。

    retries和retry.backoff.ms

    生产者重试次数,默认值为0。消息在从生产者从发出到成功写入broker之前可能发生一些临时性异常,比如网络抖动、leader副本选举等,这些异常往往是可以自行恢复的,生产者可以配置retries的值,通过生产端的内部重试来恢复而不是一味的将异常抛给生产者;如果重试达到设定次数,生产者才会放弃重试并抛出异常。但是!并不是所有的异常都可以通过重试来解决,比如消息过大,超过max.request.size参数配置的数值。

    重试还和参数retry.backoff.ms有关,默认值为100,用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置retries和retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间要大于异常恢复时间,避免生产者过早的放弃重试。

    connections.max.idele.ms

    这个参数用来制动多久之后关闭限制的连接,默认值540000(ms),9分钟、。

    linger.ms

    这个参数用来指定生产者发送ProducerBatch之前等待更多的消息(ProducerRecord)假如ProducerBatch的时间,默认值为0。ProducerBatch在被填满或者时间超过linger.ms值时发送出去。增大这个参数的值回增加消息的延迟(消费端接收延迟),但能够提升一定的吞吐量。

    receive.buffer.bytes

    这个参数用来设置socket接收消息缓冲区的大小,默认值为32768(B),即32KB。如果设置为-1,则使用操作系统的默认值。如果Producer和Kafka处于不同的机房,则可以适当的调大这个参数值。

    send.buffer.bytes

    这个参数用来设置socket发送消息缓冲区的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。

    request.timeout.ms

    这个参数用来配置Producer等待请求响应的最长时间,默认值为3000(ms)。请求超时之后可以选择进行重试。这个参数需要比broker端参数replica.lag.time.max.ms值要大,这样可以介绍因客户端重试引起的消息重复的概率。

    enable.idempotence

    幂等性开启,默认为false。

    bootstrap.servers

    broker集群地址,可以设置一到多个,建议至少设置为2个,若在应用程序启动的时候,一个broker节点宕机,还可以对另一个已提供节点进行连接。

     
     
     

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13394217.html

  • 相关阅读:
    一个java点菜程序
    团队成员
    CentOS tomcat普通用户执行配置安装
    varnish的监控和调试
    组播协议和组播路由
    AVAYA加外线DID,30B+D
    vi技巧
    CentOS开启Telnet服务
    juniper 虚拟路由
    DOS批处理下操作telnet实现自动远程登录操作
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13394217.html
Copyright © 2011-2022 走看看