zoukankan      html  css  js  c++  java
  • SpringBoot与Kafka整合实现简单分布式消息队列

    SpringBoot与Kafka整合实现简单分布式消息队列

    1、此处只是单纯的梳理一下SpringBoot整合kafka,其他像Zookeeper、kafka等环境的安装就不在详

    细说明,kafka安装可参考https://www.cnblogs.com/jhtian/p/13708679.html Zookeeper安装里面也有

    相应的链接。

    环境说明:           Kafka:192.168.232.3:9020

            Zookeeper:192.168.232.3:2181

                 192.168.232.4:2181(master)

                 192.168.232.5:2181

    2、工程目录(生产者-producer)

    既然是SpringBoot整合kafka,那肯定是要搭建一个SpringBoot工程目录,SpringBoot的搭建就不在

    此处说明,搭建好之后的生产端工程如下

    引入相应的依赖到pom.xml文件中去

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     4     <modelVersion>4.0.0</modelVersion>
     5     <parent>
     6         <groupId>org.springframework.boot</groupId>
     7         <artifactId>spring-boot-starter-parent</artifactId>
     8         <version>2.1.5.RELEASE</version>
     9         <relativePath/> <!-- lookup parent from repository -->
    10     </parent>
    11     <groupId>com.tianjh</groupId>
    12     <artifactId>kafka-producer</artifactId>
    13     <version>0.0.1-SNAPSHOT</version>
    14     <name>kafka-producer</name>
    15     <description>Demo project for Spring Boot</description>
    16 
    17     <properties>
    18         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    19         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    20         <java.version>1.8</java.version>
    21     </properties>
    22     <!--SpringBoot 启动相关的依赖-->
    23     <dependencies>
    24         <dependency>
    25             <groupId>org.springframework.boot</groupId>
    26             <artifactId>spring-boot-starter</artifactId>
    27         </dependency>
    28         <dependency>
    29             <groupId>org.springframework.boot</groupId>
    30             <artifactId>spring-boot-starter-web</artifactId>
    31         </dependency>
    32         <!--        日志文件依赖-->
    33         <dependency>
    34             <groupId>org.projectlombok</groupId>
    35             <artifactId>lombok</artifactId>
    36         </dependency>
    37         <!--        ## SpringBoot 整合 kafka核心依赖 ##-->
    38         <dependency>
    39             <groupId>org.springframework.kafka</groupId>
    40             <artifactId>spring-kafka</artifactId>
    41         </dependency>
    42 
    43         <!--        Springboot 单元测试-->
    44         <dependency>
    45             <groupId>org.springframework.boot</groupId>
    46             <artifactId>spring-boot-starter-test</artifactId>
    47             <scope>test</scope>
    48             <exclusions>
    49                 <exclusion>
    50                     <groupId>org.junit.vintage</groupId>
    51                     <artifactId>junit-vintage-engine</artifactId>
    52                 </exclusion>
    53             </exclusions>
    54         </dependency>
    55     </dependencies>
    56 
    57     <build>
    58         <plugins>
    59             <plugin>
    60                 <groupId>org.springframework.boot</groupId>
    61                 <artifactId>spring-boot-maven-plugin</artifactId>
    62             </plugin>
    63         </plugins>
    64     </build>
    65 
    66 </project>

    Springboot工程创建好之后会自动生成一个application.properties文件

     1 server.servlet.context-path=/producer
     2 server.port=8001
     3 
     4 ## Spring 整合 kafka  kafka的ip:port
     5 spring.kafka.bootstrap-servers=192.168.232.3:9092
     6 ## kafka producer 发送消息失败时的重试次数
     7 spring.kafka.producer.retries=0
     8 ## 批量发送数据的配置
     9 spring.kafka.producer.batch-size=16384
    10 ## 设置kafka 生产者内存缓存区的大小(32M)
    11 spring.kafka.producer.buffer-memory=33554432
    12 ## kafka消息的序列化配置
    13 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    14 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    15 
    16 # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
    17 # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
    18 # acks=-1: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为producer请求成功。
    19 # 这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
    20 
    21 ##     这个是kafka生产端最核心的配置
    22 spring.kafka.producer.acks=1

    启动类Application就很简单了

     1 package com.tianjh.kafka;
     2 
     3 import org.springframework.boot.SpringApplication;
     4 import org.springframework.boot.autoconfigure.SpringBootApplication;
     5 
     6 @SpringBootApplication
     7 public class Application {
     8 
     9     public static void main(String[] args) {
    10         SpringApplication.run(Application.class, args);
    11     }
    12 
    13 }

    接下来就是最重要的kafka发送消息的方法 KafkaProducerService.java

     1 package com.tianjh.kafka.producer;
     2 
     3 import lombok.extern.slf4j.Slf4j;
     4 import org.springframework.beans.factory.annotation.Autowired;
     5 import org.springframework.kafka.core.KafkaTemplate;
     6 import org.springframework.kafka.support.SendResult;
     7 import org.springframework.stereotype.Component;
     8 import org.springframework.util.concurrent.ListenableFuture;
     9 import org.springframework.util.concurrent.ListenableFutureCallback;
    10 
    11 /**
    12  * $KafkaProducerService
    13  * @author tianjh
    14  * Component 注解交由Spring管理
    15  * Slf4j 引入日志
    16  */
    17 @Slf4j
    18 @Component
    19 public class KafkaProducerService {
    20 
    21     @Autowired
    22     private KafkaTemplate<String, Object> kafkaTemplate;
    23 
    24     /**
    25      * 生产端真正发消息的方法 直接调用kafkaTemplate.send
    26      * 方法进行发送 返回ListenableFuture<SendResult<K, V>>
    27      * 使用future.addCallback 创建ListenableFutureCallback<SendResult<String, Object>>()对象
    28      * 此处需要实现它的onSuccess、onFailure两个方法
    29      * @param topic  发到哪个topic上面
    30      * @param object 要发送的消息内容
    31      */
    32     public void sendMessage(String topic, Object object) {
    33         ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
    34         future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
    35             @Override
    36             public void onSuccess(SendResult<String, Object> result) {
    37                 log.info("Kafka-Producer发放消息成功:" + result.toString());
    38             }
    39 
    40             @Override
    41             public void onFailure(Throwable throwable) {
    42                 log.info("Kafka-Producer发放消息失败:" + throwable.getMessage());
    43             }
    44 
    45         });
    46     }
    47 
    48 }

     3、工程目录(消费端-consumer)

    工程目录结构和生成端差不多,可以直接拷贝生产端代码进行简单修改

     pom.xml文件

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     4     <modelVersion>4.0.0</modelVersion>
     5     <parent>
     6         <groupId>org.springframework.boot</groupId>
     7         <artifactId>spring-boot-starter-parent</artifactId>
     8         <version>2.1.5.RELEASE</version>
     9         <relativePath/> <!-- lookup parent from repository -->
    10     </parent>
    11     <groupId>com.tianjh</groupId>
    12     <artifactId>kafka-consumer</artifactId>
    13     <version>0.0.1-SNAPSHOT</version>
    14     <name>kafka-consumer</name>
    15     <description>Demo project for Spring Boot</description>
    16 
    17     <properties>
    18         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    19         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    20         <java.version>1.8</java.version>
    21     </properties>
    22 
    23     <dependencies>
    24         <dependency>
    25             <groupId>org.springframework.boot</groupId>
    26             <artifactId>spring-boot-starter</artifactId>
    27         </dependency>
    28         <dependency>
    29             <groupId>org.springframework.boot</groupId>
    30             <artifactId>spring-boot-starter-web</artifactId>
    31         </dependency>
    32         <dependency>
    33             <groupId>org.projectlombok</groupId>
    34             <artifactId>lombok</artifactId>
    35         </dependency>
    36         <dependency>
    37             <groupId>org.springframework.kafka</groupId>
    38             <artifactId>spring-kafka</artifactId>
    39         </dependency>
    40 
    41         <dependency>
    42             <groupId>org.springframework.boot</groupId>
    43             <artifactId>spring-boot-starter-test</artifactId>
    44             <scope>test</scope>
    45             <exclusions>
    46                 <exclusion>
    47                     <groupId>org.junit.vintage</groupId>
    48                     <artifactId>junit-vintage-engine</artifactId>
    49                 </exclusion>
    50             </exclusions>
    51         </dependency>
    52     </dependencies>
    53 
    54     <build>
    55         <plugins>
    56             <plugin>
    57                 <groupId>org.springframework.boot</groupId>
    58                 <artifactId>spring-boot-maven-plugin</artifactId>
    59             </plugin>
    60         </plugins>
    61     </build>
    62 
    63 </project>

    消费端的SpringBoot的配置文件application.properties

     1 server.servlet.context-path=/consumer
     2 server.port=8002
     3 
     4 spring.kafka.bootstrap-servers=192.168.232.3:9092
     5 
     6 ## consumer 消息的签收机制:手工签收 等于false 表示需要手工签收
     7 spring.kafka.consumer.enable-auto-commit=false
     8 spring.kafka.listener.ack-mode=manual
     9 # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
    10 # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
    11 # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
    12 spring.kafka.consumer.auto-offset-reset=earliest
    13 ## 序列化配置
    14 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    15 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    16 
    17 spring.kafka.listener.concurrency=5

    消费端消费消息的具体方法KafkaConsumerService

     1 package com.tianjh.kafka.consumer;
     2 
     3 import org.apache.kafka.clients.consumer.Consumer;
     4 import org.apache.kafka.clients.consumer.ConsumerRecord;
     5 import org.springframework.kafka.annotation.KafkaListener;
     6 import org.springframework.kafka.support.Acknowledgment;
     7 import org.springframework.stereotype.Component;
     8 
     9 import lombok.extern.slf4j.Slf4j;
    10 
    11 @Slf4j
    12 @Component
    13 public class KafkaConsumerService {
    14 
    15     @KafkaListener(groupId = "group02", topics = "topic02")
    16     public void onMessage(ConsumerRecord<String, Object> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
    17         log.info("消费端接收消息: {}", record.value());
    18         //    手工签收机制
    19         acknowledgment.acknowledge();
    20     }
    21     
    22     
    23 }

    4、测试

    在生产端的src/test/java下新建一个Java测试类,我这儿新建的是ApplicationTests.java

     1 package com.tianjh.kafka.test;
     2 
     3 import com.tianjh.kafka.producer.KafkaProducerService;
     4 import org.junit.Test;
     5 import org.junit.runner.RunWith;
     6 import org.springframework.beans.factory.annotation.Autowired;
     7 import org.springframework.boot.test.context.SpringBootTest;
     8 import org.springframework.test.context.junit4.SpringRunner;
     9 
    10 /**
    11  * 测试发现消息
    12  */
    13 @RunWith(SpringRunner.class)
    14 @SpringBootTest
    15 public class ApplicationTests {
    16     /**
    17      * 依赖注入之前写好的发送
    18      * 消息的实现类,调用send方法进行发送
    19      */
    20     @Autowired
    21     private KafkaProducerService kafkaProducerService;
    22 
    23     @Test
    24     public void send() {
    25         try {
    26         String topic = "topic02";
    27         for (int i = 0; i < 10; i++) {
    28             kafkaProducerService.sendMessage(topic, "hello kafka" + i);
    29         }
    30             Thread.sleep(1000);
    31         } catch (InterruptedException e) {
    32             e.printStackTrace();
    33         }
    34     }
    35 }

    生产端运行之后控制台打印:

     说明消息发送成功,现在就看看消费端有没有进行消费消息,运行之后查看控制台发现收到了消息

     虽然消费端收到了消息但是并没有真正的被消费,因为在消费端消费消息的时候注释了手工签收的代码

    //     acknowledgment.acknowledge();

    通过kafka控制台查看 使用如下命令

     ./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe --group group02 

     现在把消费端注释掉的代码放开,确认手工签收 acknowledgment.acknowledge(); 接着再重新运行一下消费端项目

    再去kafka使用上诉命令进行查看,如下:

     

    这表明咋们发送的十条消息都被真正的进行消费了。

  • 相关阅读:
    LeetCode 275. H-Index II
    LeetCode 274. H-Index
    LeetCode Gray Code
    LeetCode 260. Single Number III
    LeetCode Word Pattern
    LeetCode Nim Game
    LeetCode 128. Longest Consecutive Sequence
    LeetCode 208. Implement Trie (Prefix Tree)
    LeetCode 130. Surrounded Regions
    LeetCode 200. Number of Islands
  • 原文地址:https://www.cnblogs.com/jhtian/p/13710423.html
Copyright © 2011-2022 走看看