zoukankan      html  css  js  c++  java
  • RabbitMQ topic模式以及headers模式实践

    参考自: https://www.jianshu.com/p/e647758a7c50

    https://how2j.cn/k/message/message-rabbitmq-topic/2033.html?p=78908

    fanout模式: 与指定交换机绑定的所有队列都可以接收到消息

    direct模式: 将消息发送到由exchange和routingKey指定的队列中,如果多个消息队列有相同的routingKey,都可以接收到消息

    topic模式:  例如发送到交换机test.topic的消息routingKey有routing_topic.1、routing_topic.2, 则在绑定消息队列与路由时指定routing_topic.*就可以同时匹配这两个routingKey, 就可以接收发送到交换机test.topic, 路由为routing_topic.1、routing_topic.2的消息。

    headers模式: 根据消息的headers来匹配对应的队列,在消息接收回调中指定headers, 可以是Map<String, Object>、String可变数组类型的keys等

    添加springboot支持jar包

        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
          <version>2.0.4.RELEASE</version>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
          <version>2.0.4.RELEASE</version>
        </dependency>

    创建topic模式、headers模式、direct模式交换机,对应的消息就可以根据exchangeName、routingKey发送到指定队列

        private void createTopicExchange(RabbitAdmin rabbitAdmin) {
            // 创建交换机,类型为 topic
            // durable 参数表示是否持久化
            rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
    
            // 创建队列
            // durable 参数表示是否持久化
            rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
    
    
            /**
             * 链式写法
             */
            rabbitAdmin.declareBinding(
                    BindingBuilder.bind(new Queue("test.topic.queue", false)) // 直接创建队列
                            .to(new TopicExchange("test.topic", false, false)) // 直接创建交换机,并建立关联关系
                            .with("routing_topic.*") // 指定路由 key
            );
        }
    
        private void createHeadersExchange(RabbitAdmin rabbitAdmin) {
            rabbitAdmin.declareExchange(new HeadersExchange("test.headers", false, false));
    
            rabbitAdmin.declareQueue(new Queue("test.headers.queue", false));
    
            Map<String, Object> map = new HashMap<>();
            map.put("type", "headers");
    
            Binding binding = BindingBuilder.bind(new Queue("test.headers.queue", false))
                    .to(new HeadersExchange("test.headers", false, false))
                    .whereAll(map).match();
    
            rabbitAdmin.declareBinding(binding);
        }

    private void createDirectExchange(RabbitAdmin rabbitAdmin) {
    rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

    rabbitAdmin.declareQueue(new Queue("test.direct.queue1", false));
    rabbitAdmin.declareQueue(new Queue("test.direct.queue2", false));

    rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.direct.queue1", false)
    ).to(new DirectExchange("test.direct", false, false))
    .with("test.direct.routing"));

    rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("test.direct.queue2", false)
    ).to(new DirectExchange("test.direct", false, false))
    .with("test.direct.routing"));
    }
     

    RabbitMQ配置:

    package com.test;

    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
    return rabbitTemplate;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    createTopicExchange(rabbitAdmin);
    createHeadersExchange(rabbitAdmin);
    createDirectExchange(rabbitAdmin);
    return rabbitAdmin;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory factory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(factory);
    container.setQueueNames("test.topic.queue", "test.headers.queue", "test.direct.queue1", "test.direct.queue2");
    container.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
    System.out.println("接收到:" + new String(message.getBody()));
    }
    });

    return container;
    }
    }

    springboot启动时发送消息:

    package com.test;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    
    import java.util.HashMap;
    
    @Component
    public class ExecuteRunnable implements CommandLineRunner {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void run(String... args) throws Exception {
            //topic模式
            rabbitTemplate.send("test.topic", "routing_topic.1", new Message("Message1".getBytes(), new MessageProperties()));
            rabbitTemplate.send("test.topic", "routing_topic.2", new Message("Message2".getBytes(), new MessageProperties()));
            //headers模式
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.getHeaders().put("type", "headers");
            rabbitTemplate.send("test.headers", "", new Message("Message3".getBytes(), messageProperties));
            //direct模式
            rabbitTemplate.send("test.direct", "test.direct.routing", new Message("Message4".getBytes(), new MessageProperties()));
        }
    }

    然后就可以在SimpleMessageListenerContainer  实例获取方法中接收到发送的消息

  • 相关阅读:
    密码等级
    ie兼容透明
    分割线
    支付宝银行判断接口
    date只能选择今天之后的时间js
    离开页面之前提示,关闭,刷新等
    使用 Linux 系统的常用命令
    C#窗体简单增删改查
    1
    二维数组
  • 原文地址:https://www.cnblogs.com/wushengwuxi/p/12886418.html
Copyright © 2011-2022 走看看