zoukankan      html  css  js  c++  java
  • Spring Cloud Stream 整合 RabbitMQ

    简介


    Spring Cloud Stream是一个构建消息驱动微服务的框架,应用程序通过input(相当于consumer)、output(相当于producer)来与Spring Cloud Stream中Binder交互,而Binder负责与消息中间件交互;因此,我们只需关注如何与Binder交互即可,而无需关注与具体消息中间件的交互。

    使用


    1、添加依赖

    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
      <version>2.1.2.RELEASE</version>
    </dependency>

    2、配置

    provider配置(采用动态路由键方式)

    server:
      port: 7071
    spring:
      cloud:
        stream:
          binders:
            pro:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    addresses: localhost
                    port: 5672
                    username: test
                    password: test
                    virtual-host: test
          bindings:
            myOutPut:
              destination: myOutPut
              content-type: application/json
              default-binder: test
          rabbit:
            bindings:
              myOutPut:
                producer:
                  exchangeType: topic
                  routing-key-expression: headers.routeId

    consumer配置

    server:
      port: 7072
    spring:
      cloud:
        stream:
          rabbit:
            bindings:
              input:
                consumer:
                  bindingRoutingKey: routeKey1
                  acknowledge-mode: manual
          binders:
                  protest:
                    type: rabbit
                    environment:
                      spring:
                        rabbitmq:
                          addresses: localhost
                          port: 5672
                          username: test
                          password: test
                          virtual-host: test
          bindings:
                  input:
                    destination: myOutPut
                    content-type: application/json
                    default-binder: protest
                    group: group-cus1

    3、java端
    provider

    public interface MqMessageSource {//自定义通道
        String MY_OUT_PUT = "myOutPut";
        @Output(MY_OUT_PUT)
        MessageChannel testOutPut();
    }
    @EnableBinding(MqMessageSource.class)
    public class MessageProviderImpl implements IMessageProvider {
        @Autowired
        @Output(MqMessageSource.MY_OUT_PUT)
        private MessageChannel channel;
    
        @Override
        public void send(Company company) {
            channel.send(MessageBuilder.withPayload(company).setHeader("routeId", company.getTitle()).build());
        }
    }

    consumer

    @Component
    @EnableBinding(Sink.class)
    public class MessageListener {
        @StreamListener(Sink.INPUT)
        public void input(Message<Company> message) throws IOException {
            Channel channel = (com.rabbitmq.client.Channel)message.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            channel.basicAck(deliveryTag, false);
            System.err.println(JSON.toJSONString(message.getPayload()));
        }
    }

    END

  • 相关阅读:
    工具网页地址
    invalid comparison: java.util.ArrayList and java.lang.String——bug解决办法
    打印csdn博客文章内容
    git 更新远程分支列表
    idea 配置 SpringBoot 热启动详解,和热启动失效解决方案
    MacOS下安装RabbitMQ
    Spring Boot Admin
    Navicat Premium Mac 12 破解
    sql查询不重复数据
    org.apache.commons.lang3 jar的使用 ArrayUtils
  • 原文地址:https://www.cnblogs.com/ityard/p/11060999.html
Copyright © 2011-2022 走看看