zoukankan      html  css  js  c++  java
  • Kafka 核心API/生产者/消费者实战

    在 SpringBoot 整合 kafka 很简单。添加依赖 kafka-clients

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>

    一、Admin 实战

    package net.xdclass.xdclasskafka;
    
    import org.apache.kafka.clients.admin.*;
    import org.junit.jupiter.api.Test;
    
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    
    /**
     * Kafka Admin API
     */
    public class KafkaAdminTest {
        private static final String TOPIC_NAME = "xdclass-sp-topic-test";
    
        /**
         * 设置admin客户端
         */
        public static AdminClient initAdminClient() {
            Properties properties = new Properties();
            properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "139.196.181.97:9092");
    
            AdminClient adminClient = AdminClient.create(properties);
            return adminClient;
        }
    
        /**
         * 创建topic
         */
        @Test
        public void createTopicTest() {
            AdminClient adminClient = initAdminClient();
    
            //指定分区数量,副本数量
            NewTopic newTopic = new NewTopic(TOPIC_NAME, 2, (short) 1);
    
            CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
            try {
                //future等待创建,成功则不会有任何报错
                createTopicsResult.all().get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 列举topic列表
         *
         * @throws ExecutionException
         * @throws InterruptedException
         */
        @Test
        public void listTopicTest() throws ExecutionException, InterruptedException {
            AdminClient adminClient = initAdminClient();
    
            //是否查看内部的topic,可以不用
            ListTopicsOptions options = new ListTopicsOptions();
            options.listInternal(true);
    
            ListTopicsResult listTopicsResult = adminClient.listTopics(options);
            //ListTopicsResult listTopicsResult = adminClient.listTopics();
    
            Set<String> topics = listTopicsResult.names().get();
            for (String name : topics) {
                System.err.println(name);
            }
        }
    
        /**
         * 删除topic
         */
        @Test
        public void delTopicTest() throws ExecutionException, InterruptedException {
            AdminClient adminClient = initAdminClient();
    
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("xdclass-sp11-topic", "version1-topic", "my-topic"));
    
            deleteTopicsResult.all().get();
        }
    
        /**
         * 查看某个topic详情
         */
        @Test
        public void detailTopicTest() throws ExecutionException, InterruptedException {
            AdminClient adminClient = initAdminClient();
            DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
    
            Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
    
            Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
    
            entries.stream().forEach((entry) -> System.out.println("name :" + entry.getKey() + " , desc: " + entry.getValue()));
        }
    
        /**
         * 增加topic分区数量
         *
         * @throws ExecutionException
         * @throws InterruptedException
         */
        @Test
        public void incrPartitionTopicTest() throws ExecutionException, InterruptedException {
            Map<String, NewPartitions> infoMap = new HashMap<>(1);
    
            AdminClient adminClient = initAdminClient();
            NewPartitions newPartitions = NewPartitions.increaseTo(5);
    
            infoMap.put(TOPIC_NAME, newPartitions);
    
            CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
    
            createPartitionsResult.all().get();
        }
    }

    注意:Kafka 中的分区数只能增加不能减少,减少的话数据不知怎么处理

    二、生产者实战

    1、Kafka 的 Producer 生产者发送到 Broker 分区策略讲解

    生产者发送到 Broker 里面的流程是怎样的呢,一个 Topic 有多个 Partition 分区,每个分区又有多个副本

    • 如果指定 Partition ID,则 PR 被发送至指定 Partition (ProducerRecord)
    • 如果未指定 Partition ID,但指定了Key,PR 会按照 hash(key) 发送至对应Partition
    • 如果未指定 Partition ID 也没指定Key,PR 会按照默认 round-robin 轮训模式发送到每个Partition
      • 消费者消费 Partition 分区默认是 range 模式
    • 如果同时指定了 Partition ID 和 Key,PR 只会发送到指定的 Partition (Key不起作用,代码逻辑决定)
    • 注意:Partition 有多个副本,但只有一个 ReplicationLeader 负责该 Partition 和生产者消费者交互

    2、生产者到 Broker 发送流程

    • Kafka 的客户端发送数据到服务器,不是来一条就发一条,会经过内存缓冲区(默认是16KB),通过 Kafka Producer 发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集到的 Batch 里面,再一次性发送到 Broker 上去的,这样性能才可能提高

    3、生产者常见配置

    #kafka地址,即broker地址
    bootstrap.servers  
    ​
    #当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
    acks
    ​
    #请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
    retries
    ​
    #每个分区未发送消息总字节大小,单位:字节,超过设置的值就会提交数据到服务端,默认值是16KB
    batch.size
    ​
    # 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满,如果想减少请求的数量,可以设置 linger.ms 大于#0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
    # 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送 减少请求
    # 如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
    linger.ms
    ​
    # buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
    # 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器
    # 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
    # buffer.memory要大于batch.size,否则会报申请内存不足的错误,不要超过物理内存,根据实际情况调整
    buffer.memory
    ​
    # key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使
    #消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将#key序列化成字节数组。
    key.serializer
    value.serializer 

    4、ProducerRecord(简称PR)

    发送给 Kafka Broker 的 key/value 键值对, 封装基础数据信息

    -- Topic (名字)
    -- PartitionID (可选)
    -- Key(可选)
    -- Value

    key 默认是null,大多数应用程序会用到key

    • 如果 key 为空,kafka 使用默认的Partitioner,使用 RoundRobin 算法将消息均衡地分布在各个 Partition 上
    • 如果 key 不为空,kafka 使用自己实现的 hash 方法对 key 进行散列,决定消息该被写到 Topic 的哪个Partition,拥有相同 key 的消息会被写到同一个Partition,实现顺序消息

    5、生产者发送消息是异步调用,怎么知道是否有异常?

    • 发送消息配置回调函数即可, 该回调方法会在 Producer 收到 ack 时被调用,为异步调用
    • 回调函数有两个参数 RecordMetadata 和 Exception,如果 Exception 是 null,则消息发送成功,否则失败

    6、Kafka 生产者自定义 Partition 分区规则实战

    • 源码解读默认分区器
      • org.apache.kafka.clients.producer.internals.DefaultPartitioner
    • 自定义分区规则
      • 创建类,实现 Partitioner 接口,重写方法
      • 配置 partitioner.class 指定类即可
    package net.xdclass.xdclasskafka;
    
    import org.apache.kafka.clients.producer.*;
    import org.junit.jupiter.api.Test;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class KafkaProducerTest {
        private static final String TOPIC_NAME = "xdclass-sp-topic-test";
    
        public static Properties getProperties() {
            Properties props = new Properties();
    
            props.put("bootstrap.servers", "139.196.181.97:9092");
            //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "139.196.181.97:9092");
    
            // 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
            props.put("acks", "all");
            //props.put(ProducerConfig.ACKS_CONFIG, "all");
    
            // 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
            props.put("retries", 0);
            //props.put(ProducerConfig.RETRIES_CONFIG, 0);
    
            // 生产者缓存每个分区未发送的消息,缓存的大小是通过 batch.size 配置指定的,默认值是16KB
            props.put("batch.size", 16384);
    
            /**
             * 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满
             * 如果想减少请求的数量,可以设置 linger.ms 大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
             * 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送减少请求
             * 如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
             */
            props.put("linger.ms", 5);
    
            /**
             * buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
             * 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器
             * 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
             * buffer.memory要大于batch.size,否则会报申请内存不#足的错误,不要超过物理内存,根据实际情况调整
             * 需要结合实际业务情况压测进行配置
             */
            props.put("buffer.memory", 33554432);
    
            /**
             * key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,
             * 即使消息中没有指定key,序列化器必须是一个实
             org.apache.kafka.common.serialization.Serializer接口的类,
             * 将key序列化成字节数组。
             */
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            return props;
        }
    
        /**
         * send()方法是异步的,添加消息到缓冲区等待发送,并立即返回
         * 生产者将单个的消息批量在一起发送来提高效率,即 batch.size和linger.ms结合
         * <p>
         * 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回 ack
         * 发送消息后返回的一个 Future 对象,调用get即可
         * <p>
         * 消息发送主要是两个线程:一个是Main用户主线程,一个是Sender线程
         * 1)main线程发送消息到RecordAccumulator即返回
         * 2)sender线程从RecordAccumulator拉取信息发送到broker
         * 3) batch.size和linger.ms两个参数可以影响 sender 线程发送次数
         */
        @Test
        public void testSend() {
            Properties properties = getProperties();
    
            Producer<String, String> producer = new KafkaProducer<>(properties);
    
            for (int i = 0; i < 3; i++) {
                Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i));
                try {
                    //不关心结果则不用写这些内容
                    RecordMetadata recordMetadata = future.get();
    
                    // topic - 分区编号@offset
                    System.out.println("发送状态:" + recordMetadata.toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            producer.close();
        }
    
        /**
         * 发送消息携带回调函数
         */
        @Test
        public void testSendWithCallback() {
            Properties properties = getProperties();
            Producer<String, String> producer = new KafkaProducer<>(properties);
    
            for (int i = 0; i < 3; i++) {
                producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.err.println("发送状态:" + metadata.toString());
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
            }
            producer.close();
        }
    
        /**
         * 发送消息携带回调函数,指定某个分区
         * <p>
         * 实现顺序消息
         */
        @Test
        public void testSendWithCallbackAndPartition() {
            Properties properties = getProperties();
            Producer<String, String> producer = new KafkaProducer<>(properties);
    
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("xdclass-v1-sp-topic-test", 4, "xdclass-key" + i, "xdclass-value" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.err.println("发送状态:" + metadata.toString());
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
            }
            producer.close();
        }
    
        /**
         * 自定义分区策略
         */
        @Test
        public void testSendWithPartitionStrategy() {
            Properties properties = getProperties();
    
            properties.put("partitioner.class", "net.xdclass.xdclasskafka.config.XdclassPartitioner");
    
            Producer<String, String> producer = new KafkaProducer<>(properties);
    
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("xdclass-v1-sp-topic-test", "xdclass", "xdclass-value" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null) {
                            System.err.println("发送状态:" + metadata.toString());
                        } else {
                            exception.printStackTrace();
                        }
                    }
                });
            }
            producer.close();
        }
    }

    自定义的 XdclassPartitioner 类

    package net.xdclass.xdclasskafka.config;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;
    
    import java.util.List;
    import java.util.Map;
    
    public class XdclassPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            if (keyBytes == null) {
                throw new IllegalArgumentException("key 参数不能为空");
            }
    
            if ("xdclass".equals(key)) {
                return 0;
            }
    
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    
        @Override
        public void close() {}
    
        @Override
        public void configure(Map<String, ?> configs) {}
    }

    三、消费者实战

    1、消费者根据什么模式从 Broker 获取数据的?

    • 消费者采用 pull 拉取方式,从 Broker 的 Partition 获取数据
    • pull 模式则可以根据 consumer 的消费能力进行自己调整,不同的消费者性能不一样
    • 如果 Broker 没有数据,consumer 可以配置 timeout 时间,阻塞等待一段时间之后再返回
    • 如果是 Broker 主动push,优点是可以快速处理消息,但是容易造成消费者处理不过来,消息堆积和延迟。

    2、消费者从哪个分区进行消费?两个策略

    • 顶层接口
      • org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

    round-robin(RoundRobinAssignor非默认策略)轮训

    • 【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来, 所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者
    • topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6
    • c-1:topic-p0/topic-p2/topic-p4/topic-p6
    • c-2:topic-p1/topic-p3/topic-p5

    弊端

    • 如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀
    • 有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2
    • t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))
    • 消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2

    range(RangeAssignor默认策略)范围

    • 【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者
    • topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6
    • c-1:topic-p0/topic-p1/topic-p2/topic-p3
    • c-2:topic-p4/topic-p5/topic-p6

    弊端

    • 只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
    • 如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic 越多则消费的分区也越多,则性能有所下降

    3、什么是Rebalance操作?

    • Kafka 怎么均匀地分配某个 Topic 下的所有 Partition 到各个消费者,从而使得消息的消费速度达到最快,这就是平衡(balance),前面讲了 Range 范围分区 和 RoundRobin 轮询分区,也支持自定义分区策略。

    • 而 Rebalance(重平衡)其实就是重新进行 Partition 的分配,从而使得 Partition 的分配重新达到平衡状态

    面试:例如70个分区,10个消费者,但是先启动一个消费者,后续再启动一个消费者,这个会怎么分配?

    • Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作 ,下面都会发生 Rebalance 操作
      • 当消费者组内的消费者数量发生变化(增加或者减少),就会产生重新分配patition
      • 分区数量发生变化时(即 Topic 的分区数量发生变化时)

    面试:当消费者在消费过程突然宕机了,重新恢复后是从哪里消费,会有什么问题?

    • 消费者会记录offset,故障恢复后从这里继续消费,这个offset记录在哪里?
    • 记录在 ZooKeeper 里面和本地,新版默认将 offset 保存在 Kafka 的内置 Topic 中,名称是 __consumer_offsets
      • 该 Topic 默认有 50 个Partition,每个 Partition 有3个副本,分区数量由参数 offset.topic.num.partition 配置
      • 通过 groupId 的哈希值和该参数取模的方式来确定某个消费者组已消费的 offset 保存到 __consumer_offsets 主题的哪个分区中
      • 由消费者组名+主题+分区,确定唯一的 offset 的key,从而获取对应的值
      • 三元组:group.id+topic+分区号,而 value 就是 offset 的值

    4、SpringBoot 关闭 Kafka 调试日志

    #yml配置文件修改
    logging:
      config: classpath:logback.xml
      
      
    #logback.xml内容
    <configuration>
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 -->
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
            </encoder>
        </appender><root level="info">
            <appender-ref ref="STDOUT" />
        </root>
    </configuration>

    5、代码实战

    package net.xdclass.xdclasskafka;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.junit.jupiter.api.Test;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;
    
    public class KafkaConsumerTest {
    
        public static Properties getProperties() {
            Properties props = new Properties();
    
            //broker地址
            props.put("bootstrap.servers", "139.196.181.97:9092");
    
            //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
            props.put("group.id", "xdclass-g1");
    
            //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效
            props.put("auto.offset.reset", "earliest");
    
            //开启自动提交offset
            //props.put("enable.auto.commit", "true");
            props.put("enable.auto.commit", "false");
    
            //自动提交offset延迟时间
            //props.put("auto.commit.interval.ms", "1000");
    
            //反序列化
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            return props;
        }
    
        @Test
        public void simpleConsumerTest() {
            Properties properties = getProperties();
    
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            //订阅主题
            kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME));
    
            while (true) {
                //领取时间,阻塞超时时间
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
    
                for (ConsumerRecord record : records) {
                    System.err.printf("topic=%s, offset=%d,key=%s,value=%s %n", record.topic(), record.offset(), record.key(), record.value());
                }
    
                //同步阻塞提交offset
                //kafkaConsumer.commitSync();
    
                if (!records.isEmpty()) {
                    //异步提交offset
                    kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                        @Override
                        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                            if (exception == null) {
                                System.err.println("手工提交offset成功:" + offsets.toString());
                            } else {
                                System.err.println("手工提交offset失败:" + offsets.toString());
                            }
                        }
                    });
                }
            }
        }
    }

    6、如果需要从头消费 Partition 消息,怎操作?

    • auto.offset.reset 配置策略即可
    • 默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费,改名和改配置必须同时进行
    //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 
    props.put("auto.offset.reset","earliest");

    7、自动提交 offset 问题

    • 没法控制消息是否正常被消费
    • 适合非严谨的场景,比如日志收集发送

    8、手工提交offset

    • 同步 commitSync 阻塞当前线程 (自动失败重试)
    • 异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)
  • 相关阅读:
    Gym
    HDU-2680 Choose the best route 单向边+反向dijkstra
    Hdu1010Tempter of the Bone 深搜+剪枝
    CodeForces
    CodeForces
    Gym-101375C MaratonIME eats japanese food 初始化struct技巧
    Gym
    java was started but returned exit code =-805306369的处理方法
    启动myeclipse弹窗Please allow Subclipse team to receive anonymous usage statistics for this Eclipse intance
    eclipse包分层
  • 原文地址:https://www.cnblogs.com/jwen1994/p/14805743.html
Copyright © 2011-2022 走看看