zoukankan      html  css  js  c++  java
  • Spring Cloud Stream 整合 RabbitMQ

    简介


    Spring Cloud Stream是一个构建消息驱动微服务的框架,应用程序通过input(相当于consumer)、output(相当于producer)来与Spring Cloud Stream中Binder交互,而Binder负责与消息中间件交互;因此,我们只需关注如何与Binder交互即可,而无需关注与具体消息中间件的交互。

    使用


    1、添加依赖

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
      <version>2.1.2.RELEASE</version>
    </dependency>

    2、配置

    provider配置(采用动态路由键方式)

    server:
      port: 7071
    spring:
      cloud:
        stream:
          binders:
            pro:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    addresses: localhost
                    port: 5672
                    username: test
                    password: test
                    virtual-host: test
          bindings:
            myOutPut:
              destination: myOutPut
              content-type: application/json
              default-binder: test
          rabbit:
            bindings:
              myOutPut:
                producer:
                  exchangeType: topic
                  routing-key-expression: headers.routeId

    consumer配置

    server:
      port: 7072
    spring:
      cloud:
        stream:
          rabbit:
            bindings:
              input:
                consumer:
                  bindingRoutingKey: routeKey1
                  acknowledge-mode: manual
          binders:
                  protest:
                    type: rabbit
                    environment:
                      spring:
                        rabbitmq:
                          addresses: localhost
                          port: 5672
                          username: test
                          password: test
                          virtual-host: test
          bindings:
                  input:
                    destination: myOutPut
                    content-type: application/json
                    default-binder: protest
                    group: group-cus1

    3、java端
    provider

    public interface MqMessageSource {//自定义通道
        String MY_OUT_PUT = "myOutPut";
        @Output(MY_OUT_PUT)
        MessageChannel testOutPut();
    }
    @EnableBinding(MqMessageSource.class)
    public class MessageProviderImpl implements IMessageProvider {
        @Autowired
        @Output(MqMessageSource.MY_OUT_PUT)
        private MessageChannel channel;
    
        @Override
        public void send(Company company) {
            channel.send(MessageBuilder.withPayload(company).setHeader("routeId", company.getTitle()).build());
        }
    }

    consumer

    @Component
    @EnableBinding(Sink.class)
    public class MessageListener {
        @StreamListener(Sink.INPUT)
        public void input(Message<Company> message) throws IOException {
            Channel channel = (com.rabbitmq.client.Channel)message.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
            System.err.println(JSON.toJSONString(message.getPayload()));
        }
    }

    END

  • 相关阅读:
    TortoiseGit 连接 git服务器免输入用户名和密码的方法
    mongodb 对参数类型的严格区分
    google API 使用Client Login 登录授权
    GAPI is the Google Analytics PHP5 Interface
    pr导出mp4格式提示无法播放解决方案
    PR如何导出mp4格式的视频
    pr 如何给视频进行加速,慢速处理
    如何用premiere添加配乐?pr视频添加音乐
    操作系统-银行家算法
    操作系统 内存分配算法
  • 原文地址:https://www.cnblogs.com/ityard/p/11060999.html
Copyright © 2011-2022 走看看