zoukankan      html  css  js  c++  java
  • Springboot集成RabbitMQ

    1、

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2、根据实际情况配置情况

    #IP地址
    spring.rabbitmq.host=localhost
    #rabbitmq默认端口号
    spring.rabbitmq.port=5672
    #账户名和密码
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest

    3、配置交换机(Exchang)与队列(Queue)绑定关系(RouteKey)

    @Configuration
    public class RabbitConfig {
        /**topic模式 名字可自由定义*/
        public static final String TOPIC_QUEUE1 = "topic.queue1";
        public static final String TOPIC_QUEUE2 = "topic.queue2";
        public static final String TOPIC_EXCHANGE = "topic.exchange";
        /**fanout模式 名字可自由定义*/
        public static final String FANOUT_QUEUE1 = "fanout.queue1";
        public static final String FANOUT_QUEUE2 = "fanout.queue2";
        public static final String FANOUT_EXCHANGE = "fanout.exchange";
        /**redirect模式 名字可自由定义*/
        public static final String DIRECT_QUEUE1 = "direct.queue1";
        public static final String DIRECT_QUEUE2 ="direct.queue2" ;
        public static final String DIRECT_EXCHANGE = "direct.exchange";
    
        /**
         * Topic模式  非常灵活的,极力推荐
         * 可以自定义RouteKey
         * 符号“#” 匹配一个或多个词
         * 符号“*” 匹配不多不少一个词
         * 容易出现当使用的队列key符合绑定的队列key时,
         * 同一消息会出现在不同的队列中,但一般使用这种模式的不会使用固定的RouteKey
         */
         /**创建TOPIC队列1*/
        @Bean
        public Queue topicQueue1() {
            return new Queue(TOPIC_QUEUE1);
        }
         /**创建TOPIC队列1*/
        @Bean
        public Queue topicQueue2() {
            return new Queue(TOPIC_QUEUE2);
        }
        /**创建TOPIC交换机*/
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE);
        }
        /**绑定交换机与队列1*/
        @Bean
        public Binding topicBinding1() {
            return BindingBuilder.bind(topicQueue1()).
                to(topicExchange()).with("topic.message");
        }
        /**绑定交换机与队列2*/
        @Bean
        public Binding topicBinding2() {
            return BindingBuilder.bind(topicQueue2()).
            to(topicExchange()).with("topic.#");
        }
    
        /**
         * Fanout模式
         * Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,
         * 绑定了这个交换机的所有队列都收到这个消息,
         * 会发生同一个消息出现在不同的队列里
         * @return
         */
        @Bean
        public Queue fanoutQueue1() {
            return new Queue(FANOUT_QUEUE1);
        }
        @Bean
        public Queue fanoutQueue2() {
            return new Queue(FANOUT_QUEUE2);
        }
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(FANOUT_EXCHANGE);
        }
        @Bean
        public Binding fanoutBinding1() {
            return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
        }
        @Bean
        public Binding fanoutBinding2() {
            return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
        }
    
        /**
         * direct模式
         * 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 
         * 交换器就将消息发到对应的队列中。路由键与队列名完全匹配  
         * 根据定义的绑定队列KEY使用对应的队列
         * @return
         */
        @Bean
        public Queue directQueue1() {
            return new Queue(DIRECT_QUEUE1);
        }
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange(DIRECT_EXCHANGE);
        }
        @Bean
        public Binding directBinding1() {
            return BindingBuilder.bind(directQueue1()).
                to(directExchange()).with("jw");
        }

    4、Fanout模式

     

    发送者
    @Component
    public class FanoutSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send(MqUserDto user) {
            this.rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"", user);
        }
    }
    接收者
    @Component
    public class FanoutReceiver {
        /**queues是指要监听的队列的名字*/
        @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE1)
        public void receiveFanout1(MqUserDto user) {
            System.out.println("【receiveFanout1监听到消息】" + user.getId()+" : "+user.getName());
        }
        @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE2)
        public void receiveFanout2(MqUserDto user) {
            System.out.println("【receiveFanout2监听到消息】" + user.getId()+" : "+user.getName());
        }
    }

    5、Direct模式

    发送者
    @Component
    public class DirectSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
        public void send(MqUserDto user) {
            this.rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"jw", user);
        }
    }
    接收者
    @Component
    public class DirectReceiver {
        /**queues是指要监听的队列的名字*/
        @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE1)
        public void receiveDirect1(MqUserDto user) {
            System.out.println("【receiveDirect1监听到消息】" + user.getId() + " : "+user.getName());
        }
    
        @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE2)
        public void receiveDirect2(MqUserDto user) {
            System.out.println("【receiveDirect2监听到消息】" + user.getId() + " : "+user.getName());
        }
    }

    6、Topic模式

    发送者
    @Component
    public class TopicSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        // 第一个参数:TopicExchange名字
        // 第二个参数:Route-Key
        // 第三个参数:要发送的内容
        public void send(MqUserDto user) {
            this.rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,"jw.message", user);
        }
    }
    接收者
    @Component
    public class TopicReceiver {
        // queues是指要监听的队列的名字
        @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE1)
        public void receiveTopic1(MqUserDto user) {
            System.out.println("【receiveTopic1监听到消息】" + user.getId() + " : "+user.getName());
        }
    
        @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE2)
        public void receiveTopic2(MqUserDto user) {
            System.out.println("【receiveTopic2监听到消息】" + user.getId() + " : "+user.getName());
        }
    }

    7、DTO

    @Getter
    @Setter
    @ToString
    public class MqUserDto implements Serializable {
        private String id;
        private String name;
    }
  • 相关阅读:
    【转载】[030]◀▶ ArcEngine 一些实现代码
    C#获取FTP目录下文件夹、文件的方法,进而判断FTP下指定文件夹是否存在
    C# 命名空间"DevExpress.XtraReports.UserDesigner"中不存在类型或命名空间名称"XRTabbedMdiManager"。是否缺少程序集引用?
    C#欢迎界面的两种形式
    RibbonForm中Text显示不完整
    IDL波段分解与合成源代码
    Java多线程--synchronized(二)
    Java多线程--synchronized(一)
    Java多线程--基础(三)
    Java多线程--基础(二)
  • 原文地址:https://www.cnblogs.com/jwdd/p/10156448.html
Copyright © 2011-2022 走看看