  • Kafka: Integrating the Kafka Message Queue with Spring Boot

    This article walks through how to configure Spring Boot with Spring Kafka so that the application can send and receive messages through Kafka.

    Project structure

    pom.xml dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.java.zy</groupId>
        <artifactId>base-kafka</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.2.RELEASE</version>
            <relativePath/>
        </parent>
    
        <dependencies>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- Spring for Apache Kafka integration -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.2.8.RELEASE</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka-test</artifactId>
                <version>2.2.8.RELEASE</version>
                <scope>test</scope>
            </dependency>
    
        </dependencies>
    
    </project>
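
    The pom also declares spring-kafka-test, which the article itself never uses. As a hedged illustration of what it is for, the sketch below spins up an embedded broker for an integration test with @EmbeddedKafka; the test class name and the message it sends are assumptions, not part of the original project.

    package com.java.common;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    import org.springframework.test.context.junit4.SpringRunner;

    // Hypothetical test: boots the application context against the in-memory broker
    // started by spring-kafka-test instead of a real Kafka cluster.
    @RunWith(SpringRunner.class)
    @SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
    @EmbeddedKafka(partitions = 1, topics = {"user-log"})
    public class KafkaApplicationTests {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        @Test
        public void sendsToEmbeddedBroker() {
            // If the embedded broker is up, the send completes without throwing.
            kafkaTemplate.send("user-log", "{\"userid\":\"test\"}");
        }
    }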

    Spring Boot and Spring Kafka version compatibility table

    Java code

    Define the data transfer object

    package com.java.common.entity;
    
    /**
     * Data transfer object for a user log message. It is created with "new"
     * in the producer, so it does not need to be registered as a Spring bean.
     */
    public class UserLog {
    
        private String username;
        private String userid;
        private String state;
    
        public String getUsername() {
            return username;
        }
    
        public void setUsername(String username) {
            this.username = username;
        }
    
        public String getUserid() {
            return userid;
        }
    
        public void setUserid(String userid) {
            this.userid = userid;
        }
    
        public String getState() {
            return state;
        }
    
        public void setState(String state) {
            this.state = state;
        }

        @Override
        public String toString() {
            // Needed so the producer's log line prints the field values
            // instead of the default Object hash.
            return "UserLog{username='" + username + "', userid='" + userid + "', state='" + state + "'}";
        }
    }

    Define the message producer (sending is done directly with KafkaTemplate, which Spring Kafka provides fully wired up)

    package com.java.common.controller;
    
    import com.alibaba.fastjson.JSON;
    import com.java.common.entity.UserLog;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class UserLogProducer {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Send a user log message to the user-log topic.
         * @param userid id of the user the log entry is for
         */
        public void sendLog(String userid) {
            UserLog userLog = new UserLog();
            userLog.setUsername("jhp");
            userLog.setUserid(userid);
            userLog.setState("0");
            System.err.println("Sending user log: " + userLog);
            // Serialize the DTO to JSON and publish it to the user-log topic.
            kafkaTemplate.send("user-log", JSON.toJSONString(userLog));
        }
    
    }

    The consumer that receives the messages

    Note that consumption works through a listener: annotate the handler method with @KafkaListener(topics = {"user-log"}) and it will be invoked for messages on the listed topics.

    package com.java.common.controller;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    @Component
    public class UserLogConsumer {
    
        @KafkaListener(topics = {"user-log"})
        public void consumer(ConsumerRecord<?,?> consumerRecord){
            // Wrap the value so a null payload is handled safely
            Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
            System.out.println(">>>>>>>>>> record = " + kafkaMessage);
            if (kafkaMessage.isPresent()) {
                // Unwrap the value held by the Optional
                Object message = kafkaMessage.get();
                System.err.println("Consumed message: " + message);
            }
        }
    
    }
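
    The listener above only prints the raw JSON string. Since the producer serialized a UserLog with fastjson, the payload can be mapped back into the object on the consumer side. The helper below is a hypothetical addition (not in the original article) showing one way to do that:

    package com.java.common.controller;

    import com.alibaba.fastjson.JSON;
    import com.java.common.entity.UserLog;

    // Hypothetical helper: turns the consumed JSON payload back into the DTO.
    public final class UserLogParser {

        private UserLogParser() {
        }

        /** Parse the JSON string produced by UserLogProducer back into a UserLog. */
        public static UserLog parse(String json) {
            return JSON.parseObject(json, UserLog.class);
        }
    }

    Inside the listener you could then call, for example, UserLog userLog = UserLogParser.parse((String) kafkaMessage.get()); and work with the typed object instead of a plain string.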

    Spring Boot application class (to make testing easy, message sending is kicked off as soon as the container initializes)

    package com.java.common;
    
    import com.java.common.controller.UserLogProducer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import javax.annotation.PostConstruct;
    
    @SpringBootApplication
    public class KafkaApplication {
    
        @Autowired
        private UserLogProducer kafkaSender;
        @PostConstruct
        public void init(){
            for (int i = 0; i < 10; i++) {
                // Call the producer's send method
                kafkaSender.sendLog(String.valueOf(i));
            }
        }
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }
    
    }
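
    Sending only from @PostConstruct makes repeated testing awkward. Because spring-boot-starter-web is already on the classpath, a small REST endpoint can trigger the producer on demand; the controller below is a hypothetical addition, not part of the original article.

    package com.java.common.controller;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RestController;

    // Hypothetical controller: GET /send/{userid} publishes one user-log message.
    @RestController
    public class UserLogController {

        @Autowired
        private UserLogProducer userLogProducer;

        @GetMapping("/send/{userid}")
        public String send(@PathVariable String userid) {
            userLogProducer.sendLog(userid);
            return "sent user log for userid=" + userid;
        }
    }

    With the application running on port 8080 you could then hit http://localhost:8080/send/42 in a browser or with curl to publish a single message.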

    The corresponding application.properties configuration file

    spring.application.name=base-kafka
    server.port=8080
    #============== kafka ===================
    # Kafka broker addresses (one or more, comma-separated)
    spring.kafka.bootstrap-servers=localhost:9092,192.168.100.1:9093
     
    #=============== producer  =======================
    spring.kafka.producer.retries=0
    # Maximum batch size in bytes (not a message count)
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    # Serializers for the message key and value
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
     
    #=============== consumer  =======================
    # Default consumer group id
    spring.kafka.consumer.group-id=user-log-group
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=100
    # Deserializers for the message key and value
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
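
    The configuration assumes the user-log topic already exists or that the broker allows automatic topic creation. If you prefer to create it from the application, Spring Boot's auto-configured KafkaAdmin creates any NewTopic beans it finds at startup; the configuration class below is a sketch with assumed partition and replication values.

    package com.java.common.config;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    // Hypothetical configuration: declares the user-log topic so KafkaAdmin
    // creates it on startup if it does not already exist.
    @Configuration
    public class KafkaTopicConfig {

        @Bean
        public NewTopic userLogTopic() {
            // 1 partition, replication factor 1: enough for a local single-broker setup.
            return new NewTopic("user-log", 1, (short) 1);
        }
    }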

    The equivalent application.yml configuration file

    server:
      port: 8080
    
    spring:
      application:
        name: base-kafka
      #============== kafka ===================
      # Kafka broker addresses (one or more, comma-separated)
      kafka:
        bootstrap-servers: 192.168.200.27:19092
    
        #=============== producer  =======================
        producer:
          retries: 0
          # Maximum batch size in bytes (not a message count)
          batch-size: 16384
          buffer-memory: 33554432
          # Serializers for the message key and value
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    
        #=============== consumer  =======================
        consumer:
          # Default consumer group id
          group-id: user-log
          auto-offset-reset: earliest
          enable-auto-commit: true
          auto-commit-interval: 100
          # Deserializers for the message key and value
          # (the consumer needs deserializers, not serializers)
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    Test results

    Start the application and watch the messages printed to the console.

    Spring Boot and Kafka are now integrated successfully!

    Reprinted from: https://blog.csdn.net/qq_18603599/article/details/81169488
