在 SpringBoot 整合 kafka 很简单。添加依赖 kafka-clients
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.0</version> </dependency>
一、Admin 实战
package net.xdclass.xdclasskafka; import org.apache.kafka.clients.admin.*; import org.junit.jupiter.api.Test; import java.util.*; import java.util.concurrent.ExecutionException; /** * Kafka Admin API */ public class KafkaAdminTest { private static final String TOPIC_NAME = "xdclass-sp-topic-test"; /** * 设置admin客户端 */ public static AdminClient initAdminClient() { Properties properties = new Properties(); properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "139.196.181.97:9092"); AdminClient adminClient = AdminClient.create(properties); return adminClient; } /** * 创建topic */ @Test public void createTopicTest() { AdminClient adminClient = initAdminClient(); //指定分区数量,副本数量 NewTopic newTopic = new NewTopic(TOPIC_NAME, 2, (short) 1); CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic)); try { //future等待创建,成功则不会有任何报错 createTopicsResult.all().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } /** * 列举topic列表 * * @throws ExecutionException * @throws InterruptedException */ @Test public void listTopicTest() throws ExecutionException, InterruptedException { AdminClient adminClient = initAdminClient(); //是否查看内部的topic,可以不用 ListTopicsOptions options = new ListTopicsOptions(); options.listInternal(true); ListTopicsResult listTopicsResult = adminClient.listTopics(options); //ListTopicsResult listTopicsResult = adminClient.listTopics(); Set<String> topics = listTopicsResult.names().get(); for (String name : topics) { System.err.println(name); } } /** * 删除topic */ @Test public void delTopicTest() throws ExecutionException, InterruptedException { AdminClient adminClient = initAdminClient(); DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("xdclass-sp11-topic", "version1-topic", "my-topic")); deleteTopicsResult.all().get(); } /** * 查看某个topic详情 */ @Test public void detailTopicTest() throws ExecutionException, InterruptedException { AdminClient adminClient = initAdminClient(); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME)); Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get(); Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet(); entries.stream().forEach((entry) -> System.out.println("name :" + entry.getKey() + " , desc: " + entry.getValue())); } /** * 增加topic分区数量 * * @throws ExecutionException * @throws InterruptedException */ @Test public void incrPartitionTopicTest() throws ExecutionException, InterruptedException { Map<String, NewPartitions> infoMap = new HashMap<>(1); AdminClient adminClient = initAdminClient(); NewPartitions newPartitions = NewPartitions.increaseTo(5); infoMap.put(TOPIC_NAME, newPartitions); CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap); createPartitionsResult.all().get(); } }
注意:Kafka 中的分区数只能增加不能减少,减少的话数据不知怎么处理
二、生产者实战
1、Kafka 的 Producer 生产者发送到 Broker 分区策略讲解
生产者发送到 Broker 里面的流程是怎样的呢,一个 Topic 有多个 Partition 分区,每个分区又有多个副本
- 如果指定 Partition ID,则 PR 被发送至指定 Partition (ProducerRecord)
- 如果未指定 Partition ID,但指定了Key,PR 会按照 hash(key) 发送至对应Partition
- 如果未指定 Partition ID 也没指定Key,PR 会按照默认 round-robin 轮训模式发送到每个Partition
- 消费者消费 Partition 分区默认是 range 模式
- 如果同时指定了 Partition ID 和 Key,PR 只会发送到指定的 Partition (Key不起作用,代码逻辑决定)
- 注意:Partition 有多个副本,但只有一个 ReplicationLeader 负责该 Partition 和生产者消费者交互
2、生产者到 Broker 发送流程
- Kafka 的客户端发送数据到服务器,不是来一条就发一条,会经过内存缓冲区(默认是16KB),通过 Kafka Producer 发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集到的 Batch 里面,再一次性发送到 Broker 上去的,这样性能才可能提高
3、生产者常见配置
#kafka地址,即broker地址 bootstrap.servers #当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。 acks #请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性 retries #每个分区未发送消息总字节大小,单位:字节,超过设置的值就会提交数据到服务端,默认值是16KB batch.size # 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满,如果想减少请求的数量,可以设置 linger.ms 大于#0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端 # 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送 减少请求 # 如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送 linger.ms # buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。 # 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器 # 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了 # buffer.memory要大于batch.size,否则会报申请内存不足的错误,不要超过物理内存,根据实际情况调整 buffer.memory # key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使 #消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将#key序列化成字节数组。 key.serializer value.serializer
4、ProducerRecord(简称PR)
发送给 Kafka Broker 的 key/value 键值对, 封装基础数据信息
-- Topic (名字)
-- PartitionID (可选)
-- Key(可选)
-- Value
key 默认是null,大多数应用程序会用到key
- 如果 key 为空,kafka 使用默认的Partitioner,使用 RoundRobin 算法将消息均衡地分布在各个 Partition 上
- 如果 key 不为空,kafka 使用自己实现的 hash 方法对 key 进行散列,决定消息该被写到 Topic 的哪个Partition,拥有相同 key 的消息会被写到同一个Partition,实现顺序消息
5、生产者发送消息是异步调用,怎么知道是否有异常?
- 发送消息配置回调函数即可, 该回调方法会在 Producer 收到 ack 时被调用,为异步调用
- 回调函数有两个参数 RecordMetadata 和 Exception,如果 Exception 是 null,则消息发送成功,否则失败
6、Kafka 生产者自定义 Partition 分区规则实战
- 源码解读默认分区器
- org.apache.kafka.clients.producer.internals.DefaultPartitioner
- 自定义分区规则
- 创建类,实现 Partitioner 接口,重写方法
- 配置 partitioner.class 指定类即可
package net.xdclass.xdclasskafka; import org.apache.kafka.clients.producer.*; import org.junit.jupiter.api.Test; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class KafkaProducerTest { private static final String TOPIC_NAME = "xdclass-sp-topic-test"; public static Properties getProperties() { Properties props = new Properties(); props.put("bootstrap.servers", "139.196.181.97:9092"); //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "139.196.181.97:9092"); // 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。 props.put("acks", "all"); //props.put(ProducerConfig.ACKS_CONFIG, "all"); // 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性 props.put("retries", 0); //props.put(ProducerConfig.RETRIES_CONFIG, 0); // 生产者缓存每个分区未发送的消息,缓存的大小是通过 batch.size 配置指定的,默认值是16KB props.put("batch.size", 16384); /** * 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满 * 如果想减少请求的数量,可以设置 linger.ms 大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端 * 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送减少请求 * 如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送 */ props.put("linger.ms", 5); /** * buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。 * 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器 * 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了 * buffer.memory要大于batch.size,否则会报申请内存不#足的错误,不要超过物理内存,根据实际情况调整 * 需要结合实际业务情况压测进行配置 */ props.put("buffer.memory", 33554432); /** * key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置, * 即使消息中没有指定key,序列化器必须是一个实 org.apache.kafka.common.serialization.Serializer接口的类, * 将key序列化成字节数组。 */ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; } /** * send()方法是异步的,添加消息到缓冲区等待发送,并立即返回 * 生产者将单个的消息批量在一起发送来提高效率,即 batch.size和linger.ms结合 * <p> * 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回 ack * 发送消息后返回的一个 Future 对象,调用get即可 * <p> * 消息发送主要是两个线程:一个是Main用户主线程,一个是Sender线程 * 1)main线程发送消息到RecordAccumulator即返回 * 2)sender线程从RecordAccumulator拉取信息发送到broker * 3) batch.size和linger.ms两个参数可以影响 sender 线程发送次数 */ @Test public void testSend() { Properties properties = getProperties(); Producer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 3; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i)); try { //不关心结果则不用写这些内容 RecordMetadata recordMetadata = future.get(); // topic - 分区编号@offset System.out.println("发送状态:" + recordMetadata.toString()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } producer.close(); } /** * 发送消息携带回调函数 */ @Test public void testSendWithCallback() { Properties properties = getProperties(); Producer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 3; i++) { producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.err.println("发送状态:" + metadata.toString()); } else { exception.printStackTrace(); } } }); } producer.close(); } /** * 发送消息携带回调函数,指定某个分区 * <p> * 实现顺序消息 */ @Test public void testSendWithCallbackAndPartition() { Properties properties = getProperties(); Producer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<>("xdclass-v1-sp-topic-test", 4, "xdclass-key" + i, "xdclass-value" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.err.println("发送状态:" + metadata.toString()); } else { exception.printStackTrace(); } } }); } producer.close(); } /** * 自定义分区策略 */ @Test public void testSendWithPartitionStrategy() { Properties properties = getProperties(); properties.put("partitioner.class", "net.xdclass.xdclasskafka.config.XdclassPartitioner"); Producer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord<>("xdclass-v1-sp-topic-test", "xdclass", "xdclass-value" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.err.println("发送状态:" + metadata.toString()); } else { exception.printStackTrace(); } } }); } producer.close(); } }
自定义的 XdclassPartitioner 类
package net.xdclass.xdclasskafka.config; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; import java.util.List; import java.util.Map; public class XdclassPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { if (keyBytes == null) { throw new IllegalArgumentException("key 参数不能为空"); } if ("xdclass".equals(key)) { return 0; } List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } @Override public void close() {} @Override public void configure(Map<String, ?> configs) {} }
三、消费者实战
1、消费者根据什么模式从 Broker 获取数据的?
- 消费者采用 pull 拉取方式,从 Broker 的 Partition 获取数据
- pull 模式则可以根据 consumer 的消费能力进行自己调整,不同的消费者性能不一样
- 如果 Broker 没有数据,consumer 可以配置 timeout 时间,阻塞等待一段时间之后再返回
- 如果是 Broker 主动push,优点是可以快速处理消息,但是容易造成消费者处理不过来,消息堆积和延迟。
2、消费者从哪个分区进行消费?两个策略
- 顶层接口
- org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
round-robin(RoundRobinAssignor非默认策略)轮训
- 【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来, 所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者
- topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6
- c-1:topic-p0/topic-p2/topic-p4/topic-p6
- c-2:topic-p1/topic-p3/topic-p5
弊端
- 如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀
- 有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2
- t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))
- 消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2
range(RangeAssignor默认策略)范围
- 【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者
- topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6
- c-1:topic-p0/topic-p1/topic-p2/topic-p3
- c-2:topic-p4/topic-p5/topic-p6
弊端
- 只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
- 如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic 越多则消费的分区也越多,则性能有所下降
3、什么是Rebalance操作?
-
Kafka 怎么均匀地分配某个 Topic 下的所有 Partition 到各个消费者,从而使得消息的消费速度达到最快,这就是平衡(balance),前面讲了 Range 范围分区 和 RoundRobin 轮询分区,也支持自定义分区策略。
-
而 Rebalance(重平衡)其实就是重新进行 Partition 的分配,从而使得 Partition 的分配重新达到平衡状态
面试:例如70个分区,10个消费者,但是先启动一个消费者,后续再启动一个消费者,这个会怎么分配?
- Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作 ,下面都会发生 Rebalance 操作
-
- 当消费者组内的消费者数量发生变化(增加或者减少),就会产生重新分配patition
- 分区数量发生变化时(即 Topic 的分区数量发生变化时)
面试:当消费者在消费过程突然宕机了,重新恢复后是从哪里消费,会有什么问题?
- 消费者会记录offset,故障恢复后从这里继续消费,这个offset记录在哪里?
- 记录在 ZooKeeper 里面和本地,新版默认将 offset 保存在 Kafka 的内置 Topic 中,名称是 __consumer_offsets
-
- 该 Topic 默认有 50 个Partition,每个 Partition 有3个副本,分区数量由参数 offset.topic.num.partition 配置
- 通过 groupId 的哈希值和该参数取模的方式来确定某个消费者组已消费的 offset 保存到 __consumer_offsets 主题的哪个分区中
- 由消费者组名+主题+分区,确定唯一的 offset 的key,从而获取对应的值
- 三元组:group.id+topic+分区号,而 value 就是 offset 的值
4、SpringBoot 关闭 Kafka 调试日志
#yml配置文件修改 logging: config: classpath:logback.xml #logback.xml内容 <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 --> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> </appender> <root level="info"> <appender-ref ref="STDOUT" /> </root> </configuration>
5、代码实战
package net.xdclass.xdclasskafka; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties; public class KafkaConsumerTest { public static Properties getProperties() { Properties props = new Properties(); //broker地址 props.put("bootstrap.servers", "139.196.181.97:9092"); //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息 props.put("group.id", "xdclass-g1"); //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("auto.offset.reset", "earliest"); //开启自动提交offset //props.put("enable.auto.commit", "true"); props.put("enable.auto.commit", "false"); //自动提交offset延迟时间 //props.put("auto.commit.interval.ms", "1000"); //反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } @Test public void simpleConsumerTest() { Properties properties = getProperties(); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); //订阅主题 kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME)); while (true) { //领取时间,阻塞超时时间 ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.err.printf("topic=%s, offset=%d,key=%s,value=%s %n", record.topic(), record.offset(), record.key(), record.value()); } //同步阻塞提交offset //kafkaConsumer.commitSync(); if (!records.isEmpty()) { //异步提交offset kafkaConsumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception == null) { System.err.println("手工提交offset成功:" + offsets.toString()); } else { System.err.println("手工提交offset失败:" + offsets.toString()); } } }); } } } }
6、如果需要从头消费 Partition 消息,怎操作?
- auto.offset.reset 配置策略即可
- 默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费,改名和改配置必须同时进行
//默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效
props.put("auto.offset.reset","earliest");
7、自动提交 offset 问题
- 没法控制消息是否正常被消费
- 适合非严谨的场景,比如日志收集发送
8、手工提交offset
- 同步 commitSync 阻塞当前线程 (自动失败重试)
- 异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)