zoukankan      html  css  js  c++  java
  • 【Kafka】Kafka API 的使用

    一、引入kafka-clients依赖

      要在普通Maven项目中是用Kafka API需要引入Kafka的客户端依赖

    1 <dependency>
    2     <groupId>org.apache.kafka</groupId>
    3     <artifactId>kafka-clients</artifactId>
    4     <version>2.6.0</version>
    5 </dependency>

      完整依赖如下:

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0"
     3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     5     <modelVersion>4.0.0</modelVersion>
     6 
     7     <groupId>org.example</groupId>
     8     <artifactId>test-kafka-client1</artifactId>
     9     <version>1.0-SNAPSHOT</version>
    10 
    11     <properties>
    12         <maven.compiler.source>8</maven.compiler.source>
    13         <maven.compiler.target>8</maven.compiler.target>
    14     </properties>
    15 
    16     <dependencies>
    17         <dependency>
    18             <groupId>org.apache.kafka</groupId>
    19             <artifactId>kafka-clients</artifactId>
    20             <version>2.6.0</version>
    21         </dependency>
    22 
    23         <dependency>
    24             <groupId>com.fasterxml.jackson.core</groupId>
    25             <artifactId>jackson-databind</artifactId>
    26             <version>2.10.2</version>
    27         </dependency>
    28 
    29         <dependency>
    30             <groupId>org.slf4j</groupId>
    31             <artifactId>slf4j-simple</artifactId>
    32             <version>1.7.21</version>
    33         </dependency>
    34     </dependencies>
    35 
    36 </project>
    View Code

    二、Producer API

    1、消息发送流程

      Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了 两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。 main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取 消息发送到 Kafka broker。

    2、生产者编写代码需要用到的类

      KafkaProducer:需要创建一个生产者对象,用来发送数据

      ProducerConfig:获取所需的一系列配置参数

      ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象

    3、不带回调函数的Producer API

     1 package com.test.kafka.producer;
     2 
     3 import org.apache.kafka.clients.producer.KafkaProducer;
     4 import org.apache.kafka.clients.producer.ProducerConfig;
     5 import org.apache.kafka.clients.producer.ProducerRecord;
     6 
     7 import java.util.Properties;
     8 
     9 public class FirstProducer {
    10     public static void main(String[] args) throws InterruptedException {
    11 
    12         // 创建生产者配置信息
    13         Properties props = new Properties();
    14 
    15         // kafka 集群,broker-list
    16         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    17         // ACK应答级别
    18         props.put(ProducerConfig.ACKS_CONFIG, "all");
    19         // 重试次数
    20         props.put(ProducerConfig.RETRIES_CONFIG, 1);
    21         // 批次大小 16k
    22         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    23         // 等待时间 1ms
    24         props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    25         // RecordAccumulator 缓冲区大小  32m
    26         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    27         // 序列化器
    28         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    29         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    30 
    31         // 创建生产者对象
    32         KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    33 
    34         // 发送数据
    35         for (int i = 0; i < 1000; i++) {
    36             producer.send(new ProducerRecord<>("first", "~~value~~~~" + i));
    37             Thread.sleep(500);
    38         }
    39 
    40         // 关闭连接
    41         producer.close();
    42     }
    43 }

    4、带回调函数的Producer API

      回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

      注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

     1 package com.test.kafka.producer;
     2 
     3 import com.test.kafka.partitioner.MyPartitioner;
     4 import org.apache.kafka.clients.producer.*;
     5 
     6 import java.util.ArrayList;
     7 import java.util.List;
     8 import java.util.Properties;
     9 import java.util.concurrent.ExecutionException;
    10 import java.util.concurrent.Future;
    11 
    12 public class CallBackProducer {
    13     public static void main(String[] args) throws ExecutionException, InterruptedException {
    14 
    15         // 创建生产者配置信息
    16         Properties props = new Properties();
    17 
    18         // kafka 集群,broker-list
    19         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    20         // ACK应答级别
    21         props.put(ProducerConfig.ACKS_CONFIG, "all");
    22         // 重试次数
    23         props.put(ProducerConfig.RETRIES_CONFIG, 1);
    24         // 批次大小 16k
    25         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    26         // 等待时间 1ms
    27         props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    28         // RecordAccumulator 缓冲区大小  32m
    29         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    30         // 序列化器
    31         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    32         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    33 
    34 //        // 添加分区器
    35 //        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.test.kafka.partitioner.MyPartitioner");
    36 //
    37 //        // 拦截器链
    38 //        List<String> interceptors = new ArrayList<>();
    39 //        interceptors.add("com.test.kafka.interceptor.TimeInterceptor");
    40 //        interceptors.add("com.test.kafka.interceptor.CounterInterceptor");
    41 //        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
    42 
    43         // 创建生产者对象
    44         KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    45 
    46         // 发送数据
    47         for (int i = 0; i < 10; i++) {
    48 //            producer.send(new ProducerRecord<>("second", "value~~~~" + i), (metadata, exception) -> {
    49             Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("second", String.valueOf(i), "value~~~~" + i), (metadata, exception) -> {
    50                 // 正常返回 exception 为空
    51                 if (exception == null) {
    52                     System.out.println("partition:" + metadata.partition() + "	offset:" + metadata.offset());
    53                 } else {
    54                     exception.printStackTrace();
    55                 }
    56             });
    57 
    58             // 同步发送策略
    59             RecordMetadata metadata = future.get();
    60             System.out.println(metadata);
    61         }
    62         // 关闭连接
    63         producer.close();
    64 
    65     }
    66 }

    三、Consumer API

      Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故 不用担心数据丢失问题。由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故 障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢 复后继续消费。所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。

    1、消费者编写代码需要用到的类  

       KafkaConsumer:需要创建一个消费者对象,用来消费数据 

      ConsumerConfig:获取所需的一系列配置参数

      ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象

      为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。  

      自动提交 offset 的相关参数:

    • enable.auto.commit:是否开启自动提交 offset 功能 

    • auto.commit.interval.ms:自动提交 offset 的时间间隔

    2、手动提交 offset 的代码

     1 package com.test.kafka.consumer;
     2 
     3 import org.apache.kafka.clients.consumer.ConsumerConfig;
     4 import org.apache.kafka.clients.consumer.ConsumerRecord;
     5 import org.apache.kafka.clients.consumer.ConsumerRecords;
     6 import org.apache.kafka.clients.consumer.KafkaConsumer;
     7 
     8 
     9 import java.util.Arrays;
    10 import java.util.Properties;
    11 
    12 public class FirstConsumer {
    13 
    14     public static void main(String[] args) {
    15         // 创建配置信息
    16         Properties props = new Properties();
    17 
    18         // kafka 集群,broker-list
    19         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    20         // 开启自动提交
    21 //        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    22         // 开启手动提交
    23         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    24 
    25         // 自动提交延迟
    26         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    27         // 反序列化
    28         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    29         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    30         // 重置offset,满足条件下,可以重置重置offset,从最早的消息开始消费
    31         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    32         // 消费者组
    33         props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-my-group");
    34 
    35         // 创建消费者
    36         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    37 
    38         // 订阅主题
    39         consumer.subscribe(Arrays.asList("first", "second"));
    40 
    41         while (true) {
    42             // 拉取消息
    43             ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
    44             System.out.println("数量:" + consumerRecords.count());
    45             // 遍历
    46             for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    47                 System.out.println(consumerRecord.key() + "----" + consumerRecord.value());
    48             }
    49 
    50             // 手动同步提交,当前线程会阻塞直到 offset 提交成功
    51             consumer.commitSync();
    52         }
    53 
    54 //        // 关闭
    55 //        consumer.close();
    56     }
    57 }

    三、自定义分区器

    1、分区器代码

     1 package com.test.kafka.partitioner;
     2 
     3 import org.apache.kafka.clients.producer.Partitioner;
     4 import org.apache.kafka.common.Cluster;
     5 
     6 import java.util.Map;
     7 
     8 /**
     9  * 自定义分区器
    10  */
    11 public class MyPartitioner implements Partitioner {
    12     @Override
    13     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    14         cluster.partitionsForTopic(topic).size();
    15         return 1;
    16     }
    17 
    18     @Override
    19     public void close() {
    20 
    21     }
    22 
    23     @Override
    24     public void configure(Map<String, ?> configs) {
    25 
    26     }
    27 
    28     public static void main(String[] args) {
    29         System.out.println(Math.abs("test-my-group".hashCode()) % 50);
    30     }
    31 }

    2、分区器注入Producer

    1 // 添加分区器
    2 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.test.kafka.partitioner.MyPartitioner");

    四、自定义 Interceptor

      Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定 制化控制逻辑。

      对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会 对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:  

      (1) configure(configs):获取配置信息和初始化数据时调用。

      (2) onSend(ProducerRecord): 该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好 保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。

      (3) onAcknowledgement(RecordMetadata, Exception): 该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程 中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息 发送效率。  

      (4) close:关闭 interceptor,主要用于执行一些资源清理工作如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅 是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中 要特别留意

    1、拦截器案例

      (1) 增加时间戳拦截器 

     1 public class TimeInterceptor implements ProducerInterceptor<String, String> {
     2     @Override
     3     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
     4         // 创建一个新的 record,把时间戳写入消息体的最前部
     5         return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
     6                 System.currentTimeMillis() + "," + record.value().toString());
     7     }
     8 
     9     @Override
    10     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    11 
    12     }
    13 
    14     @Override
    15     public void close() {
    16 
    17     }
    18 
    19     @Override
    20     public void configure(Map<String, ?> configs) {
    21 
    22     }
    23 }

      (2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器

     1 public class CounterInterceptor implements ProducerInterceptor {
     2 
     3     private int errorCounter = 0;
     4     private int successCounter = 0;
     5 
     6 
     7     @Override
     8     public ProducerRecord onSend(ProducerRecord record) {
     9         return record;
    10     }
    11 
    12     @Override
    13     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    14 
    15         // 统计成功和失败的次数
    16         if (exception == null) {
    17             successCounter++;
    18         } else {
    19             errorCounter++;
    20         }
    21     }
    22 
    23     @Override
    24     public void close() {
    25         // 保存结果
    26         System.out.println("Successful sent: " + successCounter);
    27         System.out.println("Failed sent: " + errorCounter);
    28     }
    29 
    30     @Override
    31     public void configure(Map<String, ?> configs) {
    32 
    33     }
    34 }

    2、拦截器注入Producer

    1 // 拦截器链
    2 List<String> interceptors = new ArrayList<>();
    3 interceptors.add("com.test.kafka.interceptor.TimeInterceptor");
    4 interceptors.add("com.test.kafka.interceptor.CounterInterceptor");
    5 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
  • 相关阅读:
    UILabel 设置字体间的距离 和 行与行间的距离
    IB_DESIGNABLE 和 IBInspectable 的使用
    干货博客
    GitHub克隆速度太慢解决方案
    实时(RTC)时钟,系统时钟和CPU时钟
    折腾了好久的vscode配置c/c++语言环境(Windows环境下)
    c语言中的malloc函数
    记录一下关于在工具类中更新UI使用RunOnUiThread犯的极其愚蠢的错误
    记录关于Android多线程的一个坑
    Android中限制输入框最大输入长度
  • 原文地址:https://www.cnblogs.com/h--d/p/14873173.html
Copyright © 2011-2022 走看看