zoukankan      html  css  js  c++  java
  • Spring 对Apache Kafka的支持与集成

    1. 引言

    Apache Kafka 是一个分布式的、容错的流处理系统。在本文中,我们将介绍Spring对Apache Kafka的支持,以及原生Kafka Java客户端Api 所提供的抽象级别。

    Spring Kafka 通过 @KafkaListener 注解,带来了一个简单而典型的 Spring 模板编程模型,它还带有一个 KafkaTemplate 和消息驱动的 POJO 。

    2. 安装和设置

    要下载和安装Kafka,请参考官方指南。然后还需要在 pom.xml 文件中添加 spring-kafka:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.3.7.RELEASE</version>
    </dependency>
    

    新建一个 Spring Boot 示例应用程序,以默认配置启动。

    3. 配置 Topics

    以前我们使用命令行工具在 Kafka 中创建 topic,例如:

    $ bin/kafka-topics.sh --create 
      --zookeeper localhost:2181 
      --replication-factor 1 --partitions 1 
      --topic mytopic
    

    但是随着 AdminClient 在Kafka中的引入,我们现在可以通过编程来创建 Topic

    如下代码,添加 KafkAdmin bean 到 Spring中,它将自动为 NewTopic 类的所有 bean 添加 topic

    @Configuration
    public class KafkaTopicConfig {
        
        @Value(value = "${kafka.bootstrapAddress}")
        private String bootstrapAddress;
     
        @Bean
        public KafkaAdmin kafkaAdmin() {
            Map<String, Object> configs = new HashMap<>();
            configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            return new KafkaAdmin(configs);
        }
        
        @Bean
        public NewTopic topic1() {
             return new NewTopic("developlee", 1, (short) 1);
        }
    }
    

    4. 消息生成

    要创建消息,首先需要配置 ProducerFactory ,并设置创建 Kafka Producer 实例的策略,然后使用 KafkaTemplateKafkaTemplate 包装了 Producer 实例,并提供向 Kafka Topic 发送消息的简便方法。

    在整个应用程序上下文中使用单个实例将提供更高的性能。因此推荐使用一个 Producer 实例。该实例是线程安全的,所以 KakfaTemplate 实例也是线程安全的,

    4.1. Producer 配置

    @Configuration
    public class KafkaProducerConfig {
     
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(
              ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
              bootstrapAddress);
            configProps.put(
              ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
              StringSerializer.class);
            configProps.put(
              ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
              StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }
     
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    }
    

    4.2. 消息发布

    我们使用 KafkaTemplate 来发布消息:

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
     
    public void sendMessage(String msg) {
        kafkaTemplate.send(topicName, msg);
    }
    

    send API 返回 ListenableFuture 对象。如果我们想阻塞发送线程并获得关于发送消息的结果,我们可以调用ListenableFuture 对象的 get API。线程将会等待结果,但它会降低生产者的速度。

    Kafka是一个快速流处理平台。因此,最好异步处理结果,这样后续消息就无需等待前一条消息的结果。我们可以通过回调来实现:

    public void sendMessage(String message) {
                
        ListenableFuture<SendResult<String, String>> future = 
          kafkaTemplate.send(topicName, message);
    	
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
     
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("Sent message=[" + message + 
                  "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Unable to send message=[" 
                  + message + "] due to : " + ex.getMessage());
            }
        });
    }
    

    5. 消息消费

    5.1. 消费者配置

    对于消费消息,我们需要配置一个 ConsumerFactory 和一个 KafkaListenerContainerFactory

    一旦这些bean在Spring Bean工厂中可用,就可以使用 @KafkaListener 注解配置基于POJO的消费者。

    配置类上需要添加 @EnableKafka 注解,以便能够检测Spring管理的bean上的 @KafkaListener 注解:

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
     
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(
              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
              bootstrapAddress);
            props.put(
              ConsumerConfig.GROUP_ID_CONFIG, 
              groupId);
            props.put(
              ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
              StringDeserializer.class);
            props.put(
              ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
              StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }
     
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> 
          kafkaListenerContainerFactory() {
       
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
              new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    }
    

    5.2. 消息消费

    @KafkaListener(topics = "topicName", groupId = "foo")
    public void listenGroupFoo(String message) {
        System.out.println("Received Message in group foo: " + message);
    }
    

    可以为一个 topic 实现多个 listener,每个topic 都有不同的组Id。此外,一个消费者可以监听来自不同 topic 的消息:

    @KafkaListener(topics = "topic1, topic2", groupId = "foo")
    

    Spring 还支持使用 listener 中的 @Header 注解检索一个或多个消息标题:

    @KafkaListener(topics = "topicName")
    public void listenWithHeaders(
      @Payload String message, 
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
          System.out.println(
            "Received Message: " + message"
            + "from partition: " + partition);
    }
    

    5.3. 消费来自特定分区的消息

    注意到,我们只使用一个分区创建了 topic “developlee”。但是,对于具有多个分区的主题,@KafkaListener 可以显式订阅具有初始偏移量 topic 的特定分区:

    @KafkaListener(
      topicPartitions = @TopicPartition(topic = "topicName",
      partitionOffsets = {
        @PartitionOffset(partition = "0", initialOffset = "0"), 
        @PartitionOffset(partition = "3", initialOffset = "0")}),
      containerFactory = "partitionsKafkaListenerContainerFactory")
    public void listenToPartition(
      @Payload String message, 
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
          System.out.println(
            "Received Message: " + message"
            + "from partition: " + partition);
    }
    

    由于 initialOffset 已被发送到该 listener 中的分区0,因此每次初始化该 listener 时,将重新使用以前从分区0和分区3消耗的所有消息。如果不需要设置偏移量,我们可以使用 @TopicPartition 注解的 partitions 属性只设置没有偏移量的分区:

    @KafkaListener(topicPartitions 
      = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
    

    5.4. 为Listener添加消息过滤器

    通过添加自定义过滤器,可以将 listener 配置为使用特定类型的消息。这可以通过将 RecordFilterStrategy 设置为 KafkaListenerContainerFactory 来完成:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
      filterKafkaListenerContainerFactory() {
     
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setRecordFilterStrategy(
          record -> record.value().contains("World"));
        return factory;
    }
    

    然后可以将 listener 配置为使用此容器工厂:

    @KafkaListener(
      topics = "topicName", 
      containerFactory = "filterKafkaListenerContainerFactory")
    public void listenWithFilter(String message) {
        System.out.println("Received Message in filtered listener: " + message);
    }
    

    在这个 listener 中,所有与过滤器匹配的消息都将被丢弃

    6. 自定义消息转换器

    到目前为止,我们只讨论了字符串作为消息发送和接收的对象。但是,我们也可以发送和接收定制的Java对象。这需要在 ProducerFactory 中配置适当的序列化器,并在 ConsumerFactory 中配置反序列化器。

    让我们看一个简单的bean,并将以消息的形式发送它:

    public class Greeting {
     
        private String msg;
        private String name;
     
        // standard getters, setters and constructor
    }
    

    6.1. 生产自定义消息

    在本例中,我们将使用 JsonSerializer。我们看看 ProducerFactoryKafkaTemplate 的代码:

    @Bean
    public ProducerFactory<String, Greeting> greetingProducerFactory() {
        // ...
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
     
    @Bean
    public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
        return new KafkaTemplate<>(greetingProducerFactory());
    }
    

    新的 KafkaTemplate 可用于发送 Greeting 消息:

    kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
    

    6.2. 消费自定义消息

    同样,我们修改 ConsumerFactoryKafkaListenerContainerFactory 来正确反序列化 Greeting 消息:

    @Bean
    public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
        // ...
        return new DefaultKafkaConsumerFactory<>(
          props,
          new StringDeserializer(), 
          new JsonDeserializer<>(Greeting.class));
    }
     
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
      greetingKafkaListenerContainerFactory() {
     
        ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(greetingConsumerFactory());
        return factory;
    }
    

    spring-kafka JSON序列化器和反序列化器使用 Jackson 库,该库是 spring-kafka 项目的可选maven依赖项。我们也把它加到 pom.xml 文件:

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.7</version>
    </dependency>
    

    建议不要使用 Jackson 的最新版本,而是使用 pom.xml 文件 中 spring-kafka 的版本。
    最后,我们需要编写一个 listener 来 消费 Greeting 消息:

    @KafkaListener(
      topics = "topicName", 
      containerFactory = "greetingKafkaListenerContainerFactory")
    public void greetingListener(Greeting greeting) {
        // process greeting message
    }
    

    7. 结语

    在本文中,我们介绍了Apache Kafka 和 Spring 集成的基础知识,且简要介绍了用于发送和接收消息的类。
    本文的完整源代码可以在GitHub上找到. 在执行代码之前,请确保服务器正在运行 Kafka。
    如果你觉得文章还不错,记得关注公众号: 锅外的大佬
    刘一手的博客

  • 相关阅读:
    ASP.NET 自制时间控件
    ORACLE 函数汇总之单记录函数
    Servers IIS 重启命令
    ASP.NET 两个Dropdownlist控件联动
    ASP.NET datagridview控件表头定义
    python Image 安装
    ssh 不需要密码的链接
    [Redis] redis 相关的博客
    [emacs] python代码折叠
    linux python 链接 oracle
  • 原文地址:https://www.cnblogs.com/liululee/p/14042652.html
Copyright © 2011-2022 走看看