zoukankan      html  css  js  c++  java
  • SpringCloud学习(七):stream 消息驱动

    菜鸟学渣接触spring cloud 系列...

    公司也上微服务了,再不学习下就凉了,所以来踩坑吧...

    版本:

      spring-boot:  2.0

      spring-cloud: Finchley.SR1

    已有项目:

      [eureka-server]              # 注册中心   port 8761

      [eureka-client-one]       #  微服务1    port 8501

      [eureka-client-two]       #  微服务2    port 8502

      [eureka-client-turbine] #  断路监控   port 8503

      [eureka-client-zuul]      #  网关服务   port 8601

      [eureka-client-sleuth]      #  链路追踪 port 8602

    能上图绝不BB

      

      spring-cloud-stream 支持RabbitMQ、Kafka 组件的消息系统,这里选RabbitMQ

      大致这样理解: 微服务ABCD(吃货)不断发(吃披萨的)消息到RabbitMQ(饿了吗),微服务F(卖披萨的)一直监听着RabbitMQ,收到ABCD的消息后,立马打包披萨送到ABCD家里,地址从消息里来的。

    零、 安装RabbitMQ-server端

      这里使用stream-rabbitmq基于rabbitMQ实现,需要先在电脑安装RabbitMQ-server

      Windows 下安装RabbitMQ:

        1. 安装erlang 语言环境

        2. 安装RabbitMQ-server

        3. 启用plugin

    一、stream 消息产生和接收

      这里就把产生和接收放到一个微服务里面了,不分开写了

      新建 [eureka-client-stream]  

      引入依赖   spring-cloud-stream、spring-cloud-starter-stream-rabbit

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.renzku</groupId>
        <artifactId>eureka-client-stream</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>eureka-client-stream</name>
        <description>Demo project for Spring Boot</description>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.4.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <spring-cloud.version>Finchley.SR1</spring-cloud.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <!-- or '*-stream-kafka' -->
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-test-support</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
    View Code

      配置文件   application.yml

    server:
      port: 8603
    
    spring:
      application:
        name: eureka-client-stream
      cloud:
        stream:
          bindings:
            input:        # 接收
              destination:  eureka-client-stream-des    # exchange名称
            output:     # 产生
              destination:  eureka-client-stream-des    # input和output一致便可沟通
      rabbitmq:   # rabbitMQ-server 信息
        host: localhost 
        port: 5672
        username: guest
        password: guest
    
    eureka:
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/

      启动类   EurekaClientStreamApplication.java

    @SpringBootApplication
    public class EurekaClientStreamApplication {
    
        public static void main(String[] args) {
    
            SpringApplication.run(EurekaClientStreamApplication.class, args);
        }
    }

      产生消息的类   HelloStreamSource.java

    @EnableBinding(Source.class)
    public class HelloStreamSource {
    
        @Resource
        private MessageChannel output;
    
        public void sendTestData() {
            String s = "source msg";
            this.output.send(MessageBuilder.withPayload(s).build());  // 发出消息
        }
    }

      接收消息的类   HelloStreamSink.java

    @EnableBinding(Sink.class)
    public class HelloStreamSink {
    
        @StreamListener(Sink.INPUT)
        public void input(String s){
    
            System.out.println("input:" + s);
        }
    }

      Rest服务  HelloStream.java

    @RestController
    public class HelloStream {
    
        @Autowired
        private HelloStreamSource helloStreamSource;
    
        @RequestMapping("/stream")
        public String HelloStream(){
            // 发出消息
            helloStreamSource.sendTestData();
            return "hello stream";
        }
    }

      目录结构

      

      启动 [eureka-client-stream]

      访问  http://localhost:8603/stream, rabbitMQ中Exchanges添加了 eureka-client-stream-des 

      

      控制台也打印了接收到的信息:

      

    二、自定义消息通道

      新建通道接口类  CustomProcessor.java

    /**
     * 自定义消息通道
     */
    public interface CustomProcessor {
        String INPUT = "customInput";
        String OUTPUT = "customOutput";
    
        @Input(CustomProcessor.INPUT)
        MessageChannel customInput();
    
        @Output(CustomProcessor.OUTPUT)
        MessageChannel customOutput();
    }

      配置文件  application.yml

    server:
      port: 8603
    
    spring:
      application:
        name: eureka-client-stream
      cloud:
        stream:
          bindings:
            input:        # 接收
              destination:  eureka-client-stream-des    # exchange名称
            output:     # 产生
              destination:  eureka-client-stream-des    # input和output一致便可沟通
            customInput:
              destination:  eureka-client-stream-cust-des
            customOutput:
              destination:  eureka-client-stream-cust-des  # 如果用上面output值,那上面input也会接收到这里发的消息
      rabbitmq:   # rabbitMQ-server 信息
        host: localhost 
        port: 5672
        username: guest
        password: guest
    
    eureka:
      client:
        serviceUrl:
          defaultZone: http://localhost:8761/eureka/

      新建消息产生类  HelloStreamCustomProcessorSource.java

    @EnableBinding(CustomProcessor.class)
    public class HelloStreamCustomProcessorSource {
    
        @Resource
        private MessageChannel customOutput;
    
        public void sendTestData() {
            String s = "custom source msg";
            this.customOutput.send(MessageBuilder.withPayload(s).build());
        }
    }

      新建消息接收类   HelloStreamCustomProcessorSink.java

    @EnableBinding(CustomProcessor.class)
    public class HelloStreamCustomProcessorSink {
    
        @StreamListener(CustomProcessor.INPUT)
        public void input(String s){
    
            System.out.println("custom input:" + s);
        }
    }

      Rest服务   HelloStream.java

    @RestController
    public class HelloStream {
    
        @Autowired
        private HelloStreamSource helloStreamSource;
    
        @Autowired
        private HelloStreamCustomProcessorSource helloStreamCustomProcessorSource;
    
        @RequestMapping("/stream")
        public String HelloStream(){
            helloStreamSource.sendTestData();
            return "hello stream";
        }
    
        @RequestMapping("/stream/cust")
        public String HelloStreamCust(){
            helloStreamCustomProcessorSource.sendTestData();
            return "hello stream cust";
        }
    }

      启动并访问  http://localhost:8603/stream/cust , rabbitMQ中Exchanges添加了 eureka-client-stream-cust-des

      

      控制台也打印了新通道接收到的信息:

      

    这个可以用来和spring cloud config 结合,更新微服务配置信息

    spring-cloud-bus.jar!orgspringframworkcloudusSpringCloudBusClient.java  了解一下

  • 相关阅读:
    静态与非静态(转改)
    关于odp.net的FetchSize属性
    SQL_SERVER 导oracle(转)
    win7电脑上wifi
    Oracle对象统计信息
    SQL_SERVER 连接oracle(转)
    linq in 语法
    关于引擎的设计
    温习设计模式
    技巧类
  • 原文地址:https://www.cnblogs.com/renzku/p/9615449.html
Copyright © 2011-2022 走看看