zoukankan      html  css  js  c++  java
  • Spring Boot 入门(九)使用RabbitMQ

    maven

    <!--rabbitmq-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    配置文件

    spring.rabbitmq.host=192.168.233.128
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=root
    spring.rabbitmq.password=123456

    创建生产者

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    /**
    * sendRabbitMQ
    * 默认Exchange: (AMQP default)
    * direct模式:通过routingKey关联
    */
    @RequestMapping("/sendRabbitMQ")
    public String sendRabbitMQ(String msg) {
        rabbitTemplate.convertAndSend("queue.A", msg);
        return "发送成功";
    }

    创建消费者

    package com.example.demo.rabbitmq;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    @Component
    public class QueueListener {
        @RabbitListener(queuesToDeclare = @Queue("queue.A"))
        public void receiveMsgA(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {
            System.out.println("收到消息A:" + msg);
        }
    }

    发送消息

    调用接口“sendRabbitMQ”

    控制台打印

     RabbitMQ管理端查看队列信息

    fanout模式

    将多个Queue(队列)绑定到一个Exchange(交换机),当Exchange收到消息时,与之绑定的Queue都将收到消息。

    //发送代码指定一个Exchange
    rabbitTemplate.convertAndSend("fanoutExchange", "", msg);
    package com.example.demo.rabbitmq;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    @Component
    public class QueueListener {
        @RabbitListener(bindings = @QueueBinding(value = @Queue("queue.A"), exchange = @Exchange("fanoutExchange")))
        public void receiveMsgA(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {
            System.out.println("收到消息A:" + msg);
        }
    
        @RabbitListener(bindings = @QueueBinding(value = @Queue("queue.B"), exchange = @Exchange("fanoutExchange")))
        public void receiveMsgB(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {
            System.out.println("收到消息B:" + msg);
        }
    }

    topic模式(用得比较多)

    类似路由模式,但是routing_key支持模糊匹配,按规则转发消息(最灵活)。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。

    //发送代码指定一个Exchange,并且routingKey=A
    rabbitTemplate.convertAndSend("topicExchange", "A", msg);
    package com.example.demo.rabbitmq;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    @Component
    public class QueueListener {
        @RabbitListener(bindings = @QueueBinding(value = @Queue("queue.A"), key = {"A"}, exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC)))
        public void receiveMsgA(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {
            System.out.println("收到消息A:" + msg);
        }
    
        @RabbitListener(bindings = @QueueBinding(value = @Queue("queue.B"), key = {"B"}, exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC)))
        public void receiveMsgB(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {
            System.out.println("收到消息B:" + msg);
        }
    
        @RabbitListener(bindings = @QueueBinding(value = @Queue("queue.A"), key = {"A"}, exchange = @Exchange(value = "topicExchange", type = ExchangeTypes.TOPIC)))
        public void receiveMsgC(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {
            System.out.println("收到消息C:" + msg);
        }
    }

    由于消费者是轮询消费,所以发送两次消息,A和C依次收到消息(routingKey=A,B消费者不会收到消息)

     

  • 相关阅读:
    bzoj3306: 树(dfs序+倍增+线段树)
    bzoj1969: [Ahoi2005]LANE 航线规划(树链剖分)
    Codeforces 578B. "Or" Game(思维题)
    bzoj3251: 树上三角形(思维题)
    bzoj2006: [NOI2010]超级钢琴(堆+RMQ)
    bzoj4165: 矩阵(堆+hash)
    bzoj3007: 拯救小云公主(二分+并查集)
    Codeforces 582C. Superior Periodic Subarrays(数学+计数)
    Codeforces 585E. Present for Vitalik the Philatelist(容斥)
    Codeforces 585D. Lizard Era: Beginning(meet in the middle)
  • 原文地址:https://www.cnblogs.com/xiaoxiaoyu0707/p/15684444.html
Copyright © 2011-2022 走看看