zoukankan      html  css  js  c++  java
  • Spring Cloud:Stream基础知识

    背景

      消息中间件有多种,rabbitmq,rocketmq,activemq,kafka等。
      不同的消息中间件具体细节不一样。那么有没有一种新的技术诞生,让我们不再关注具体MQ细节,我们只需要用一种适配绑定的方式,自动给我们在各种MQ内切换。
      一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。Spring Cloud Stream 因此诞生。
      官方定义 Spring Cloud Stream是一个构建消息驱动微服务的框架。
      应用程序使用inputs或者outputs来与springcloud stream中binder交互。
      通过我们配置来bingding(绑定),而stream的binder对象负责与消息中间件交互。
      所以,我们只需要搞清楚如何与springcloud stream交互就可以方便使用消息驱动方式。
      SpringCloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用发布-订阅、消费组、分区三个核心概念。
      目前仅支持RabbitMQ和Kafka。
    Stream处理架构:

    通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件之间的隔离。
    stream标准流程套路:

    Binder:很方便的连接中间件,屏蔽差异。
    channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置。
    source和sink:简单理解为消息的输入输出。
    编码API和注解:

    Stream消息生产者

    pom依赖

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
    

    yml配置

    server:
      port: 8801
    
    spring:
      application:
        name: cloud-stream-provider
      cloud:
        stream:
          bindings:
            output:
              destination: studyExchange
              content-type: application/json
              binder: defaultRabbit
          binders:
            defaultRabbit:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    host: 192.168.10.132
                    port: 5672
                    username: guest
                    password: guest
    
    eureka:
      client:
        #是否将自己注册到Eureka Server 默认为true
        register-with-eureka: true
        #是否从EurekaServer抓取已有的注册信息,默认为true,单节点无所谓,集群必须设置true才能配合ribbon做负载均衡
        fetch-registry: true
        service-url:
          #设置eureka server交互的地址查询服务和注册服务都需要依赖这个地址
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2
        lease-expiration-duration-in-seconds: 5
        instance-id: send-8801.com
        prefer-ip-address: true
    

    发送消息的接口:

    public interface IMessageProvider {
        public String send();
    }
    

    发送消息的实现:

    @EnableBinding(Source.class)//定义消息的推送管道
    public class MessageProvider implements IMessageProvider {
    
        @Autowired
        private MessageChannel output;
    
        @Override
        public String send() {
            String serial = IdUtil.simpleUUID();
            System.out.println(serial+"============");
            output.send(MessageBuilder.withPayload(serial).build());
            return serial;
        }
    }
    

    controller

    @RestController
    public class SendController {
    
        @Autowired
        private IMessageProvider messageProvider;
    
        @GetMapping("sendMessage")
        public String sendMessage(){
            return messageProvider.send();
        }
    }
    

    调用接口,观察效果

    Stream消息消费者

    配置yml

    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          bindings:
            input:
              destination: studyExchange
              content-type: application/json
              binder: defaultRabbit
          binders:
            defaultRabbit:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    host: 192.168.10.132
                    port: 5672
                    username: guest
                    password: guest
    
    eureka:
      client:
        #是否将自己注册到Eureka Server 默认为true
        register-with-eureka: true
        #是否从EurekaServer抓取已有的注册信息,默认为true,单节点无所谓,集群必须设置true才能配合ribbon做负载均衡
        fetch-registry: true
        service-url:
          #设置eureka server交互的地址查询服务和注册服务都需要依赖这个地址
          defaultZone: http://localhost:7001/eureka
      instance:
        lease-renewal-interval-in-seconds: 2
        lease-expiration-duration-in-seconds: 5
        instance-id: receive-8802.com
        prefer-ip-address: true
    

    接收消息:

    @EnableBinding(Sink.class)
    @RestController
    public class ReceiveMessageListenerController {
    
        @Value("${server.port}")
        private String serverPort;
    
        @StreamListener(value = Sink.INPUT)
        public void input(Message<String> message){
            System.out.println("消息:"+message.getPayload()+"serverPort:"+serverPort);
        }
    
    }
    

    stream重复消费

    下默认配置中,多个消费者存在,会存在重复消费问题
    原因:默认分组group是不同的,组流水号不一样,被认为是不同组,可以消费,所以要自定义配置分组。

    yml配置:

      cloud:
        stream:
          bindings:
            input:
              destination: studyExchange
              content-type: application/json
              binder: defaultRabbit
              group: wen.jie
    

    通过配置后,两个消费者被分配到一组,就不存在重复消费的问题。

  • 相关阅读:
    跨DLL边界传递CRT对象的隐患(或诸如:HEAP[]: Invalid Address specified to RtlValidateHeap(#,#)问题出现的原因)
    【策略模式】不同的时间用不同的规则优先考虑策略模式
    【装饰模式】遵循开闭原则,使用一个类装饰另一个类
    【简单的工厂模式】一个简单的计算器
    【原】使用Golang语言编写echo程序
    WebBrowser或CHtmlView中轻松屏蔽脚本错误(JavaScript)
    春运买票难,是谁造成的?
    搜狗输入法使用感受
    [原]在 go/golang语言中使用 google Protocol Buffer
    防护针对SQL Server数据库的SQL注入攻击
  • 原文地址:https://www.cnblogs.com/wwjj4811/p/13628099.html
Copyright © 2011-2022 走看看