zoukankan      html  css  js  c++  java
  • 【SpringBoot】SpringBoot 整合Kafka

      Kafka安装参考:Kafka安装(一)

    一、Kafka整合

    1、创建SpringBoot项目

      引入spring-kafka依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

      

      完整pom如下:

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0"
     3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     5     <modelVersion>4.0.0</modelVersion>
     6 
     7     <parent>
     8         <groupId>org.springframework.boot</groupId>
     9         <artifactId>spring-boot-starter-parent</artifactId>
    10         <version>2.2.5.RELEASE</version>
    11     </parent>
    12 
    13     <groupId>org.example</groupId>
    14     <artifactId>test-springboot-kafka</artifactId>
    15     <version>1.0-SNAPSHOT</version>
    16 
    17     <properties>
    18         <maven.compiler.source>8</maven.compiler.source>
    19         <maven.compiler.target>8</maven.compiler.target>
    20     </properties>
    21 
    22     <dependencies>
    23         <dependency>
    24             <groupId>org.springframework.boot</groupId>
    25             <artifactId>spring-boot-starter-web</artifactId>
    26         </dependency>
    27 
    28         <dependency>
    29             <groupId>org.springframework.kafka</groupId>
    30             <artifactId>spring-kafka</artifactId>
    31         </dependency>
    32     </dependencies>
    33 
    34 
    35 </project>
    View Code

    2、application.yml配置kafka连接

     1 spring:
     2   kafka:
     3     bootstrap-servers: 172.101.203.33:9092
     4     producer:
     5       # 发生错误后,消息重发的次数。
     6       retries: 0
     7       #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
     8       batch-size: 16384
     9       # 设置生产者内存缓冲区的大小。
    10       buffer-memory: 33554432
    11       # 键的序列化方式
    12       key-serializer: org.apache.kafka.common.serialization.StringSerializer
    13       # 值的序列化方式
    14       value-serializer: org.apache.kafka.common.serialization.StringSerializer
    15       # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
    16       # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
    17       # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
    18       acks: 1
    19     consumer:
    20       # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
    21       auto-commit-interval: 1S
    22       # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
    23       # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
    24       # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
    25       auto-offset-reset: earliest
    26       # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
    27       enable-auto-commit: false
    28       # 键的反序列化方式
    29       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    30       # 值的反序列化方式
    31       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    32     listener:
    33       # 在侦听器容器中运行的线程数。
    34       concurrency: 5
    35       #listner负责ack,每调用一次,就立即commit
    36       ack-mode: manual_immediate
    37       missing-topics-fatal: false

    3、kafka生产者

     1 @RestController
     2 public class KafkaController {
     3 
     4     @Autowired
     5     private KafkaTemplate<String, Object> kafkaTemplate;
     6 
     7     //自定义topic
     8     public static final String TOPIC_TEST = "topic-test";
     9 
    10     @RequestMapping("/sendMsg")
    11     public void sendMsg(String msg) throws JsonProcessingException {
    12 
    13         System.out.println("准备发送消息为:" + msg);
    14         //发送消息
    15         ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, msg);
    16         future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
    17             @Override
    18             public void onFailure(Throwable throwable) {
    19                 //发送失败的处理
    20                 System.out.println(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage());
    21             }
    22 
    23             @Override
    24             public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
    25                 //成功的处理
    26                 System.out.println(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
    27             }
    28         });
    29     }
    30 } 

    4、kafka消费者

     1 @Component
     2 public class KafkaConsumer {
     3 
     4     //自定义topic
     5     public static final String TOPIC_TEST = "topic-test";
     6 
     7     //
     8     public static final String TOPIC_GROUP1 = "topic-group1";
     9 
    10     //
    11     public static final String TOPIC_GROUP2 = "topic-group2";
    12 
    13 
    14     @KafkaListener(topics = TOPIC_TEST, groupId = TOPIC_GROUP1)
    15     public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    16 
    17         Optional message = Optional.ofNullable(record.value());
    18         if (message.isPresent()) {
    19             Object msg = message.get();
    20             System.out.println("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
    21             ack.acknowledge();
    22         }
    23     }
    24 
    25     @KafkaListener(topics = TOPIC_TEST, groupId = TOPIC_GROUP2)
    26     public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    27 
    28         Optional message = Optional.ofNullable(record.value());
    29         if (message.isPresent()) {
    30             Object msg = message.get();
    31             System.out.println("topic_test1 消费了: Topic:" + topic + ",Message:" + msg);
    32             ack.acknowledge();
    33         }
    34     }
    35 } 

     5、启动类

    1 @SpringBootApplication
    2 public class Application {
    3     public static void main(String[] args) {
    4         SpringApplication.run(Application.class);
    5     }
    6 }

    6、测试运行

      1、运行SpringBoot项目

      2、使用地址:http://localhost:8080/sendMsg?msg=abc,访问服务器

      3、结果如下:

    准备发送消息为:abc
    topic-test - 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=topic-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=abc, timestamp=null), recordMetadata=topic-test-0@0]
    topic_test 消费了: Topic:topic-test,Message:abc
    topic_test1 消费了: Topic:topic-test,Message:abc

    二、Kafka自动配置原理

     1 @Configuration(proxyBeanMethods = false)
     2 @ConditionalOnClass(KafkaTemplate.class)
     3 @EnableConfigurationProperties(KafkaProperties.class)
     4 @Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
     5 public class KafkaAutoConfiguration {
     6 
     7     private final KafkaProperties properties;
     8 
     9     public KafkaAutoConfiguration(KafkaProperties properties) {
    10         this.properties = properties;
    11     }
    12 
    13     @Bean
    14     @ConditionalOnMissingBean(KafkaTemplate.class)
    15     public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
    16             ProducerListener<Object, Object> kafkaProducerListener,
    17             ObjectProvider<RecordMessageConverter> messageConverter) {
    18         KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
    19         messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
    20         kafkaTemplate.setProducerListener(kafkaProducerListener);
    21         kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
    22         return kafkaTemplate;
    23     }
    24 
    25     @Bean
    26     @ConditionalOnMissingBean(ProducerListener.class)
    27     public ProducerListener<Object, Object> kafkaProducerListener() {
    28         return new LoggingProducerListener<>();
    29     }
    30 
    31     @Bean
    32     @ConditionalOnMissingBean(ConsumerFactory.class)
    33     public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    34         return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
    35     }
    36 
    37     @Bean
    38     @ConditionalOnMissingBean(ProducerFactory.class)
    39     public ProducerFactory<?, ?> kafkaProducerFactory() {
    40         DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
    41                 this.properties.buildProducerProperties());
    42         String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
    43         if (transactionIdPrefix != null) {
    44             factory.setTransactionIdPrefix(transactionIdPrefix);
    45         }
    46         return factory;
    47     }
    48 
    49     @Bean
    50     @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
    51     @ConditionalOnMissingBean
    52     public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
    53         return new KafkaTransactionManager<>(producerFactory);
    54     }
    55 
    56     @Bean
    57     @ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
    58     @ConditionalOnMissingBean
    59     public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
    60         KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
    61         Jaas jaasProperties = this.properties.getJaas();
    62         if (jaasProperties.getControlFlag() != null) {
    63             jaas.setControlFlag(jaasProperties.getControlFlag());
    64         }
    65         if (jaasProperties.getLoginModule() != null) {
    66             jaas.setLoginModule(jaasProperties.getLoginModule());
    67         }
    68         jaas.setOptions(jaasProperties.getOptions());
    69         return jaas;
    70     }
    71 
    72     @Bean
    73     @ConditionalOnMissingBean
    74     public KafkaAdmin kafkaAdmin() {
    75         KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
    76         kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
    77         return kafkaAdmin;
    78     }
    79 
    80 }

    参考:https://www.jianshu.com/p/6ce5d9a96113?utm_campaign=hugo

  • 相关阅读:
    BZOJ1880: [Sdoi2009]Elaxia的路线(最短路)
    「BZOJ1433」[ZJOI2009] 假期的宿舍(二分图,网络流)
    BZOJ 1061 [Noi2008]志愿者招募(费用流)
    [BZOJ2879][Noi2012]美食节(费用流)
    bzoj 1834 [ZJOI2010] network 网络扩容(费用流)
    BZOJ2668:[CQOI2012]交换棋子(费用流)
    bzoj1070【SCOI2007】修车(费用流)
    【BZOJ 1877】 [SDOI2009]晨跑(费用流)
    SQLite_Home
    GeoMesa-单机搭建
  • 原文地址:https://www.cnblogs.com/h--d/p/14875362.html
Copyright © 2011-2022 走看看