zoukankan      html  css  js  c++  java
  • springcloudstream配置使用kafka案例

    一、Spring cloud stream概述

    Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。

    二、引入包依赖

            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.6.4</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            </dependency>
    

      

    三、自定义信息通道

    官方提供了Sink(输入通道)、Source(输出通道)、Processor(集成Sink和Source通道),我们也可以自定义我们自己的信息通道。
    @Input注解标识一个输入通道
    @Output注解标识一个输出通道
    通道名称作为参数,如果未提供参数,默认使用方法名称作为通道名称。
    如下我们自定义信息通道ExamFinishChannel
    public interface ExamFinishChannel {
        String EXAM_FINISH_OUTPUT = "exam-finish-output";
        String EXAM_FINISH_INPUT = "exam-finish-input";
    
        @Output(EXAM_FINISH_OUTPUT)
        MessageChannel sendExamFinishEvent();
    
        @Input(EXAM_FINISH_INPUT)
        SubscribableChannel receiveExamFinishEvent();
    }
    

      

    四、SpringCloudStream及kafka配置

    spring:
        cloud:
            stream:
                kafka:
                    binder:
                        brokers: ${kafka.brokers:127.0.0.1:9092}
                bindings:
                    exam-finish-output:
                        destination: ${kafka.exam-finish-event:spacer_tiangong_exam_finish_event_dev}
                        content-type: application/json
                    exam-finish-input:
                        destination: ${kafka.exam-finish-event:spacer_tiangong_exam_finish_event_dev}
                        content-type: application/json
                        group: ${kafka.exam-finish-consumer-group:spacer_tiangong_exam_group}
    

      

    从上面配置可以看出
    1、定义了通道名称及分组,binder代表绑定实现的标识名称(如kafka或者rabbit),与3中的定义名称相对应。
    2、定义了入站消费者的并发性,指在一个实例内的并发性,不同实例之间本身就是并发的,默认值为1
    spring.cloud.stream.bindings.<channelName>.consumer.concurrency=1
    3、定义了kafka连接信息
    如果未配置autoCommitOffset,默认自动提交偏移量
    详细参数配置可参考官网

    五、发送消息到输出通道

    @Slf4j
    @EnableBinding(ExamFinishChannel.class)
    public class ExamFinishProducer {
    
        @Autowired
        private ExamFinishChannel examFinishChannel;
    
        public void sendExamFinishEventMsg(ExamFinishEventMessage payload) {
            examFinishChannel.sendExamFinishEvent().send(MessageBuilder.withPayload(payload).build());
            log.info("send msg success: {}", payload);
        }
    }
    

      

    注入先前定义的通道ExamFinishChannel自定义的发送方法,可将消息发送到通道中,每个通道对应一个kafka的主题
     

    六、从输入通道订阅消息

    @Slf4j
    @EnableBinding(ExamFinishChannel.class)
    public class ExamFinishConsumer {
    
        @StreamListener(ExamFinishChannel.EXAM_FINISH_INPUT)
        public void receive(ExamFinishEventMessage payload) {
            log.info("start....");
            System.out.println(payload);
            try {
                Thread.sleep(1000*10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("receive success end:{}", payload);
        }
    }
    

      

    七、这样完整的消息系统就搭建好了,定义Controller发送消息测试

     @Autowired
        private ExamFinishProducer examFinishProducer;
    
        @GetMapping("/test")
        public void test() {
            ExamFinishEventMessage examFinishEventMessage = new ExamFinishEventMessage();
            examFinishEventMessage.setName("mzq");
            examFinishProducer.sendExamFinishEventMsg(examFinishEventMessage);
            examFinishProducer.sendExamFinishEventMsg(examFinishEventMessage);
            examFinishProducer.sendExamFinishEventMsg(examFinishEventMessage);
            examFinishProducer.sendExamFinishEventMsg(examFinishEventMessage);
        }
    

      

    八、并发性测试

    如七中所示,一次发送4条消息到缺省消息通道中,
    在并发性配置为1的情况下,即spring.cloud.stream.bindings.exam-finish-input.consumer.concurrency=1
     
    start....
    ExamFinishEventMessage(name=mzq)
    receive success end:ExamFinishEventMessage(name=mzq)
    start....
    ExamFinishEventMessage(name=mzq)
    receive success end:ExamFinishEventMessage(name=mzq)
    start....
    ExamFinishEventMessage(name=mzq)
    receive success end:ExamFinishEventMessage(name=mzq)
    start....
    ExamFinishEventMessage(name=mzq)
    receive success end:ExamFinishEventMessage(name=mzq)
    

      

    如果将concurrency修改为2,即spring.cloud.stream.bindings.exam-finish-input.consumer.concurrency=2

    start...
    start...
    ExamFinishEventMessage(name=mzq)
    receive success end:ExamFinishEventMessage(name=mzq)
    ExamFinishEventMessage(name=mzq)
    receive success end:ExamFinishEventMessage(name=mzq)
    start...
    ExamFinishEventMessage(name=mzq)
    start...
    ExamFinishEventMessage(name=mzq)
    receive success end:ExamFinishEventMessage(name=mzq)
    receive success end:ExamFinishEventMessage(name=mzq)
    

    从日志可以看出,实现了两个线程的并发消费。

    作者:森林木马

    -------------------------------------------

    特此声明:所有评论和私信都会在第一时间回复。也欢迎朋友们指正错误,共同进步!

    如果觉得这篇文章对你有小小的帮助的话,记得在右下角点个“推荐”哦,博主在此感谢!

    个性签名:好记性不如勤随笔,好随笔还请多关注!

  • 相关阅读:
    BestCoder17 1001.Chessboard(hdu 5100) 解题报告
    codeforces 485A.Factory 解题报告
    codeforces 485B Valuable Resources 解题报告
    BestCoder16 1002.Revenge of LIS II(hdu 5087) 解题报告
    codeforces 374A Inna and Pink Pony 解题报告
    codeforces 483B Friends and Presents 解题报告
    BestCoder15 1002.Instruction(hdu 5083) 解题报告
    codeforces 483C.Diverse Permutation 解题报告
    codeforces 483A. Counterexample 解题报告
    NSArray中地内存管理 理解
  • 原文地址:https://www.cnblogs.com/owenma/p/15463237.html
Copyright © 2011-2022 走看看