zoukankan      html  css  js  c++  java
  • kafka原理和实践(五)spring-kafka配置详解

    系列目录

    kafka原理和实践(一)原理:10分钟入门

    kafka原理和实践(二)spring-kafka简单实践

    kafka原理和实践(三)spring-kafka生产者源码

    kafka原理和实践(四)spring-kafka消费者源码

    kafka原理和实践(五)spring-kafka配置详解

    kafka原理和实践(六)总结升华

    一、官方配置

    官方配置文档飞机票建议看Importance=medium以上的,即重要性为中级以上的,其他的用到了再说。

    二、实践中的配置

    properties配置如下:

    bootstrap.servers=192.168.49.206:9092,192.168.49.205:9092,192.168.49.204:9092 brokers集群
    acks=all     即所有副本都同步到数据时send方法才返回, 以此来完全判断数据是否发送成功, 理论上来讲数据不会丢失.        
    retries=10 发送失败重试次数
    batch.size=1638 批处理条数:当多个记录被发送到同一个分区时,生产者会尝试将记录合并到更少的请求中。这有助于客户端和服务器的性能。
    linger.ms=1 批处理延迟时间上限:即1ms过后,不管是否达到批处理数,都直接发送一次请求
    buffer.memory=33554432 即32MB的批处理缓冲区

    group.id=order-beta  消费者群组ID,发布-订阅模式,即如果一个生产者,多个消费者都要消费,那么需要定义自己的群组,同一群组内的消费者只有一个能消费到消息
    enable.auto.commit=true 如果为true,消费者的偏移量将在后台定期提交。
    auto.commit.interval.ms=1000 如何设置为自动提交(enable.auto.commit=true),这里设置自动提交周期
    session.timeout.ms=15000 在使用Kafka的组管理时,用于检测消费者故障的超时
    concurrency = 3 消费监听器容器并发数

    1、生产者配置

    具体对应第二章中xml配置:

     1 <bean id="producerProperties" class="java.util.HashMap">
     2         <constructor-arg>
     3             <map>
     4                 <entry key="bootstrap.servers" value="${bootstrap.servers}" />
     6                 <entry key="retries" value="${retries}" />
     7                 <entry key="batch.size" value="${batch.size}" />
     8                 <entry key="linger.ms" value="${linger.ms}" />
     9                 <entry key="buffer.memory" value="${buffer.memory}" />
    11                 <entry key="acks" value="${acks}" />   
    13                 <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />源码预制的UTF8字符串反序列化实现类  byte[]-》String
    15                 <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
    17             </map>
    18         </constructor-arg>
    19     </bean>

    2、消费者配置

    具体对应第二章中xml配置:

     1 <!-- 定义consumer的参数 -->
     2     <bean id="consumerProperties" class="java.util.HashMap">
     3         <constructor-arg>
     4             <map>
     5                 <entry key="bootstrap.servers" value="${bootstrap.servers}" />
     6                 <entry key="group.id" value="${group.id}" />
     7                 <entry key="enable.auto.commit" value="${enable.auto.commit}" />
     8                 <entry key="session.timeout.ms" value="${session.timeout.ms}" />
     9                 <entry key="key.deserializer"
    10                     value="org.apache.kafka.common.serialization.StringDeserializer" />
    11                 <entry key="value.deserializer"
    12                     value="org.apache.kafka.common.serialization.StringDeserializer" />
    13             </map>
    14         </constructor-arg>
    15     </bean>
    1 <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
    2         <constructor-arg ref="consumerFactory" />
    4         <property name="concurrency" value="${concurrency}" />消费监听器容器并发数
    5 </bean>

    3. 使用规范

    这里发布一个真实的公司要求的使用规范,当然比较简单哈,但贵在真实:

            a: Producer 部分参数设定:

             1: acks 设置为 "all" 即所有副本都同步到数据时send方法才返回, 以此来完全判断数据是否发送成功, 理论上来讲数据不会丢失.           

        2: retries = MAX 无限重试,直到你意识到出现了问题.

        3: 使用 callback 来处理消息失败发送逻辑.

        4: min.insync.replicas > 1 消息至少要被写入到这么多副本才算成功,也是提升数据持久性的一个参数。与acks配合使用.

        5: 其他一些超时参数: reconnect.backoff.ms, retry.backoff.ms , linger.ms 结合 batch.size 等.    

           

           

            b: Consumer 部分参数设定:

                1: auto.offset.reset 设置为 "earliest" 避免 offset 丢失时跳过未消费的消息. 目前消息存储不统一, 部分使用 zookeeper, 部分使用 kafka topic.          

                2: enable.auto.commit=false  关闭自动提交位移, 在消息被完整处理之后再手动提交位移.

                3: consumer 的并发受 partition 的限制. 如果消息处理量比较大的情况请提前与运维联系, 增加 partition 数量应对消费端并发. 默认topic partition 为6-8个.

                   partition 也不是越多越好. 首先会增加 file 和 memory, 其次会延长选举时间, 并且会延长 offset 的查询时间.  partition可以扩容但无法缩减.

           

           

        极限情况的数据丢失现象.

            a: 即使将 ack 设置为 "all" 也会在一定情况下丢失消息. 因为 kafka 的高性能特性, 消息在写入 kafka 时并没有落盘 而是写入了 OS buffer 中. 使用 OS 的脏页刷新策略周期性落盘, 就算落盘 仍然会有 raid buffer. 前者机器宕机数据丢失, 后者机器跳电数据丢失.

            b: 对数据可靠性较高的场景建议 offset 手动提交. 自动提交当遇到业务系统上线被关闭时, 消息读取并且 offset 已经提交, 但是数据没有存储或者仍没来得及消费时, 消息状态在内存中无法保留, 重启应用会跳过消息 致使消息丢失.

  • 相关阅读:
    [archlinux][plasma][screensaver] plasma5配置屏保程序,没成功(-_-#)
    [skill][https][ssl/tls] HTTPS相关知识汇总
    [dpdk][kernel][driver] 如何让DPDK的UIO开机自动加载到正确的网卡上
    [archlinux] linux boot process/order/stage
    [potatos][flex][TBC] 语义分析词法分析 flex
    [daily][tcpdump][bpf] 如何用tcpdump抓到一个分片包
    [daily][dpdk] 网卡offload识别包类型;如何模拟环境构造一个vlan包
    [skill][c] *(char**)
    [apr] Apache Portable Runtime
    [skill] mmap / fwrite / write linux磁盘读写的分层结构
  • 原文地址:https://www.cnblogs.com/dennyzhangdd/p/7834143.html
Copyright © 2011-2022 走看看