zoukankan      html  css  js  c++  java
  • Spring Could Stream 基本用法

    Spring Cloud Stream是一个建立在Spring Boot和Spring Integration之上的框架,有助于创建事件驱动或消息驱动的微服务。
    通过它可以更方便的访问消息服务,如消费Rabbitmq的消息示例如下:
    添加Spring Cloud Stream与RabbitMQ消息中间件的依赖。
    <dependency>
    <groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
    配置通道关联的destination,对应rabbitmq的exchange名称。
    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: mqTestDefault 
            output:
              destination: mqTestDefault
              contentType: text/plain
    destination:指定了消息获取的目的地 exchange,这里的exchange就是 mqTestDefault。这里配置应用输入、输出的destination相同,实际应用是input或output中的一方。
    @SpringBootApplication@EnableBinding(Processor.class) public class MyLoggerServiceApplication {public static void main(String[] args) { SpringApplication.run(MyLoggerServiceApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); } }
    启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “mqTestDefault”,routing key为 “#”。
    实际使用中,我们需要一个持久化的队列,并且指定一个分组,用于保证应用服务的缩放。

    只需要在消费者端的 binding 添加配置项 spring.cloud.stream.bindings.[channelName].group = XXX 。对应的队列就是持久化,并且名称为:mqTestOrder.XXX

    如果我们需要进一步根据 routing key 来进行区分消息投递的目的地,或者消息接受,需要进一步配,Spring Cloud Stream 也提供了相关配置:

    spring:
      cloud:
        stream:
          bindings:
            inputProductAdd:
              destination: mqTestProduct
              group: addProductHandler      # 拥有 group 默认会持久化队列
            outputProductAdd:
              destination: mqTestProduct
          rabbit:
            bindings:
              inputProductAdd:
                consumer:
                  bindingRoutingKey: addProduct.*       # 用来绑定消费者的 routing key
              outputProductAdd:
                producer:
                  routing-key-expression: '''addProduct.*'''  # 需要用这个来指定 RoutingKey

    常用配置

    给消费者设置消费组和主题

    1. 设置消费组: spring.cloud.stream.bindings.<通道名>.group=<消费组名>
    2. 设置主题: spring.cloud.stream.bindings.<通道名>.destination=<主题名>

    给生产者指定通道的主题:spring.cloud.stream.bindings.<通道名>.destination=<主题名>

    消费者开启分区,指定实例数量与实例索引

    1. 开启消费分区: spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
    2. 消费实例数量: spring.cloud.stream.instanceCount=1 (具体指定)
    3. 实例索引: spring.cloud.stream.instanceIndex=1 #设置当前实例的索引值

    生产者指定分区键

    1. 分区键: spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分区键>
    2. 分区数量: spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分区数量>
    一般最简单的应用只要配置spring.cloud.stream.bindings.开头的项即可,如果涉及到
  • 相关阅读:
    使用日历控件的一些体会(downmoon)
    带附加条件的NewID()用法
    微软的招聘哲学——做微软人的五大核心素质(摘自《微软360度》)
    彻底禁用fckEditor的上传功能(含防止Type漏洞问题)
    Remote Access Connection Manager 服务因下列错误而停止解决办法
    GridView如何更新批量数据和单条记录?
    .net1.1与.net2.0在加载ascx文件的控件时有所不同(Downmoon)
    牛羊吃草问题求解(downmoon)
    c#操作ecxel的一些资源(downmoon搜集)
    安装sql2008 enterprise (English正式版)图解
  • 原文地址:https://www.cnblogs.com/doit8791/p/11392598.html
Copyright © 2011-2022 走看看