zoukankan      html  css  js  c++  java
  • SpringCloudStream实例

    1.解决的痛点

    由于市面上有很多的消息中间件(activeMQ,rabbitMQ,RocketMQ,Kafka),当某一天公司切换某一种新的消息中间件的时候,使得我们工作量会变大,增加学习量。

    那么有没有一种新技术,能让我们不在关注具体的MQ细节,我们只需要用一种适配绑定的方式,自动的给我们在MQ内进行切换。这个时候就是Springcloud Stream要大显身手的时候。

    2.概述

    一句话:屏蔽底层消息中间件的差异,降低切换成本,同一消息的编程模型。

    官方定义:springcloud Stream 是一个构建消息驱动的微服务框架。

    应用程序通过Inputs或者outputs来与springcloud Stream中binder对象交互。

    通过我们配置binding(绑定),而springcloud Stream 的binder对象负责与消息中间件交互。

    所以,我们只需要搞清楚如何与springcloud stream交互就可以方便的使用消息驱动方式。

    通过spring integration 来连接消息代理中间件,以实现消息消息事件驱动。

    springcloud stream为一些供应商的消息中间件产品提供了个性化自动配置实现,引用了发布-订阅、消费组、分区三个核心概念,目前仅支持RabbitMQ、 Kafka

    3.官网

    https://spring.io/projects/spring-cloud-stream

    4.stream凭什么可以统一差异?

    通过定义绑定器作为中间层,完美的实现了应用程序与消息中间件细节之间的隔离。

    通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

    通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

    5.Binder的两个重要概念

    input对应消费者

    output对应生产者

    6.Stream消息通信方式是什么?

    它遵循了发布-订阅模式,通过Topic主题进行广播推送。

    7.Stream遵循的标准流程

    1.binder 很方便的连接中间件,屏蔽差异。

    2.Channel 通道,是队列QUEUE的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel队列进行配置。

    3.source和sink 简单的可以理解为参照对象是springcloud stream自身,从stream发布消息就是输出,接受消息就是输入。

    8.API和常用的注解

    组成说明
    Middleware 中间件,目前只支持RabbitMQ和Kafka
    Binder binder是应用于消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
    @Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
    @Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
    @StreamListener 监听队列,用于消费者的队列的消息接收。
    @EnableBinding 指信道channel和exchange绑定在一起。

    9.代码

    第一步:创建eureka服务模块cloud-eureka-server7001、cloud-eureka-server7002

    1.POM文件:

     
        <dependencies>
            <!--eureka-server-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <!--引入热部署-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies> 

    2.创建application.yml

    server:
      port: 7001
    
    # eureka配置
    eureka:
      instance:
        hostname: eureka7001.com #eureka服务端的实例名称
      client:
        # false表示不想注册中心注册自己
        register-with-eureka: false
        # false表示自己端就是注册中心,我的职责就是维护服务实例,并不需要检索服务
        fetch-registry: false
        service-url:
          # 设置Eureka Server交互的地址查询服务和注册服务都需要依赖这个地址
          #defaultZone: http://eureka7002.com:7002/eureka/
          # 单机就是7001自己守望自己
          defaultZone: http://eureka7001.com:7001/eureka/
      server:
        enable-self-preservation: false # 禁用自我保护模式
        eviction-interval-timer-in-ms: 2000

    3.创建主入口函数

    package com.seegot.springcloud;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
    
    /**
     * @program: cloud2020
     * @description:
     * @author: PP Zhang
     * @create: 2020-06-09 22:15
     */
    @SpringBootApplication
    @EnableEurekaServer // 表示自己就是注册中心
    public class EurekaMain7001 {
        public static void main(String[] args) {
            SpringApplication.run(EurekaMain7001.class,args);
        }
    }

    7002和上面配置基本一致,只是端口号不一致,直接clone就行。

    第二步:新建消息生产者模块cloud-stream-rabbitmq-provider8801

    1.POM

    <dependencies>
            <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
                <version>3.0.6.RELEASE</version>
            </dependency>
            <!--springcloud stream-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
            <!-- 客户端 config -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-config</artifactId>
            </dependency>
    
            <!--注入eureka client 依赖-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency> 
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <!--引入热部署-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

    2.创建application.yml

     1 server:
     2   port: 8801
     3 spring:
     4   application:
     5     name: cloud-stream-provider
     6   cloud:
     7     stream:
     8       binders: # 在此处配置需要绑定的rabbitmq的服务消息
     9         defaultRabbit: # 表示定义的名称,用于binding整合
    10           type: rabbit # 消息组件类型
    11           environment: # 设置rabbitmq的相关环境配置
    12             spring:
    13               rabbitmq:
    14                 host: localhost
    15                 port: 5672
    16                 username: guest
    17                 password: guest
    18       bindings: # 服务的整合处理
    19         output: # 这个名字是一个通道的名称
    20           destination: studyExchange # 标识要使用的Exchange名称定义
    21           content-type: application/json #设置消息类型,本次为json,本文则设置为“text/plain”
    22           binder: defaultRabbit # 设置要绑定的消息服务的具体设置
    23 eureka:
    24   client:
    25     service-url:
    26       defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka # 集群版
    27   instance:
    28     lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔,默认是30秒
    29     lease-expiration-duration-in-seconds: 5 # 如果超过了5秒间隔,默认是90秒
    30     instance-id: send-8801.com # 在信息列表时显示主机名称
    31     prefer-ip-address: true #访问路径变为IP地址

    3.创建主入口函数

    package com.seegot.springcloud;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    
    /**
     * @program: cloud2020
     * @description:
     * @author: PP Zhang
     * @create: 2020-06-30 10:45
     */
    @SpringBootApplication
    @EnableEurekaClient
    public class StreamMQMain8801 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8801.class,args);
        }
    }

    4.创建服务类 IMessageProvider

    package com.seegot.springcloud.service;
    
    
    /**
     * @program: cloud2020
     * @description:
     * @author: PP Zhang
     * @create: 2020-06-30 10:47
     */
    
    public interface IMessageProvider {
        public String send();
    }

    创建服务实现类

    package com.seegot.springcloud.service.impl;
    
    import com.seegot.springcloud.service.IMessageProvider;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.MessageChannel;
    
    import javax.annotation.Resource;
    import java.util.UUID;
    
    /**
     * @program: cloud2020
     * @description:
     * @author: PP Zhang
     * @create: 2020-06-30 10:48
     */
    @EnableBinding(Source.class) // 定义消息的推送管道。类似output
    public class MessageProviderImpl implements IMessageProvider {
        @Resource
        private MessageChannel output; // 属于消息发送管道。
    
        // 绑定消息
        @Override
        public String send() {
            String uuid = UUID.randomUUID().toString();// 发送的内容
            output.send(MessageBuilder.withPayload(uuid).build());
            System.out.println("@@@@@uuid: "+ uuid);
            return null;
        }
    }

    5.创建业务类

     1 package com.seegot.springcloud.controller;
     2 
     3 import com.seegot.springcloud.service.IMessageProvider;
     4 import lombok.extern.slf4j.Slf4j;
     5 import org.springframework.web.bind.annotation.GetMapping;
     6 import org.springframework.web.bind.annotation.RestController;
     7 
     8 import javax.annotation.Resource;
     9 
    10 /**
    11  * @program: cloud2020
    12  * @description:
    13  * @author: PP Zhang
    14  * @create: 2020-06-30 10:56
    15  */
    16 @RestController
    17 @Slf4j
    18 public class SendMessageController {
    19     @Resource
    20     private IMessageProvider messageProvider;
    21     @GetMapping(value = "/sendMessage")
    22     public String sendMessage(){
    23         return  messageProvider.send();
    24     }
    25 }

    6.测试。http://localhost:8801/sendMessage

    2020-06-30 13:22:54.601  INFO 2980 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration
    @@@@@uuid: baef8eab-67c7-40ce-b6ad-f290ad5ffeba
    @@@@@uuid: c457a864-45a4-4af1-99f8-ed0cf430429c
    @@@@@uuid: 803cfd67-ab44-4d63-98c0-b08880142316
    @@@@@uuid: 818e3d27-6853-4878-9d24-6c73f7349480
    @@@@@uuid: 45a28a16-d074-43ce-afd9-74c74d79259f
    @@@@@uuid: bcaad91e-fa08-4f5f-98d0-2232843f052b
    @@@@@uuid: 89b6b301-ee96-41e8-a99a-5f2c0e579cfa
    @@@@@uuid: 55d8c5aa-2093-44f5-ac8c-58392ec45249
    @@@@@uuid: 8d79cfe9-e0fa-4880-aad4-bc1342919ec7
    @@@@@uuid: 73202336-05b0-4682-bcfc-22627a3141c3
    @@@@@uuid: 40206a09-5e06-4f02-95bb-68f7c78e141e
    @@@@@uuid: c768c1d1-961c-4a3c-8250-cba4052b046f

    第三步:新建消费者模块cloud-stream-rabbitmq-consumer8802、cloud-stream-rabbitmq-consumer8803

    1.POM

     <dependencies>
            <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
                <version>3.0.6.RELEASE</version>
            </dependency>
            <!--springcloud stream-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            </dependency>
            <!-- 客户端 config -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-config</artifactId>
            </dependency>
    
            <!--注入eureka client 依赖-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency> 
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <!--引入热部署-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.12</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

    2.新建application.yml

    在分布式环境下,一条消息只希望被消费一次,避免重复消费,此时需要注意对group的使用,要将消费者统一在同一个组group内,也就是group名称一致。另外,group还可以实现消息的持久化。可以通过停用消息消费者,然后通过消息生产者发送多条消息,然后重新启动消息消费者,会看到消息消费者接收到了相应的消息,这就是持久化;反之如果删除group,那么重新停用消息消费者,然后在通过消息生产者发送消息,再次启动消息消费者,此时就接收不到相应的消息。

    server:
      port: 8802
    
    spring:
      application:
        name: cloud-stream-consumer
      cloud:
        stream:
          binders: # 在此处配置需要绑定的rabbitmq的服务消息
            defaultRabbit: # 表示定义的名称,用于binding整合
              type: rabbit # 消息组件类型
              environment: # 设置rabbitmq的相关环境配置
                spring:
                  rabbitmq:
                    host: localhost
                    port: 5672
                    username: guest
                    password: guest
          bindings: # 服务的整合处理
            input: # 这个名字是一个通道的名称
              destination: studyExchange # 标识要使用的Exchange名称定义
              content-type: application/json #设置消息类型,本次为json,本文则设置为“text/plain”
              binder: defaultRabbit # 设置要绑定的消息服务的具体设置
              group: seegotB #分组并且还可以实现消息持久化。
    eureka:
      client:
        service-url:
          defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka # 集群版
      instance:
        lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔,默认是30秒
        lease-expiration-duration-in-seconds: 5 # 如果超过了5秒间隔,默认是90秒
        instance-id: receive-8802.com # 在信息列表时显示主机名称
        prefer-ip-address: true #访问路径变为IP地址

    3.新建主入口函数

    package com.seegot.springcloud;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
    
    /**
     * @program: cloud2020
     * @description:
     * @author: PP Zhang
     * @create: 2020-06-30 11:19
     */
    @SpringBootApplication
    @EnableEurekaClient
    public class StreamMQMain8802 {
        public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8802.class,args);
        }
    }

    4.新建业务类

    package com.seegot.springcloud.controller;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @program: cloud2020
     * @description:
     * @author: PP Zhang
     * @create: 2020-06-30 11:21
     */
    @RestController
    @EnableBinding(Sink.class)// 定义消息的接受管道。类似input
    public class ReceiveMessageListenerController {
        @Value("${server.port}")
        private String serverPort;
        @StreamListener(Sink.INPUT) // 监听
        public void input(Message<String> message){
            System.out.println("消费者1号,---->接收的消息:"+message.getPayload()+"	 port:"+serverPort);
        }
    }

     

     

     

     

     

     

     

     

     

     

     

     

     

     

  • 相关阅读:
    Mybatis学习笔记(八) —— Mybatis整合spring
    Mybatis学习笔记(七) —— 关联查询
    Mybatis学习笔记(六) —— 动态sql
    Mybatis学习笔记(五) —— Mapper.xml(输入映射和输出映射)
    Mybatis学习笔记(四) —— SqlMapConfig.xml配置文件
    Mybatis学习笔记(三) —— DAO开发方法
    Mybatis学习笔记(二) —— mybatis入门程序
    Mybatis学习笔记(一) —— mybatis介绍
    tomcat的热部署配置
    int bool 字符串 列表 字典 集合
  • 原文地址:https://www.cnblogs.com/pengpengzhang/p/13214273.html
Copyright © 2011-2022 走看看