zoukankan      html  css  js  c++  java
  • SpringCloud学习笔记(九、SpringCloud Stream)

    目录:

    • 什么是SpringCloud Stream
    • 如何使用SpringCloud Stream
    • 消息分流

    什么是SpringCloud Stream:

    SpringCloud Stream是一个用于构建消息驱动的微服务应用框架。它通过注入,输入、输出通道来与外界通信;因此它很容易实现消息的中转,并且在更换消息中间件的时候不需要该代码,仅需要修改配置即可。支持的消息中间件如RabbitMQ、Kafka等等。

    如何使用SpringCloud Stream(以RabbitMQ为例):

    1、增加maven依赖

     1 <dependency>
     2     <groupId>org.springframework.cloud</groupId>
     3     <artifactId>spring-cloud-stream</artifactId>
     4 </dependency>
     5 <dependency>
     6     <groupId>org.springframework.cloud</groupId>
     7     <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
     8 </dependency>
     9 <dependency>
    10     <groupId>org.springframework.cloud</groupId>
    11     <artifactId>spring-cloud-stream-test-support</artifactId>
    12     <scope>test</scope>
    13 </dependency>

    2、增加properties配置

     1 spring.application.name=stream
     2 server.port=7070
     3 
     4 # rabbitmq
     5 spring.rabbitmq.host=localhost
     6 spring.rabbitmq.port=5672
     7 spring.rabbitmq.username=guest
     8 spring.rabbitmq.password=guest
     9 
    10 # stream
    11 spring.cloud.stream.bindings.input.destination=customer
    12 spring.cloud.stream.bindings.output.destination=customer

    3、启动类加上本工程的消息代理类型

    @EnableBinding({Processor.class})

    @EnableBinding分为三种类型

    )org.springframework.cloud.stream.messaging.Processor:接收和发送消息

    )org.springframework.cloud.stream.messaging.Source:仅支持发送消息

    )org.springframework.cloud.stream.messaging.Sink:仅支持接收消息

    4、加上Controller及Service

     1 @RestController
     2 @AllArgsConstructor
     3 public class ProcessorController {
     4 
     5     private final ProcessorService processorService;
     6 
     7     @GetMapping("/testProcessor/{message}")
     8     public boolean testProcessor(@PathVariable("message") String message) {
     9         return processorService.send(message);
    10     }
    11 }
     1 @Service
     2 @AllArgsConstructor
     3 public class ProcessorService {
     4 
     5     private final Processor processor;
     6 
     7     public boolean send(String message) {
     8         return processor.output().send(MessageBuilder.withPayload(message).build());
     9     }
    10 
    11     public boolean subscribe(MessageHandler handler) {
    12         return processor.input().subscribe(handler);
    13     }
    14 }

    5、在任意bean下写上接收逻辑或另起一个工程(另一个工程的mq需要配成一个哦)

    1 @StreamListener(Sink.INPUT)
    2 public void receive(String message) {
    3     System.err.println("receive message: " + message);
    4 }

    然后我们启动项目,访问http://localhost:7070/testProcessor/hello,此时就会在控制台看到receive message: hello的字样。

    消息分流(kafka特性):

    1 @GetMapping("/testMessageShunt/{type}")
    2 public boolean testMessageShunt(@PathVariable("type") String type) {
    3     String header = "a".equalsIgnoreCase(type) ? "msg1" : "msg2";
    4     return processorService.send(type, header);
    5 }
     1 /**
     2  * RabbitMQ不支持消息分流
     3  */
     4 @StreamListener(value = Sink.INPUT, condition = "headers['contentType']=='mgs1'")
     5 public void receiveMessage1(@Payload Message<String> message) {
     6     System.err.println("receive message1: " + message.getPayload());
     7 }
     8 
     9 /**
    10  * RabbitMQ不支持消息分流
    11  */
    12 @StreamListener(value = Sink.INPUT, condition = "headers['contentType']=='mgs2'")
    13 public void receiveMessage2(@Payload Message<String> message) {
    14     System.err.println("receive message2: " + message.getPayload());
    15 }
  • 相关阅读:
    状压DP【p1879】[USACO06NOV]玉米田Corn Fields
    Tarjan缩点+Spfa最长路【p3627】[APIO2009] 抢掠计划
    Tarjan缩点【p1726】上白泽慧音
    分层图【p4822】[BJWC2012]冻结
    Tarjan缩点+LCA【p2783】有机化学之神偶尔会做作弊
    线段树【p1607】[USACO09FEB]庙会班车Fair Shuttle
    better-scroll踩坑合集
    在浏览器上安装 Vue Devtools工具
    无法执行vue初始化命令
    vue-cli创建第一个项目(用git bash解决上下键移动选择问题)
  • 原文地址:https://www.cnblogs.com/bzfsdr/p/11716900.html
Copyright © 2011-2022 走看看