zoukankan      html  css  js  c++  java
  • RabbitMQ使用

    官网:https://www.rabbitmq.com/

    RabbitMQ is the most widely deployed open source message broker.

    RabbitMQ是最广泛部署开源的消息中间件。

     Spring-Boot项目引入依赖:

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>        

    application.yml配置MQ信息:

    #RabbitMQ
    spring.rabbitmq:
      host: localhost
      port: 5672
      #username: admin
      #password: 123456

    # 开启发送确认
    publisher-confirms: true
    # 开启发送失败退回
    publisher-returns: true
    # 开启ACK
    listener.direct.acknowledge-mode: manual
    listener.simple.acknowledge-mode: manual

    配置类RabbitConfig声明队列queue1:

    /**
     * RabbitMQ配置类
     */
    @Configuration
    public class RabbitConfig {
        public static final String QUEUE_FIRST = "queue1"; //队列名
        
        /**
         * 声明队列
         * @return
         */
        @Bean
        public Queue queueFirst() {
            return new Queue(QUEUE_FIRST);
        }
    }

     Work Queues(工作模式)

    生产者把消息直接发送到队列中,多个消费者绑定一个队列进行竞争消费。谁抢到谁执行.实用场景:秒杀业务 抢红包等

     生产者发送消息:

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitMQTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        /**
         * 测试发送消息到队列
         */
        @Test
        public void sendToQueue(){
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_FIRST, "你好,这是消息hello"+ i);
            }
        }
    }

    在另外一个项目里,2个消费者来接受消息:@RabbitListener注解,监听队列

    @Component
    public class Receiver {
        /**
         * 处理消息
         * @param content
         * @throws IOException 
         */
        @RabbitListener(queues = RabbitConfig.QUEUE_FIRST) //监听队列
        public void processMessage1(String content, Channel channel, Message message) throws IOException {
            try {
                System.out.println("消费者1收到消息:" + content);
                //手动确认消息,消费者发送一个消息应答rabbitMQ才会删除消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
                //抛弃此条消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } catch (Exception e) {
                e.printStackTrace();
                //重新放入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
        
        @RabbitListener(queues = RabbitConfig.QUEUE_FIRST)
        public void processMessage2(String content, Channel channel, Message message) throws IOException {
            System.out.println("消费者2收到消息:" + content);
            try {
                //手动确认消息,消费者发送一个消息应答rabbitMQ才会删除消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
                //抛弃此条消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } catch (Exception e) {
                e.printStackTrace();
                //重新放入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            
        }
    }

    运行结果:多个消费者监听同一个队列,消息会均匀地发送给消费者 

    Publish/Subscribe(发布订阅模式)

    生产者把消息发布到交换机,交换机将消息发给N个队列,消费者绑定响应队列取消息即可,此功能比较适合将某单一系统的简单业务数据消息广播给所有接口

    应用场景:邮件群发,群聊天,广告

     配置类RabbitConfig新增代码:声明队列queue2,fanout交换机,绑定交换机和队列:

        public static final String QUEUE_SECOND = "queue2"; //队列名
    
        @Bean
        public Queue queueSecond() {
            return new Queue(QUEUE_SECOND);
        }
    
      /**
         * 声明fanout交换机
         * @return
         */
        @Bean
        public Exchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
        
        /**
         * 队列绑定fanout交换机,不需要路由键(路由键会忽略)
         * @param queueFirst
         * @param fanoutExchange
         * @return
         */
        @Bean
        Binding bindingExchangeMessage(Queue queueFirst, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueFirst).to(fanoutExchange);
        }
        @Bean
        Binding bindingExchangeMessage1(Queue queueSecond, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queueSecond).to(fanoutExchange);
        }    

    生产者发送消息:

        /**
         * 测试发送消息到faout交换机
         */
        @Test
        public void sendFaout(){
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("fanoutExchange","", "发布订阅模式发消息:" + i);
            }
        }

    把消费者2改为监听队列queue2,消费者1不用改,还是监听队列queue1

    @RabbitListener(queues = RabbitConfig.QUEUE_SECOND)
        public void processMessage2(String content, Channel channel, Message message) throws IOException {
            System.out.println("消费者2收到消息:" + content);
            try {
                //手动确认消息,消费者发送一个消息应答rabbitMQ才会删除消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
                //抛弃此条消息
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } catch (Exception e) {
                e.printStackTrace();
                //重新放入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            
        }

    运行结果:绑定此交换机的队列都收到了一样的消息

    Routing(路由模式)

    如果路由键完全匹配的话,消息才会被投放到相应的队列.amq.direct是rabbitMQ默认的持久化的交换机.

    由于主题模式包含了路由模式,而且工作中基本用主题模式,就是交换机类型不一样,主题模式的路由规则更灵活。路由模式例子就不写了

    Topics(主题模式)

    模糊匹配,设置模糊的绑定方式,"*"操作符将"."视为分隔符,匹配单个单词;"#"操作符没有分块的概念,它将任意"."均视为关键字的匹配部分,能够匹配多个字符.

    配置类RabbitConfig新增代码:声明topic交换机,绑定交换机和队列,队列queue1路由规则为topic.*,queue2路由规则为topic.#:

      /**
         * 声明topic交换机
         * @return
         */
        @Bean
        public Exchange topicExchange() {
            return new TopicExchange("topicExchange");
        }
        /**
         * 队列绑定topic交换机
         * @param queueFirst 队列Bean
         * @param topicExchange 交换机Bean
         * @return
         */
        @Bean
        Binding bindingExchangeMessage(Queue queueFirst, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueFirst).to(topicExchange).with("topic.*");
        }
        @Bean
        Binding bindingExchangeMessage1(Queue queueSecond, TopicExchange topicExchange) {
            return BindingBuilder.bind(queueSecond).to(topicExchange).with("topic.#");
        }

    和原来一样,消费者1监听队列queue1,消费者2监听queue2

    发送消息:

        /**
         * 测试发送消息到topic交换机
         */
        @Test
        public void sendTopic(){
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("topicExchange","topic.msg.消息", "主题消息:你好" + i);
            }
        }

    运行结果:发送消息的路由键为topuc.msg.消息,只能匹配上队列queue2,路由规则topic.#

     

    发送消息的路由键改为:topic.msg

       /**
         * 测试发送消息到topic交换机
         */
        @Test
        public void sendTopic(){
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("topicExchange","topic.msg", "主题消息:你好" + i);
            }
        }

    运行结果:发送消息的路由键为topic.msg,两个队列的路由都匹配上了。

     

    好记性不如烂笔头,提升自己:http://www.urlort.cn/1DjfQb github地址:https://github.com/997480972
  • 相关阅读:
    GET 请求和 POST 请求
    爬虫
    模板继承
    静态文件配置
    终端cmd创建django
    商城商品分类导航效果
    css样式
    视图部分
    django初识和路由
    【源码分析】cocos2dx的Action
  • 原文地址:https://www.cnblogs.com/liuyong1993/p/10408325.html
Copyright © 2011-2022 走看看