zoukankan      html  css  js  c++  java
  • Kafka学习笔记3--Kafka的生产者和消费者配置

    下载解压 kafka 后,在 kafka/config 下有 3 个配置文件与主题及其生产、消费相关。
    • server.properties--服务端配置
    • producer.properties--生产端配置
    • consumer.properties--消费端配置
    这里主要介绍生产者和消费者的配置。
     
    一、生产者配置
    producer.properties
    #指定连接 Kafka 集群所需的 broker 地址清单
    bootstrap.servers=localhost:9092
     
    #producer 用于压缩数据的压缩类型,压缩类型有none、gzip、snappy,默认是无压缩。
    #压缩最好用于批量处理,批量处理消息越多,压缩性能越好。
    #消息被压缩后发送到broker集群,broker集群是不会进行解压缩的,只会把消息发送到消费者集群,然后由消费者来解压缩。
    compression.type=none
     
    #生产者生产的消息被发送到哪个block,需要一个分组策略。
    #分区处理类,默认partitioner基于 key 的 hash 表。
    #partitioner.class=
     
    # 在向 producer 发送 ack 之前,broker允许等待的最大时间,否则会发送错误到客户端。
    #request.timeout.ms=
     
    # 控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。
    #max.block.ms=
     
    #当消息到达的速度比发送速度快,会出现生产者组将发送的消息组合成单个批量请求的现象,或者客户端希望减少请求数量,
    #设置此参数,生产者将等待一个延迟,以便和其他的消息组合成一个批次发出,减少发送的请求数。
    #linger.ms=
     
    #请求的最大字节数。
    #max.request.size=
     
    #生产者批处理消息的字节数,以减少请求次数,这将改善client与server之间的性能。
    #batch.size=
     
    # 生产者可以用来缓存消息的缓冲区大小。
    #buffer.memory=

    在网上看到生产端有个 acks 参数,但这个配置文件没有,先在 Kafka 的官网手册了解了一下,留待后面深入了解。

    acks 这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 参数有 3 个值,为 1、0、-1:
    • acks = 1,默认值:
        生产者发送消息之后,只要分区的 leader 副本成功写入消息,就会收到来自服务端的成功响应。
        如果消息因为leader 副本崩溃等原因无法写入 leader 副本,生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。
        为了保证消息不丢失,至少要设置为1,也就是至少保证 leader 将消息保存成功。
    • acks = 0:
        生产者发送消息之后不需要等待任何服务端的响应。
        如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,生产者也无从得知,消息也就丢失了。
        在其他配置环境相同的情况下,acks 设置为 0 可以达到最大吞吐量。
    • acks = -1:
        生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应,也就是不仅是主的分区将消息保存成功了,而且其所有的分区的副本数也都同步好了,才会被认为发动成功。
        在其他配置环境相同的情况下,acks 设置为 -1 可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为 ISR 中可能只有 leader 副本,这样就退化成了 acks=1 的情况。 
     
    更多配置可以参见 Producer 配置
     
    二、消费者配置
    consumer.properties  
    #用于建立初始连接到kafka集群的"主机/端口对"配置列表。
    bootstrap.servers=localhost:9092
    #消费者所在消费组的唯一标识
    group.id=test-consumer-group
    ## What to do when there is no initial offset in Kafka or if the current
    # offset does not exist any more on the server: latest, earliest, none
    #当Kafka没有初始偏移量或服务器不再有当前的偏移量怎么办?
    #latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
    #earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    #none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
    auto.offset.reset=

    该配置文件中的参数不多,更多配置可以参见 Consumer 配置。 

  • 相关阅读:
    Struts2 采用Convention-plugin 实现零配置
    xml中批量新增数据或者批量修改数据
    Linux基础笔记_05
    Linux基础笔记_03
    Linux基础笔记_02
    Linux基础笔记_01
    玩转spring boot——websocket
    获取一天中的起始时间与结束时间
    《浅谈架构之路:前后端分离模式》
    区域语言与区域语言包对照关系表
  • 原文地址:https://www.cnblogs.com/sunshineliulu/p/12013723.html
Copyright © 2011-2022 走看看