zoukankan      html  css  js  c++  java
  • kafka学习(五)Spring Boot 整合 Kafka

    文章更新时间:2020/06/08

    一、创建Spring boot 工程

    创建过程不再描述,创建后的工程结构如下:

    POM文件中要加入几个依赖:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.9.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.zhbf</groupId>
        <artifactId>springboot</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>springboot</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!--引入kafka依赖-->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <!-- 添加 gson 依赖 -->
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.8.5</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    启动SpringbootApplication.java,出现下图界面则说明工程创建好了:

    二、创建kafka生产者类,并通过控制器调用

    kafka生产者类

    /**
     * Kafka消息生产类
     */
    @Log
    @Component
    public class KafkaProducer {
    
        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @Value("${kafka.topic.user}")
        private String topicUser;//topic名称
    
        /**
         * 发送用户消息
         *
         * @param user 用户信息
         */
        public void sendUserMessage(User user) {
            GsonBuilder builder = new GsonBuilder();
            builder.setPrettyPrinting();
            builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
            String message = builder.create().toJson(user);
            kafkaTemplate.send(topicUser, message);
            log.info("
    生产消息至Kafka
    " + message);
        }
    }

    application.yml配置文件

    启动ZK、kafka通讯的服务器broker,并启动消费者监听

      启动方式参考上一篇文章,戳这里~

    配置一个控制器,即调用kafka生成消息的入口

    /**
     * 测试控制器
     * PS:@RestController 注解: 该注解是 @Controller 和 @ResponseBody 注解的合体版
     */
    @RestController
    @RequestMapping("/kafka")
    public class KafkaController {
    
        @Autowired
        private User user;
    
        @Autowired
        private KafkaProducer kafkaProducer;
    
        @RequestMapping("/createMsg")
        public void createMsg() {
            kafkaProducer.sendUserMessage(user);
        }
    }

    启动SpringbootApplication,并通过浏览器访问控制器,生成消息

     

     

    可以看到控制台和消费者窗口都打印了kafka生成的消息。

    三、创建kafka消费者类,并通过控制器调用

     kafka消费者类

    public class KafkaConsumerDemo {
    
        @Value("${kafka.topic.user}")
        private String topicUser;//topic名称
    
        public void consume() {
            Properties props = new Properties();
    
            // 必须设置的属性
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("group.id", "group-user");
    
            // 可选设置属性
    
            //提交方式配置
            // 自动提交offset,每1s提交一次(提交后的消息不再消费,避免重复消费问题)
            props.put("enable.auto.commit", "true");//自动提交offset:true【PS:只有当消息提交后,此消息才不会被再次接受到】
            props.put("auto.commit.interval.ms", "1000");//自动提交的间隔
    
            //消费方式配置
            /**
             * earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
             * latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
             * none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
             */
            props.put("auto.offset.reset", "earliest ");//earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
    
            //拉取消息设置
            props.put("max.poll.records", "100 ");//每次poll操作最多拉取多少条消息(一般不主动设置,取默认的就好)
    
            //根据上面的配置,新增消费者对象
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 订阅topic-user topic
            consumer.subscribe(Collections.singletonList(topicUser));
    
            while (true) {
                //  从服务器开始拉取数据
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("成功消费消息:topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
                });
            }
        }
    }

    重启SpringbootApplication,并通过浏览器访问控制器,消费消息

     

    好文推荐:如何优雅的使用kafka consumer

  • 相关阅读:
    〖Linux〗转换Socks Proxy为Http Proxy
    〖Linux〗Linux的smb地址转换Windows格式(两者互转)
    〖前端开发〗HTML/CSS基础知识学习笔记
    精确光源(Punctual Light Sources)
    面元间的能量传输
    pbr若干概念
    c# xml 输出注释格式控制
    unity, 立即生效动画:Animation.sample()
    unity, 在材质上指定render queue
    unity, shader, Tags的位置
  • 原文地址:https://www.cnblogs.com/riches/p/11720068.html
Copyright © 2011-2022 走看看