helloworld
依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
配置文件application.properties
spring.kafka.bootstrap-servers=192.168.1.51:9092
spring.kafka.consumer.group-id=myGroup
测试:
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaBootController {
private static final String TOPIC = "wj";
private final KafkaTemplate kafkaTemplate;
@Autowired
public KafkaBootController(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
//消息发送
@GetMapping("/send")
public String send(){
kafkaTemplate.send(TOPIC,"hello boot");
return "success";
}
//消息监听
@KafkaListener(topics = TOPIC)
public void listener(String content){
log.info(content);
}
}
executeInTransaction事务
开启事务支持:application.properties
spring.kafka.producer.transaction-id-prefix=kafka_tx.
@GetMapping("/send/{input}")
public String send(@PathVariable String input){
kafkaTemplate.executeInTransaction(t->{
t.send(TOPIC,"hello boot");
if("tx".equals(input)){
throw new RuntimeException("异常");
}
t.send(TOPIC,"hello boot");
return true;
});
return "success";
}
@KafkaListener(topics = TOPIC)
public void listener(String content){
log.info(content);
}
访问:http://localhost:8080/kafka/send/tx
出现图中所示红框内容,则事务控制成功。
注解事务
@GetMapping("/send2/{input}")
@Transactional(rollbackFor = RuntimeException.class)
public String send2(@PathVariable String input){
kafkaTemplate.send(TOPIC,"hello boot");
if("tx".equals(input)){
throw new RuntimeException("异常");
}
kafkaTemplate.send(TOPIC,"hello boot");
return "success";
}