zoukankan      html  css  js  c++  java
  • SpringCloudStream实战

    Spring Cloud Stream是一个用于构建消息驱动的微服务应用程序的框架。Spring Cloud Stream构建于Spring Boot之上,用于创建独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。也就是说,Spring Cloud Stream是构建于Spring Boot和Spring Integration之上的框架,帮助创建事件驱动或消息驱动的微服务。

    主要模型如图:

    这里我们使用Kafka作为消息底层设施,原因见: 为什么我们从RabbitMQ切换到apache kafka?

    引入Kafka的Stream启动器:

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
      <version>2.0.1.RELEASE</version>
    </dependency>


    微服务架构遵循“ 智能端点和哑管 ”原则,端点之间的通信由RabbitMQ或Apache Kafka等消息传递中间件方驱动,服务通过这些端点或通道发布领域事件进行通信。

    首先我们定义一个接口,定义输入和输出队列管道:

    public interface GreetingsStreams {
      String INPUT = "greetings-in";
      String OUTPUT = "greetings-out";
      @Input(INPUT)   SubscribableChannel inboundGreetings();
      @Output(OUTPUT)   MessageChannel outboundGreetings(); }


    @Input注释用来表示输入的消息队列,通过该通道接收消息并输入当前应用;@Output注释表示一个输出通道,通过它发布消息出去。@Input和@Output注解可以采取指定的通道名称(比如这里greetings-in greetings-out")作为参数,如果未提供名称,则使用注释的方法名称。

    在application.yaml或property中具体配置该消息通道到Kafka:

    spring:
      cloud:
        stream:
          kafka:
            binder:
              brokers: localhost:9092
            bindings:
              greetings-in:
                destination: greetings
                contentType: application/json
              greetings-out:
                destination: greetings
                contentType: application/json


    其中greetings-in和greetings-out配置到Kafka具体的主题topic名称为greetings,序列化类型是json,kafka默认端口在本地9092。

    好了,底层基础设施准备完成,现在需要将这个设施安装到我们的应用中。

    @EnableBinding(GreetingsStreams.class)
    public class StreamsConfig {
    }


    @EnableBinding将应用配置绑定接口GreetingsStreams中定义的通道INPUT和OUTPUT。

    现在我们的应用和消息基础设施已经绑定了,可使用@StreamListener到具体方法以接收具体的流处理事件了。

    @Component
    @Slf4j
    public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greetings greetings) {   log.info("Received greetings: {}", greetings); } }


    StreamListeners 是消息监听者处理方法,接收类型的传入消息Greetings,可以看到框架的核心功能之一:它尝试自动将传入的消息有效负载转换为类型Person。

    上面方法是一个没有返回结果的void方法,如果有返回结果,必须使用@SendTo注释指定方法返回的数据的输出绑定队列目标output,如以下示例所示;通过

    @Component
    @Slf4j
    public class GreetingsListener {
    @StreamListener(GreetingsStreams.INPUT) @SendTo(GreetingsStreams.OUTPUT)
    public String handleGreetings(Greetings greetings) {   log.info("Received greetings: {}", greetings);   return "Received greetings: {}" + greetings; } }


    Spring cloud stream实现了一个默认的Processor类,类似我们的GreetingsStreams接口,也就是说,可以不用自己做这个接口

    public interface Processor extends Source, Sink {
    }
    public interface Source {
      String OUTPUT = "output";
    
      @Output("output")
      MessageChannel output();
    }
    public interface Sink {
      String INPUT = "input";
    
      @Input("input")
      SubscribableChannel input();
    }

    如果使用默认的Processor通道名称,注意配置文件里也要配置成相应的通道名。

    测试运行
    有了接收方,下面我们实现一个发送方,我们通过调用rest接口发送消息,先看看发送方代码:

    @Service
    @Slf4j
    public class GreetingsService {
    private final GreetingsStreams greetingsStreams;
    
    public GreetingsService(GreetingsStreams greetingsStreams) {
      this.greetingsStreams = greetingsStreams;
    }
    
    public void sendGreeting(final Greetings greetings) {
      log.info("Sending greetings {}", greetings);
      MessageChannel messageChannel = greetingsStreams.outboundGreetings();
      messageChannel.send(MessageBuilder
      .withPayload(greetings)
      .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
      .build());
    }
    }

    我们暴露一个端口来调用这个发送方:

    @RestController
    public class GreetingsController {
    private final GreetingsService greetingsService; public GreetingsController(GreetingsService greetingsService) {   this.greetingsService = greetingsService; }
    @GetMapping(
    "/greetings") @ResponseStatus(HttpStatus.ACCEPTED) public void greetings(@RequestParam("message") String message) {   Greetings greetings = Greetings.builder()   .message(message)   .timestamp(System.currentTimeMillis())   .build();   greetingsService.sendGreeting(greetings); } }

    也就是说:这个发送方REST和发送服务 与我们的GreetingsListener是通过消息系统通讯的,不是直接在发送服务里调用GreetingsListener的方法,这样这两者之间就解耦了。

    下面我们用postman调用:

    http://localhost:8080/greetings?message=hello

    控制台结果输出:

    c.e.c.GreetingsService : Sending greetings Greetings(timestamp=1535614400754, message=hello)
    
    c.e.c.GreetingsListener : Received greetings: Greetings(timestamp=1535614400754, message=hello)

    一个发送和一个接受完成了一个请求调用,如果GreetingsListener还有返回结果,是放在greetings-out之中的,那么GreetingsListener就变成发送方了,我们也可以参考这套做法再做个监听器。
    ---------------------

    原文:https://blog.csdn.net/qq_42950313/article/details/82224660 

  • 相关阅读:
    webStorm Ctrl+s 自动格式化 然后 保存 用宏命令
    vuez init webStorm vscode
    Node + Express + MySQL 接口开发完整案例
    sysUpload.vue上传组件 的时候 看进度的时候 不要mock 注释掉 // if (process.env.NODE_ENV !== 'production') require('@/mock')
    目录下 shift 右键菜单 打开cmd 或者在 地址栏输入cmd 回车进入cmd
    xls表格 拼接字段 拼json =CONCAT("{ code:'",A2,"',","codeName: '",B2,"',","flag: '",C2,"'},")
    js 数组过滤 filter
    libs/tools.js stringToDate dateToString 日期字符串转换函数
    vue 组件内 directives指令的调用方式 <base-table v-auto-height:tableHeight="{vm:this, diffHeight:ahTable.diffHeight}"
    油猴 tamperMonkey 在百度首页 添加自己的自定义链接
  • 原文地址:https://www.cnblogs.com/itplay/p/10409759.html
Copyright © 2011-2022 走看看