zoukankan      html  css  js  c++  java
  • Spring Boot Sample 029之spring-boot-mq-kafka

    一、环境

    • Idea 2020.1
    • JDK 1.8
    • maven

    二、目的

    spring boot 通过整合kafka

    gitHub地址: https://github.com/ouyushan/ouyushan-spring-boot-samples

    三、步骤

    3.1、点击File -> New Project -> Spring Initializer,点击next

    3.2、在对应地方修改自己的项目信息

    3.3、选择Web依赖,选中Spring Web、Spring Boot DevTools。可以选择Spring Boot版本,本次默认为2.3.0,点击Next

    3.4、项目结构

    四、添加文件

    pom.xml文件

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.0.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>org.ouyushan</groupId>
        <artifactId>spring-boot-mq-kafka</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>spring-boot-mq-kafka</name>
        <description>Kafka project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    application.properties文件

    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=testGroup
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
    spring.kafka.consumer.properties.spring.json.trusted.packages=sample.kafka
    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    

    SampleMessage.java

    package org.ouyushan.springboot.mq.kafka.entity;
    
    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    
    /**
     * @Description:
     * @Author: ouyushan
     * @Email: ouyushan@hotmail.com
     * @Date: 2020/6/9 14:02
     */
    public class SampleMessage {
    
        private final Integer id;
    
        private final String message;
    
        @JsonCreator
        public SampleMessage(@JsonProperty("id") Integer id, @JsonProperty("message") String message) {
            this.id = id;
            this.message = message;
        }
    
        public Integer getId() {
            return this.id;
        }
    
        public String getMessage() {
            return this.message;
        }
    
        @Override
        public String toString() {
            return "SampleMessage{id=" + this.id + ", message='" + this.message + "'}";
        }
    
    }
    

    KafkaConsumerListener.java

    package org.ouyushan.springboot.mq.kafka.listener;
    
    import org.ouyushan.springboot.mq.kafka.entity.SampleMessage;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @Description:
     * @Author: ouyushan
     * @Email: ouyushan@hotmail.com
     * @Date: 2020/6/9 14:01
     */
    @Component
    public class KafkaConsumerListener {
    
        @KafkaListener(topics = "testTopic")
        public void processMessage(SampleMessage message) {
            System.out.println("Received sample message [" + message + "]");
        }
    }
    

    KafkaProducerController.java

    package org.ouyushan.springboot.mq.kafka.controller;
    
    import org.ouyushan.springboot.mq.kafka.entity.SampleMessage;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @Description:
     * @Author: ouyushan
     * @Email: ouyushan@hotmail.com
     * @Date: 2020/6/9 14:07
     */
    @RestController
    public class KafkaProducerController {
    
    
        private KafkaTemplate<Object, SampleMessage> kafkaTemplate;
    
        public KafkaProducerController(KafkaTemplate<Object, SampleMessage> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
        @RequestMapping("/sendMsg")
        public void sendMsg(SampleMessage message) {
            this.kafkaTemplate.send("testTopic", message);
            System.out.println("Sent sample message [" + message + "]");
        }
    
    }
    

    五、测试

    SpringBootMqRocketmqApplicationTests.java

    package org.ouyushan.springboot.mq.kafka;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureWebMvc;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.http.MediaType;
    import org.springframework.test.web.servlet.MockMvc;
    import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
    
    import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
    import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
    
    @SpringBootTest
    @AutoConfigureWebMvc
    class SpringBootMqKafkaApplicationTests {
    
        @Test
        public void testMQProducer(@Autowired MockMvc mockMvc) throws Exception {
            mockMvc.perform(MockMvcRequestBuilders.get("/sendMsg?message=")
                    .accept(MediaType.APPLICATION_JSON))
                    .andExpect(status().isOk())
                    .andDo(print());
        }
    
    }
    
  • 相关阅读:
    021.day21 反射 Class类 反射常用操作
    020.day20 线程概述 多线程优缺点 线程的创建 线程常用方法 生命周期 多线程同步
    019.day19 缓冲流 对象流 标准输入输出流
    018.day18 map集合如何实现排序 File类 IO流 字节流 字符流 编码
    017.day17 Map接口 克隆 treeSet集合排重缺陷
    016.day16 HashSet TreeSet 比较器Comparable Comparator
    015.day15
    014.day14
    013.day13
    线程
  • 原文地址:https://www.cnblogs.com/ouyushan/p/13976653.html
Copyright © 2011-2022 走看看