一、环境
- Idea 2020.1
- JDK 1.8
- maven
二、目的
spring boot 通过整合kafka
gitHub地址: https://github.com/ouyushan/ouyushan-spring-boot-samples
三、步骤
3.1、点击File -> New Project -> Spring Initializer,点击next
3.2、在对应地方修改自己的项目信息
3.3、选择Web依赖,选中Spring Web、Spring Boot DevTools。可以选择Spring Boot版本,本次默认为2.3.0,点击Next
3.4、项目结构
四、添加文件
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.ouyushan</groupId>
<artifactId>spring-boot-mq-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-mq-kafka</name>
<description>Kafka project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties文件
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=testGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=sample.kafka
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
SampleMessage.java
package org.ouyushan.springboot.mq.kafka.entity;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* @Description:
* @Author: ouyushan
* @Email: ouyushan@hotmail.com
* @Date: 2020/6/9 14:02
*/
public class SampleMessage {
private final Integer id;
private final String message;
@JsonCreator
public SampleMessage(@JsonProperty("id") Integer id, @JsonProperty("message") String message) {
this.id = id;
this.message = message;
}
public Integer getId() {
return this.id;
}
public String getMessage() {
return this.message;
}
@Override
public String toString() {
return "SampleMessage{id=" + this.id + ", message='" + this.message + "'}";
}
}
KafkaConsumerListener.java
package org.ouyushan.springboot.mq.kafka.listener;
import org.ouyushan.springboot.mq.kafka.entity.SampleMessage;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @Description:
* @Author: ouyushan
* @Email: ouyushan@hotmail.com
* @Date: 2020/6/9 14:01
*/
@Component
public class KafkaConsumerListener {
@KafkaListener(topics = "testTopic")
public void processMessage(SampleMessage message) {
System.out.println("Received sample message [" + message + "]");
}
}
KafkaProducerController.java
package org.ouyushan.springboot.mq.kafka.controller;
import org.ouyushan.springboot.mq.kafka.entity.SampleMessage;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Description:
* @Author: ouyushan
* @Email: ouyushan@hotmail.com
* @Date: 2020/6/9 14:07
*/
@RestController
public class KafkaProducerController {
private KafkaTemplate<Object, SampleMessage> kafkaTemplate;
public KafkaProducerController(KafkaTemplate<Object, SampleMessage> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@RequestMapping("/sendMsg")
public void sendMsg(SampleMessage message) {
this.kafkaTemplate.send("testTopic", message);
System.out.println("Sent sample message [" + message + "]");
}
}
五、测试
SpringBootMqRocketmqApplicationTests.java
package org.ouyushan.springboot.mq.kafka;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureWebMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@SpringBootTest
@AutoConfigureWebMvc
class SpringBootMqKafkaApplicationTests {
@Test
public void testMQProducer(@Autowired MockMvc mockMvc) throws Exception {
mockMvc.perform(MockMvcRequestBuilders.get("/sendMsg?message=")
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andDo(print());
}
}