zoukankan      html  css  js  c++  java
  • SpringBoot:整合Kafka

    helloworld

    依赖:

        <dependencies>
            <!-- Maven dependencies for the Spring Boot + Kafka hello-world example in this article. -->
            <!-- Web starter: the example controller uses @RestController / @GetMapping. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- Lombok: the example controller uses @Slf4j. Marked optional. -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <!-- Test starter, test scope only. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- Kafka client library; no version is declared here, so it must come from dependency management. -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </dependency>
    
            <!-- NOTE(review): pinned explicitly to 2.6.0 while kafka-clients above has no explicit version;
                 confirm both resolve to compatible versions. Not referenced by the example code shown
                 in this article; confirm it is needed. -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.13</artifactId>
                <version>2.6.0</version>
            </dependency>
    
            <!-- NOTE(review): presumably an SLF4J-to-log4j binding; verify it does not clash with the
                 logging binding already provided by the Spring Boot starters. -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </dependency>
    
            <!-- NOTE(review): not referenced by the example code shown in this article; confirm it is needed. -->
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
            </dependency>
    
            <!-- Kafka Streams with the ZooKeeper artifact excluded.
                 NOTE(review): not referenced by the example code shown in this article; confirm it is needed. -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <!-- Jackson modules, all three pinned to 2.11.0.
                 NOTE(review): explicit versions take precedence over any managed version; confirm the pin is intended. -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.11.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.11.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
                <version>2.11.0</version>
            </dependency>
    
            <!-- NOTE(review): fastjson is pinned to 1.2.58 and is not referenced by the example code shown;
                 check security advisories for this version and confirm it is needed. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.58</version>
            </dependency>
    
            <!-- Spring for Apache Kafka: provides the KafkaTemplate and @KafkaListener used by the example controller. -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
        </dependencies>
    

    配置文件 application.properties:

    spring.kafka.bootstrap-servers=192.168.1.51:9092
    spring.kafka.consumer.group-id=myGroup
    

    测试:

    /**
     * Minimal REST controller demonstrating Kafka produce/consume with Spring Boot.
     *
     * <p>{@code GET /kafka/send} publishes a fixed String message to {@link #TOPIC};
     * {@link #listener(String)} is subscribed to the same topic and logs every
     * message it receives.
     */
    @Slf4j
    @RestController
    @RequestMapping("/kafka")
    public class KafkaBootController {

        /** Name of the topic used for both producing and consuming. */
        private static final String TOPIC = "wj";

        /**
         * Template used to publish messages. Parameterized with String key/value
         * types so that {@code send} calls are type-checked instead of going
         * through a raw type.
         */
        private final KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Creates the controller.
         *
         * @param kafkaTemplate template used to send messages to Kafka
         */
        @Autowired
        public KafkaBootController(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        /**
         * Sends the fixed message {@code "hello boot"} to {@link #TOPIC}.
         *
         * <p>The value returned by {@code kafkaTemplate.send} is not inspected here,
         * so the response does not reflect whether the send completed.
         *
         * @return the literal string {@code "success"}
         */
        @GetMapping("/send")
        public String send() {
            kafkaTemplate.send(TOPIC, "hello boot");
            return "success";
        }

        /**
         * Message listener for {@link #TOPIC}; logs each received payload at INFO level.
         *
         * @param content the message payload
         */
        @KafkaListener(topics = TOPIC)
        public void listener(String content) {
            log.info(content);
        }
    }
    

    image-20210219171446508

    executeInTransaction事务

    开启事务支持:在 application.properties 中添加以下配置:

    spring.kafka.producer.transaction-id-prefix=kafka_tx.
    
    /**
     * Sends two messages to {@code TOPIC} inside a single
     * {@code executeInTransaction} callback.
     *
     * <p>When {@code input} equals {@code "tx"}, a {@link RuntimeException} is
     * thrown between the two sends, from inside the callback.
     *
     * @param input path variable; the value {@code "tx"} triggers the failure
     * @return the literal string {@code "success"} when the callback completes
     */
    @GetMapping("/send/{input}")
    public String send(@PathVariable String input) {
        final boolean failMidway = "tx".equals(input);
        final String payload = "hello boot";

        kafkaTemplate.executeInTransaction(operations -> {
            operations.send(TOPIC, payload);
            // Simulated failure between the first and second send.
            if (failMidway) {
                throw new RuntimeException("异常");
            }
            operations.send(TOPIC, payload);
            return true;
        });

        return "success";
    }
    
    /**
     * Listener subscribed to {@code TOPIC}; writes each received payload to the
     * log at INFO level.
     *
     * @param content the message payload
     */
    @KafkaListener(topics = TOPIC)
    public void listener(final String content) {
        log.info(content);
    }
    

    访问:http://localhost:8080/kafka/send/tx

    image-20210219172828958

    出现图中所示红框内容,则事务控制成功。

    注解事务

    /**
     * Annotation-driven variant: sends two messages to {@code TOPIC} from a
     * method marked {@code @Transactional}.
     *
     * <p>When {@code input} equals {@code "tx"}, a {@link RuntimeException} is
     * thrown after the first send and before the second.
     * NOTE(review): which transaction manager backs {@code @Transactional} is
     * not visible in this snippet; confirm it covers Kafka sends.
     *
     * @param input path variable; the value {@code "tx"} triggers the failure
     * @return the literal string {@code "success"} when both sends were issued
     */
    @GetMapping("/send2/{input}")
    @Transactional(rollbackFor = RuntimeException.class)
    public String send2(@PathVariable String input) {
        final String payload = "hello boot";
        final boolean failMidway = "tx".equals(input);

        kafkaTemplate.send(TOPIC, payload);
        // Simulated failure between the first and second send.
        if (failMidway) {
            throw new RuntimeException("异常");
        }
        kafkaTemplate.send(TOPIC, payload);

        return "success";
    }
    

    访问:http://localhost:8080/kafka/send2/tx

    image-20210219173809462

  • 相关阅读:
    火狐常用的插件
    sourceinsight技巧
    为sourceinsight添加makefile、kconfig、*.S文件支持
    如何在shell中打印出带颜色的字符?
    Linux shell tee指令学习
    【转载】dirs、pushd、popd指令
    【转载】SHELL字符串处理技巧(${}、##、%%)
    【转载】利用shell脚本获取一个文件的绝对路径readlink
    如何查看智能手机的IP地址
    SDK Manager中勾选项
  • 原文地址:https://www.cnblogs.com/wwjj4811/p/14417291.html
Copyright © 2011-2022 走看看