# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see org.apache.kafka.clients.producer.ProducerConfig for more details ############################# Producer Basics ############################# # list of brokers used for bootstrapping knowledge about the rest of the cluster # format: host1:port1,host2:port2 ... #bootstrap.servers=localhost:9092 # specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd #compression.type=none # name of the partitioner class for partitioning events; default partition spreads data randomly #partitioner.class= # the maximum amount of time the client will wait for the response of a request #request.timeout.ms= # how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for #max.block.ms= # the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together #linger.ms= # the maximum size of a request in bytes #max.request.size= # the default batch size in bytes when batching multiple records sent to a partition #batch.size= # the total bytes of memory the producer can use to buffer records waiting to be sent to the server #buffer.memory= #指定kafka节点列表,用于获取metadata,不必全部指定 metadata.broker.list=192.168.142.145:9092,192.168.142.146:9092,192.168.142.147:9092 #指定分区处理类。默认kafka.producer.DefaultPartitioner,表示通过key哈希到对应分区 #partitioner.class=kafka.producer.DefaultPartitioner #是否压缩,默认0表示不压缩,1表示gzip压缩,2表示snappy压缩,压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。 compression.codec=none #指定序列化处理类 serializer.class=kafka.serializer.DefaultEncoder #如果要压缩消息,这里指定那些topic要压缩消息,默认empty,表示不压缩。 #compressed.topics= #设置发送数据是否需要服务端的反馈,有三个值0,1,-1 #0:producer不会等待broker发送ack #1:当leader接收到消息之后发送ack #-1:当所有的follower都同步消息成功后发送ack request.required.acks=0 #在向producer发送ack之前,broker允许等待的最大时间,如果超时,broker将会向producer发送一个error ack,意味着上一次消息因为某种原因未能成功(比如followers未能同步成功) request.timeout.ms=10000 #同步还是异步发送消息,默认"sync"表同步,"async"表异步,异步可以提高发送吞吐量,也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息 producer.type=sync #在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms #此值和batch.num.messages协同工作 queue.buffering.max.ms=5000 #在async模式下,producer端允许buffer的最大消息量 #无论如何,producer都无法尽快的将消息发送给broker,从而导致在producer端大量沉积 #此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000 queue.buffering.max.messages=20000 #如果是异步,指定每次批量发送数据量,默认为200 batch.num.messages=500 #当消息在producer端沉积的条数达到"queue.buffering.max.messages"后 #阻塞一定时间后,队列任然没有enqueue(producer任然没有发送任何消息) #此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间 #-1:无阻塞超时限制,消息不会被抛弃 #0:立即清空队列,消息被抛弃 queue.enqueue.timeout.ms=-1