zoukankan      html  css  js  c++  java
  • SpringCloud Stream生产者配置RabbitMq的动态路由键

    在写这个文章前不得不吐槽目前国内一些blog的文章,尽是些复制粘贴的文章,提到点上但没任何的深入和例子。.........

    经过测试下来总结一下RabbitMQ的Exchange的特性:

    1、direct

    生产者可以指定路由键,消费者可以指定路由键,但不能讲路由键设置为#(全部)。

    2、topic

    生产者可以指定路由键,消费者可以指定路由键,也可以不指定(或者是#)。

    3、fanout

    生产者和消费都忽略路由键。

    在现实的场景里,通常是生产者会生产多个路由键的消费,然后多个消费来消费指定路由键的消息,但通常生产者的生产代码是同一份,如何在发消息的时候动态指定当前消息的路由键呢?

    例子:门店平台系统集中处理多个门店的数据,然后分别将不同门店的数据发送到不同的门店(即:A门店只消费属于A门店的消息,B门店只消费属于B的消息)

    看例子:

    application.yml

     1 spring:    
     2     cloud:
     3         stream:
     4             # 设置默认的binder
     5             default-binder: pos
     6             binders:
     7                 scm:
     8                     type: rabbit
     9                     environment:
    10                         spring:
    11                           rabbitmq:
    12                             # 连接到scm的host和exchange
    13                             virtual-host: scm
    14             pos:
    15                 type: rabbit
    16                 environment:
    17                     spring:
    18                         rabbitmq:
    19                             # 连接到pos的host和exchange
    20                             virtual-host: pos
    21                     
    22             shop:
    23                 type: rabbit
    24                   environment:
    25                     spring:
    26                       rabbitmq:
    27                         # 连接到shop的host和exchange
    28                         virtual-host: shop
    29             
    30           bindings:
    31             # ---------消息消费------------
    32 
    33             # 集单开始生产消费
    34             order_set_start_produce_input:
    35               binder: pos
    36               destination: POS_ORDER_SET_STRAT_PRODUCE
    37               group: pos_group
    38               
    39             # 门店ID为1的消费者    
    40             shop_consumer_input_1:
    41                 binder:    shop
    42                 destination: POS_ORDER_SET_STRAT_PRODUCE
    43                 group: shop_1_group
    44      
    45               
    46             #-----------消息生产-----------
    47             # 集单开始生产通知生产
    48             order_set_start_produce_output:
    49               binder: pos
    50               destination: POS_ORDER_SET_STRAT_PRODUCE
    51             
    52           rabbit:
    53             bindings:
    54               # 集单开始生产消费者
    55               order_set_start_produce_input:
    56                 consumer:
    57                   exchangeType: topic
    58                   autoBindDlq: true
    59                   republishToDlq: true
    60                   deadLetterExchange: POS_ORDER_SET_STRAT_PRODUCE_POS_DLX
    61                   #bindingRoutingKey: '#'
    62               # 门店1的消费者
    63               shop_consumer_input_1:
    64                 consumer:
    65                   exchangeType: topic
    66                   autoBindDlq: true
    67                   republishToDlq: true
    68                   deadLetterExchange: POS_ORDER_SET_STRAT_PRODUCE_SHOP_1_DLX
    69                   bindingRoutingKey: 1
    70                   deadLetterRoutingKey: 1
    71 
    72               # 生产者配置              
    73               order_set_start_produce_output:
    74                 producer:
    75                   exchangeType: topic
    76                   routingKeyExpression: headers.shopId
    77                   # routingKeyExpression: headers['shopId']

    上面的配置文件配置了一个动态的基于shopId做路由的生产者配置,一个消费全部路由键的消费者,如果要配置指定路由键的可以在配置文件里设置bindingRoutingKey属性的值。

    生产者java代码:

    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.MessageBuilder;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.longge.pos.production.mq.dto.OrderSetProductionMsg;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class MqSendUtil {
        private static MessageChannel orderSetStartProduceChannel;
    
        public static void setSfOrderCreateChannel(MessageChannel channel) {
            sfOrderCreateProduceChannel = channel;
        }
        
        public static void sendOrderSetPrintMsg(OrderSetProductionMsg msg) {
            // add kv pair - routingkeyexpression (which matches 'type') will then evaluate
            // and add the value as routing key
            log.info("发送开始生产的MQ:{}", JSONObject.toJSONString(msg));
            orderSetStartProduceChannel.send(MessageBuilder.withPayload(JSON.toJSONString(msg)).setHeader("shopId", msg.getOrderSet().getShopId()).build());
            //orderSetStartProduceChannel.send(MessageBuilder.withPayload(JSON.toJSONString(msg)).build());
        }
    }

    动态路由的核心在于上面那个红色的字体的地方,这个是和配置文件里的  routingKeyExpression 的配置是匹配的。

  • 相关阅读:
    第六日会议博客
    第五日会议博客
    第四日会议博客
    第三日会议博客
    第二日会议博客
    第一日会议博客
    Alpha总结展望——前事不忘后事之师
    请坐,我们是专业团队——Alpha冲刺
    微信小程序测试总结
    WeChair项目Beta冲刺(4/10)
  • 原文地址:https://www.cnblogs.com/yangzhilong/p/9888398.html
Copyright © 2011-2022 走看看