zoukankan      html  css  js  c++  java
  • Spring Cloud Stream学习笔记

    1 环境

    2 简介

    Spring Cloud Stream是一个用于构建消息驱动的微服务应用的框架,其提供的一系列抽象屏蔽了不同类型消息中间件使用上的差异,同时也大大简化了Spring在整合消息中间件时的使用复杂度。

    Spring Cloud Stream 提供了Binder(负责与消息中间件进行交互)

    3 初见

    1 创建项目 添加web rabbitmq stream依赖

    在这里插入图片描述

    2 rabbitmq配置

    # 其他参数默认配置
    spring.rabbitmq.host=你的host
    

    3 消息接收器

    // 该注解表示绑定Sink消息通道
    @EnableBinding(Sink.class)
    public class MsgReceiver {
    
        private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
    
        // 自带 消费者
        @StreamListener(Sink.INPUT)
        public void receive(Object payload){
            logger.info("received: " + payload);
        }
    
    }
    

    4 在rabbitmq中发送消息

    在这里插入图片描述

    5 查看结果

    在这里插入图片描述

    4 自定义消息通道

    1 自定义接口

    
    public interface MyChannel {
        String INPUT = "test-input";
        String OUTPUT = "test-output";
    
        // 收
        @Input(INPUT)
        SubscribableChannel input();
    
        // 发
        @Output(OUTPUT)
        MessageChannel output();
    }
    

    2 自定义接收器

    // 绑定自定义消息通道
    @EnableBinding(MyChannel.class)
    public class MsgReceiver1 {
    
        private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class);
    
        // 收
        @StreamListener(MyChannel.INPUT)
        public void receive(Object payload){
            logger.info("received1: " + payload + ":" + new Date());
        }
    
    }
    

    3 controller进行测试

    package com.sundown.stream.controller;
    
    import com.sundown.stream.bean.ChatMessage;
    import com.sundown.stream.msg.MyChannel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.GenericMessage;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    
    @RestController
    public class HelloController {
    
        @Autowired
        MyChannel myChannel;
    
        @GetMapping("/hello")
        public void hello(){
            String message = "welcome spring cloud stream";
     myChannel.output().send(MessageBuilder.withPayload(message).build());
        }
    }
    
    

    4 消息输入输出(通道对接)

    spring.cloud.stream.bindings.test-input.destination=test-topic
    spring.cloud.stream.bindings.test-output.destination=test-topic
    

    5 启动、访问

    在这里插入图片描述
    在这里插入图片描述

    5 消息分组

    • 消息分组(肥水不留外人田 你可能不知道流向哪家田 但是确实是自己人)

    1 打包 访问(未使用消息分组)

    在这里插入图片描述

    • 启动java -jar stream-0.0.1-SNAPSHOT.jar
      java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081运行访问http://localhost:8080/hello
    • 结果(多次消费)
      在这里插入图片描述在这里插入图片描述
    • 现在我不想一条消息被多次消费(假设消费者是一个集群 --> 多个人做同一件事 题外话:分布式 --> 一件事分给多个人做) 是否有什么办法呢
      消息分组帮我们解决(指定输入 输出 有么有负载均衡的味道)

    2 消息分组配置

    spring.cloud.stream.bindings.test-input.destination=test-topic
    spring.cloud.stream.bindings.test-output.destination=test-topic
    
    spring.cloud.stream.bindings.test-input.group=gg
    spring.cloud.stream.bindings.test-output.group=gg
    
    • 为了验证是否能成功 重新打包运行 和上面一样 访问接口
      在这里插入图片描述
      在这里插入图片描述
    • 清空2个控制台的信息 再次访问接口
      在这里插入图片描述
      在这里插入图片描述

    6 消息分区

    • 为一些具有相同特征的消息设置每次都被同一个消费实例进行消费。

    1 消息分区配置

    • properties配置
    spring.cloud.stream.bindings.test-input.destination=test-topic
    spring.cloud.stream.bindings.test-output.destination=test-topic
    
    spring.cloud.stream.bindings.test-input.group=gg
    spring.cloud.stream.bindings.test-output.group=gg
    
    # 开启消费分区(消费者上配置)
    spring.cloud.stream.bindings.test-input.consumer.partitioned=true
    # 消费者实例个数(消费者上配置)
    spring.cloud.stream.instance-count=2
    # 当前实例下标(消费者上配置)
    spring.cloud.stream.instance-index=0
    

    2 controller配置

    @RestController
    public class HelloController {
    
        @Autowired
        MyChannel myChannel;
    
        @GetMapping("/hello")
        public void hello(){
            String message = "welcome spring cloud stream";
            // 先写死
            int whichPart = 1;
            System.out.println("发送消息:" + message + ",发往分区:" + whichPart);
            myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build());
        }
    }
    

    3 访问

    • 打包运行java -jar stream-0.0.1-SNAPSHOT.jar --spring.cloud.stream.instance-index=0
      java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081 --spring.cloud.stream.instance-index=0(别忘了先关闭启动类 不然打包会报错)
      -访问http://localhost:8080/hello
      在这里插入图片描述

    4 若是随机访问呢

    @GetMapping("/hello")
        public void hello(){
            String message = "welcome spring cloud stream";
            int whichPart = new Random().nextInt(2);
            System.out.println("发送消息:" + message + ",发往分区:" + whichPart);
            myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build());
        }
    
    • 和上面一样打包访问
      在这里插入图片描述

    7 定时器

    虽然定时任务可以用cron表达式 但是对于一些特殊的定时任务 可以使用stream+rabbitmq更合适 比如几分钟后执行
    rabbitmq插件安装

    1 配置

    • properties
    spring.rabbitmq.host=xxx
    
    spring.cloud.stream.bindings.test-input.destination=topic
    spring.cloud.stream.bindings.test-output.destination=topic
    
    spring.cloud.stream.rabbit.bindings.test-input.consumer.delayed-exchange=true
    spring.cloud.stream.rabbit.bindings.test-output.producer.delayed-exchange=true
    
    #spring.cloud.stream.bindings.test-input.destination=test-topic
    #spring.cloud.stream.bindings.test-output.destination=test-topic
    #
    #spring.cloud.stream.bindings.test-input.group=gg
    #spring.cloud.stream.bindings.test-output.group=gg
    #
    ## 开启消费分区(消费者上配置)
    #spring.cloud.stream.bindings.test-input.consumer.partitioned=true
    ## 消费者实例个数(消费者上配置)
    #spring.cloud.stream.instance-count=2
    ## 当前实例下标(消费者上配置)
    #spring.cloud.stream.instance-index=0
    #
    ## 生产者配置
    #spring.cloud.stream.bindings.test-output.producer.partition-key-expression=headers['whichPart']
    ## 消费节点数量
    #spring.cloud.stream.bindings.test-output.producer.partition-count=2
    
    
    • 自定义通道
    // 绑定自定义消息通道
    @EnableBinding(MyChannel.class)
    public class MsgReceiver1 {
    
        private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class);
    
        // 收
        @StreamListener(MyChannel.INPUT)
        public void receive(Object payload){
            // 添加日期 一会好对比
            logger.info("received1: " + payload + ":" + new Date());
        }
    
    }
    
    • controller
    @RestController
    public class HelloController {
        private static final Logger logger = LoggerFactory.getLogger(HelloController.class);
    
        @Autowired
        MyChannel myChannel;
    
        @GetMapping("/delay")
        public void delay(){
            String message = "welcome spring cloud stream";
            logger.info("send msg:" + new Date());
            // x-delay --> 延迟3s
            myChannel.output().send(MessageBuilder.withPayload(message).setHeader("x-delay", 3000).build());
        }
    }
    

    2 启动 访问

    在这里插入图片描述

    • 打开rabbitmq查看
      在这里插入图片描述
    • 查看idea控制台
      在这里插入图片描述

    8 小结

    stream自带的与自定义(添加destination=xxx)之间的类似和区别
    解决重复消费 分组(group)
    消息分组单个实例访问(开启消费分区 实例个数 实例下标 生产者配置 消费节点数)
    定时器 rabbitmq相关的插件安装运行 后端代码实现(配置delayed-exchange和destination以及controller 发送时添加setHeader("x-delay", 3000) 3s延时)

    作者:以罗伊
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须在文章页面给出原文链接,否则保留追究法律责任的权利。
  • 相关阅读:
    JAVA反射机制
    Android插件化
    MFC项目的建立
    [ACM]躲猫猫
    [ACM]某一天的n天后是几年几月几日
    [ACM]括号配对问题
    开发中好用的网站
    TCP与UDP(实时通讯)
    NSSet基本使用
    NSPredicate(正则表达式)
  • 原文地址:https://www.cnblogs.com/my-ordinary/p/12592347.html
Copyright © 2011-2022 走看看