一、生产者配置
属性 | 描述 | 类型 | 默认值 | 重要性 |
bootstrap.servers |
用于建立与kafka集群的连接,这个list仅仅影响用于初始化的hosts,来发现全部的servers。 格式:host1:port1,host2:port2,…,数量尽量不止一个,以防其中一个down了 |
list | "" | 高 |
acks |
procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
这是最强有力的保证,这相当于acks = -1的设置。
|
string | 1 | 高 |
retries |
发送失败重试次数。 设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。 允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 |
string | 1 | 高 |
retry.backoff.ms |
发送失败,每次重试的间隔毫秒数。 | long | 100 | 低 |
buffer.memory |
producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。 这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。 一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。 |
long |
33554432 (32M) |
高 |
batch.size |
批处理大小。
每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求, |
int | 16384
(16K) |
中 |
linger.ms |
生产者将在请求传输之间到达的所有记录组合到一个个Batch中。一个Batch被创建之后,最多过linger.ms,不管这个Batch有没有写满,都必须发送出去了。 | long | 0 | 中 |
max.request.size |
请求的最大大小(字节)。 这个参数决定了每次发送给Kafka服务器请求的最大大小,同时也会限制你一条消息的最大大小也不能超过这个参数设置的值。 |
int |
1048576 (1M) |
中 |
compression.type |
producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好 | string | none | 高 |
key.serializer |
key的Serializer类,实现了org.apache.kafka.common.serialization.Serializer接口
|
class | 高 | |
value.serializer |
值的Serializer类,实现了org.apache.kafka.common.serialization.Serializer接口
|
class | 高 | |
client.id |
当向server发出请求时,这个字符串会发送给server。目的是能够追踪请求源头,以此来允许ip/port许可列表之外的一些应用可以发送信息。 这项应用可以设置任意字符串,因为没有任何功能性的目的,除了记录和跟踪 |
string | "" | 中 |
client.dns.lookup |
控制客户端如何使用DNS查找。 | string | default | 中 |
connections.max.idle.ms | 在此配置指定的毫秒数之后关闭空闲连接。 | long | 540000 | 中 |
delivery.timeout.ms |
调用send()返回后报告成功或失败的时间上限。 此配置的值应大于或等于 |
int | 120000 | 中 |
max.block.ms |
控制block的时长,当buffer空间不够或者metadata丢失时产生block. | long | 60000 | 中 |
request.timeout.ms
|
配置控制客户端等待请求响应的最长时间。如果在超时时间过去之前未收到响应,则客户端将在必要时重新发送请求,或者在重试次数用尽时使请求失败。 | int | 30000 | 中 |
partitioner.class |
实现 org.apache.kafka.clients.producer.Partitioner 接口 默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner |
class | 中 |
二、消费者配置
属性 | 描述 | 类型 | 默认值 | 重要性 |
bootstrap.servers |
以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接 | list | "" | 高 |
group.id |
用来唯一标识consumer进程所在组的字符串,如果设置同样的group id,表示这些processes都是属于同一个consumer group | string | null | 高 |
fetch.min.bytes |
服务器应为获取请求返回的最小数据量。如果可用的数据不足,则请求将在响应请求之前等待该数据的累积。 默认设置为1字节意味着,只要有一个字节的数据可用,或者提取请求在等待数据到达时超时,就会响应提取请求。 |
int | 1 | 高 |
fetch.max.bytes |
服务器应为获取请求返回的最大数据量。记录由使用者分批获取,如果获取的第一个非空分区中的第一个记录批大于此值, 则仍将返回该记录批,以确保使用者能够取得进展。 |
int | 52428800 | 中 |
max.partition.fetch.bytes |
服务器将返回的每个分区的最大数据量。记录由消费者分批提取。 如果fetch的第一个非空分区中的第一个记录批大于此限制,则仍将返回该批以确保使用者能够取得进展。 |
int | 1048576 | 高 |
heartbeat.interval.ms |
使用Kafka的组管理工具时,消费者协调器的心跳之间的预期时间。 心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于session.timeout.ms, 但通常应设置为不高于该值的1/3。它可以调整得更低,以控制正常再平衡的预期时间。 |
int | 3000 | 高 |
session.timeout.ms |
使用Kafka的组管理工具时用于检测客户端故障的超时。 如果consumer在这段时间内没有发送心跳信息,则它会被认为挂掉了,并且reblance将会产生, 必须在[group.min.session.timeout.ms, group.max.session.timeout.ms]范围内 |
int | 10000 | 高 |
request.timeout.ms |
配置控制客户端等待请求响应的最长时间。 如果在超时时间过去之前未收到响应,则客户端将在必要时重新发送请求,或者在重试次数用尽时使请求失败。 |
int | 3000 | 高 |
key.deserializer |
key的反序列化类。实现了org.apache.kafka.common.serialization.Deserializer接口 | class | 高 | |
value.deserializer |
值的反序列化类。实现了org.apache.kafka.common.serialization.Deserializer接口 | class | 高 | |
allow.auto.create.topics |
允许在订阅或分配主题时在代理上自动创建主题。 只有当代理允许使用 auto.create.topics.enable 的情况下才生效。 |
boolean | true | 中 |
exclude.internal.topics |
是否应将与订阅模式匹配的内部主题从订阅中排除。始终可以显式订阅内部主题。 | boolean | true | 中 |
enable.auto.commit |
启动自动提交。 如果为真,则用户的偏移量将在后台定期提交。 |
boolean | true | 中 |
auto.commit.interval.ms |
使用者偏移自动提交到Kafka的频率(以毫秒为单位),enable.auto.commit设置为true。 | int | 5000 | 低 |
auto.offset.reset |
当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办(例如,由于该数据已被删除):
|
string | latest | 中 |
max.poll.interval.ms |
使用使用者组管理时调用poll()之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间量设置了上限。 如果在此超时过期之前未调用poll(),则认为使用者失败,该组将重新平衡,以便将分区重新分配给另一个成员。 对于使用非空group.instance.id组如果达到此超时,则不会立即重新分配分区。 |
int | 300000 | 中 |
max.poll.records |
在对poll()的单个调用中返回的最大记录数。 max.poll.records条数据需要在session.timeout.ms这个时间内处理完 |
int | 500 | 中 |
client.dns.lookup |
控制客户端如何使用DNS查找。 | string | default | 中 |
connections.max.idle.ms |
在此配置指定的毫秒数之后关闭空闲连接。 | long | 540000 | 中 |
default.api.timeout.ms |
指定客户端API的超时(毫秒)。 | int | 60000 | 中 |
group.instance.id |
最终用户提供的使用者实例的唯一标识符。 | string | null | 中 |
在“range”和“roundrobin”策略之间选择一种作为分配partitions给consumer 数据流的策略; 循环的partition分配器分配所有可用的partitions以及所有可用consumer 线程。 它会将partition循环的分配到consumer线程上。如果所有consumer实例的订阅都是确定的,则partitions的划分是确定的分布。循环分配策略只有在以下条件满足时才可以: (1)每个topic在每个consumer实例上都有同样数量的数据流。(2)订阅的topic的集合对于consumer group中每个consumer实例来说都是确定的 |
list | class | 中 | |
send.buffer.bytes |
发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。如果值为-1,将使用操作系统默认值。 | int | 131072 | 中 |
receive.buffer.bytes |
读取数据时要使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果值为-1,将使用操作系统默认值。 | int | 65536 | 中 |
client.id |
请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名来跟踪请求源,而不仅仅是ip/端口。 | string | "" | 低 |
fetch.max.wait.ms |
Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。 这个配置就是来配置consumer最多等待response多久。 |
int | 500 | 低 |
引用官网:http://kafka.apache.org/documentation/#consumerconfigs
https://www.cnblogs.com/yx88/p/11013338.html