之前使用 Kafka 的客户端消费者比较多一点,而且也是无脑订阅使用也没有深入了解过具体的参数。总的来说使用不够细节。
这次公司项目活动期间暴露非常多的问题,于是有了这篇文章。
首先我们来拆解一下 Kafka KafkaProducer 类给我们暴露的参数,我会依次介绍一下这些参数的功能以及效果,其中也包括比较重要的自定义 paritiition 的参数。
1. bootstrap_servers
bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' strings) that the producer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.
bootstrap_servers 接受单个字符串或者 list 参数,默认值是 'localhost',该值让你指定 kafka 的服务 server 格式是 host:port 。例如我们使用的集群,那么我们需要指定我们的生产者找到我们的集群地址我们会设置类似这样的 bootstrap_servers 地址
kafka_conf = { 'bootstrap_servers': ['10.171.97.1:9092', '10.163.13.219:9092', '10.170.249.122:9092'] } self.kafka_producer = kafka.KafkaProducer(**kafka_conf)
默认是 localhost。
2. client_id
client_id (str): a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Default: 'kafka-python-producer-#' (appended with a unique number per instance)
设置了这个客户端 id 可以让你在与 server 通讯的时候标识这个生产者端。
默认是 None 不设置。
3. key_serializer | value_serializer
key_serializer (callable): used to convert user-supplied keys to bytes If not None, called as f(key), should return bytes. Default: None. value_serializer (callable): used to convert user-supplied message values to bytes. If not None, called as f(value), should return bytes. Default: None.
key 和 value 的内容的序列化。可以指定自己的回调解析。
默认参数是 None
4. acks
acks (0, 1, 'all'): The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common: 0: Producer will not wait for any acknowledgment from the server. The message will immediately be added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. 1: Wait for leader to write the record to its local log only. Broker will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. all: Wait for the full set of in-sync replicas to write the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. If unset, defaults to acks=1.
acks 是保障可靠性中非常重要的参数。
在设置为 0 的情况下, kafka 会不管任何的事情,只要收到了消息就往 buffer 或者直接发送。是效率最高,但是完全不能保证消息是否发送成功,也不会确认 broker 的信息,更不会去重试属于典型的 fire and forget。
该参数的默认值是 1,在参数被设置成 1 的情况下,会等待消费被 broker 端 leader 写入到日志中完成并且 broker leader 会返回 response ,但是不会等待其他 isr 写好副本的返回。在这种情况下如果 leader 挂了进行重新选举,可能会丢失消息。该参数比较好的兼容了吞吐量和可用性,只要 leader 不挂消息不会丢失,而且消息会慢慢被 拷贝到其他的 isr 上。
在设置成 all 的情况下,会等待所有的 isr 都同步之后,才会返回,在消息完全不允许丢失的情况下启用该参数。他能保证只要还有一个可用的 isr 存活,消息都不会丢失。
默认是 设置成 1.
5. retries
retries (int): Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max_in_flight_requests_per_connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Default: 0.
当我们设置 ack 大于 0 的情况下,该参数生效。如果我们发送失败设置了该参数会进行重试。默认情况下重试被关闭,如果需要保证数据不丢失活着高可用,可以将参数改为一个较大的值。
试到成功为止,但是也可能因为这个引入一些 block 的问题,需要根据自己的环境进行评估和把握。
默认是不开启重试。
6. compression_type
compression_type (str): The compression type for all data generated by the producer. Valid values are 'gzip', 'snappy', 'lz4', or None. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). Default: None.
指定一个压缩类型。这里注意我们是生产者,如果生产者生产了压缩的消息到达了 broker ,消费者可能同样需要指定相同的解压算法对数据进行解压。
默认是不压缩。
7. batch_size
batch_size (int): Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). Default: 16384
批处理大小 默认值 16kb,用于减少多次发送给 broker 给 broker 带来压力,发往 broker 同一个 partitions 的消息到达批处理大小后发送。我还没有调整过该参数,不太清楚具体影响。应该是在数据量和压力都特别大的情况下,有助于帮助网络减少繁忙状态。
默认大小是 16kb。
8. linger_ms
linger_ms (int): The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay; that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch_size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger_ms=5 would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load. Default: 0.
这个参数通常情况下只有过载的情况下会触发,过载指生产者发送消息到 broker 的速度已经跟不上消息到达生产者的速度了。这个时候我们会在生产端 hold 一段时间 linger_ms 然后一并将内容发送到 broker ,以求减少到达 broker 的 requests 。实现这个功能是采取增加一段小的延迟来实现的。这有点像 tcp 上之前常用的 nagle 算法。
同样这个参数我没有尝试过 - -不太清楚效果。如果遭遇大流量拥塞可以尝试开启。
默认是不开启。
9. partitioner
partitioner (callable): Callable used to determine which partition each message is assigned to. Called (after key serialization): partitioner(key_bytes, all_partitions, available_partitions). The default partitioner implementation hashes each non-None key using the same murmur2 algorithm as the java client so that messages with the same key are assigned to the same partition. When a key is None, the message is delivered to a random partition (filtered to partitions with available leaders only, if possible).
partitioner 是消息生产到指定 partitions 的计算函数,Python 端默认使用 DefaultPartitioner() 可以直接查看分组策略和使用参数 我这里贴一个
class DefaultPartitioner(object): """Default partitioner. Hashes key to partition using murmur2 hashing (from java client) If key is None, selects partition randomly from available, or from all partitions if none are currently available """ @classmethod def __call__(cls, key, all_partitions, available): """ Get the partition corresponding to key :param key: partitioning key :param all_partitions: list of all partitions sorted by partition ID :param available: list of available partitions in no particular order :return: one of the values from all_partitions or available """ if key is None: if available: return random.choice(available) return random.choice(all_partitions) idx = murmur2(key) idx &= 0x7fffffff idx %= len(all_partitions) return all_partitions[idx]
可以看到如果不指定 key 进行分组的话 使用了python 的 random choice 方法进行选择,如果使用 key 分组默认使用 murmu2 算法。
如果不指定 key 出现数据有倾斜的问题,可以尝试提供新的的 partitions 算法。
10. buffer_memory
buffer_memory (int): The total bytes of memory the producer should use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block up to max_block_ms, raising an exception on timeout. In the current implementation, this setting is an approximation. Default: 33554432 (32MB)
生产者端的一个发送 buffer ,最大提供了 32mb。如果 32mb 被填充满并且来不及全部发送给 broker ,将会触发 max_block_ms 并且 raise 出一个 timeout exception 。
默认 32mb ,这个参数应该也需要在数据量非常大的情况下才会触发。但是我感觉该参数如果不是临时阻塞用上了 32mb 的缓存,应该都会触发超时。
11. max_block_ms
max_block_ms (int): Number of milliseconds to block during :meth:`~kafka.KafkaProducer.send` and :meth:`~kafka.KafkaProducer.partitions_for`. These methods can be blocked either because the buffer is full or metadata unavailable. Blocking in the user-supplied serializers or partitioner will not be counted against this timeout. Default: 60000.
设置在调用 KafKaProducer.send 和 KafKaProducer.paritions_for 还有在 buffer 已经满的情况下的超时时间。
例如我的 send 方法阻塞了 默认情况下 60s 会失效。但是感觉这个失效时间有点略长。如果 buffer 阻塞等 60s 可能后面的消息也已经排起长队了。
12.max_request_size
max_request_size (int): The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. Default: 1048576.
该参数会限制单批发送的最大请求大小,来避免请求发送过大。在服务器端应该也有个类似参数来控制消息避免发送过大参数。
默认最大可以发送 1m 消息。
13.retry_backoff_ms
retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100.
重试阻塞时间默认为 100ms
14.metadata_max_age_ms
metadata_max_age_ms (int): The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. Default: 300000
元数据最大刷新时间
默认是 5分钟刷通过去 broker 刷新一次元数据。
15.max_in_flight_requests_per_connection
max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled). Default: 5.
看了几篇文章感觉没有把这个参数说清楚,这个参数默认是 5 。说的是单个 connection 同时允许 5 个消息发送之后确认消息,如果设置成 1 发送一条将会对消息进行确认来保证顺序。
怎么说呢,因为 producer 是 pipline 的是顺序发送的,只有重试的时候会引入顺序问题。比如我发了 1 2 3 4 5 然后开始确认了,确认的时候 2 没了 我需要确认 2 然后进行重发。顺序就乱了,因为 5 都接收了 我还要发送一次 2 。试想一下如果我们具有 idempotent ,那么我们 2 错了我们可以让 3 4 5都进行重传,这样又保证了顺序。
如果我们没有幂等,我们就需要将 ack 设置为 all 并且将 max_in_flight_requests_per_connection 调整为 1 ,然后启用一个 partitions 来完全保障数据的顺序传输。
还有不少参数包括安全协议传输的参数,我没有列举到这里。那些参数大部分时候都用不上,需要的时候再去看也行。
当我们在发消息的时候通常不需要关注到 这么多参数,只是针对特定情况下我们需要调整一些参数来保障我们想要实现的语意。比较常见的一个情况是我们可能希望我们的消息是不丢失的,那我们应该如何配置呢?
其实感觉 kafka 很多情况下都依赖消费端进行幂等,如果消费端幂等的话整个流程会非常健壮和快速也就是实现 At least Once 语意,而不是去保证 exactly once。
1. 使用 producer.send 设置 ack 为 all 。
2. 设置 retry 为较大值,重试避免消息丢失。
3. 设置 unclean.leader.election.enable = false 阻止落后太多的非 irs 竞选 leader。
4. 设置 topic 级别的 replication.factor >= 3 多备份冗余。
5. 设置 min.insync.replicas > 1 控制提交数尽量多确认提交。
6. 消费端放弃使用 autocommit 而使用 手动 commit 老保障消息的准确,如果我们使用 autocommit 也要保障消费端 idempotent。
Reference:
https://en.wikipedia.org/wiki/Nagle%27s_algorithm
https://xinklabi.iteye.com/blog/2195092 MurmurHash算法(高运算性能,低碰撞率,hadoop、memcached等使用)
https://www.iteblog.com/archives/2560.html Kafka 是如何保证数据可靠性和一致性
https://stackoverflow.com/questions/49802686/understanding-the-max-inflight-property-of-kafka-producer Understanding the max.inflight property of kafka producer
http://matt33.com/2018/10/24/kafka-idempotent/ Kafka 事务性之幂等性实现