zoukankan      html  css  js  c++  java
  • RabbitMq-更新:正常队列+死信队列

    1.依赖 SpringBoot 2.1.6.RELEASE 版本

    <!--rabbitMQ-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2.配置信息

    #设置端口
    server.port=80
    #安装的RabbitMq的服务器IP
    spring.rabbitmq.host=192.168.***.**
    #安装的RabbitMq的服务器端口
    spring.rabbitmq.port=5672
    #安装的RabbitMq的用户名
    spring.rabbitmq.username=xxx
    #安装的RabbitMq的密码
    spring.rabbitmq.password=xxx
    #消息确认机制
    spring.rabbitmq.publisher-confirms=true
    #与消息确认机制联合使用,保证能够收到回调
    spring.rabbitmq.publisher-returns=true
    #消息确认模式 MANUAL:手动确认  NONE:不确认  AUTO:自动确认
    spring.rabbitmq.listener.simple.acknowledge-mode=auto
    #消费者
    spring.rabbitmq.listener.simple.concurrency=10
    spring.rabbitmq.listener.simple.max-concurrency=10
    #发布后重试
    spring.rabbitmq.listener.simple.retry.enabled=true
    spring.rabbitmq.listener.simple.retry.initial-interval=5000
    spring.rabbitmq.listener.simple.retry.max-attempts=5
    #每隔多久进行重试
    spring.rabbitmq.template.retry.multiplier=1.0
    #消费失败后重新消费
    spring.rabbitmq.listener.simple.default-requeue-rejected=false
    #自定义的vhost
    spring.rabbitmq.dev-virtual-host=devVir
    spring.rabbitmq.test-virtual-host=testVir
    

    3.配置信息:此处为多个Vhost配置,单个可直接使用,无需另外配置,只需声明队列信息即可

    package com.rabbit.config;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    
    /**
     * 2019年7月7日15:43:38 Joelan整合 RabbitConfig 概念介绍:
     * 1.Queue:队列,是RabbitMq的内部对象,用于存储消息,RabbitMq的多个消费者可以订阅同一个队列,此时队列会以轮询的方式给多个消费者消费,而非多个消费者都收到所有的消息进行消费
     * 注意:RabbitMQ不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑定多个队列,由多个消费者来订阅这些队列的方式。
     * 2.Exchange:交换器,在RabbitMq中,生产者并非直接将消息投递到队列中。真实情况是,生产者将消息发送到Exchange(交换器),由交换器将消息路由到一个或多个队列中。
     * 注意:如果路由不到,或返回给生产者,或直接丢弃,或做其它处理。
     * 3.RoutingKey:路由Key,生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。这个路由Key需要与交换器类型和绑定键(BindingKey)联合使用才能
     * 最终生效。在交换器类型和绑定键固定的情况下,生产者可以在发送消息给交换器时通过指定RoutingKey来决定消息流向哪里。
     * 4.Binding:RabbitMQ通过绑定将交换器和队列关联起来,在绑定的时候一般会指定一个绑定键,这样RabbitMQ就可以指定如何正确的路由到队列了。
     */
    @Configuration
    public class RabbitConfig {
    
        /**
         * RabbitMq的主机地址
         */
        @Value("${spring.rabbitmq.host}")
        private String host;
    
        /**
         * RabbitMq的端口
         */
        @Value("${spring.rabbitmq.port}")
        private Integer port;
    
        /**
         * 用户账号
         */
        @Value("${spring.rabbitmq.username}")
        private String username;
    
        /**
         * 用户密码
         */
        @Value("${spring.rabbitmq.password}")
        private String password;
    
        /**
         * 消息确认,回调机制
         */
        @Value("${spring.rabbitmq.publisher-confirms}")
        private boolean confirms;
        @Value("${spring.rabbitmq.publisher-returns}")
        private boolean returns;
    
        /**
         * vhost:dev
         */
        @Value("${spring.rabbitmq.dev-virtual-host}")
        private String hrmDevVirtualHost;
    
        /**
         * vhost:test
         */
        @Value("${spring.rabbitmq.test-virtual-host}")
        private String hrmTestVirtualHost;
    
        /**
         * 若一个项目只使用一个virtualHost的话,默认只需要在配置文件中配置其属性即可
         * 若项目中使用到多个virtualHost,那么可以以通过创建ConnectionFactory的方式指定不同的virtualHost
         */
        public ConnectionFactory createConnectionFactory(String host, Integer port, String username, String password,
                String vHost) {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost(host);
            connectionFactory.setPort(port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setSimplePublisherConfirms(confirms);
            connectionFactory.setPublisherReturns(returns);
            connectionFactory.setVirtualHost(vHost);
            return connectionFactory;
        }
    
        // ----------------------------------------------------------------------------------------第一步,创建消息连接,第一个VirtualHost
        /**
         * 创建指定vhost:test的连接工厂
         */
        @Primary
        @Bean(name = "devConnectionFactory")
        public ConnectionFactory devConnectionFactory() {
            return createConnectionFactory(host, port, username, password, hrmDevVirtualHost);
        }
    
        /**
         * 若有多个vhost则自定义RabbitMqTemplate 通过名称指定对应的vhost
         */
        @Primary
        @Bean(name = "devRabbitTemplate")
        public RabbitTemplate devRabbitTemplate(
                @Qualifier(value = "devConnectionFactory") ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            // 消息确认机制,ConnectionFactory中必须设置回调机制(publisher-confirms,publisher-returns)
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if (ack) {
                        System.out.println("消息id为: " + correlationData + "的消息,已经被ack成功");
                    } else {
                        System.out.println("消息id为: " + correlationData + "的消息,消息nack,失败原因是:" + cause);
                    }
                }
            });
            return rabbitTemplate;
        }
    
        // ----------------------------------------------------------------------------------------第二个VirtualHost,以此类推
        /**
         * 创建指定vhost:test的连接工厂
         */
        @Bean(name = "testConnectionFactory")
        public ConnectionFactory testConnectionFactory() {
            return createConnectionFactory(host, port, username, password, hrmTestVirtualHost);
        }
    
        /**
         * 若有多个vhost则自定义RabbitMqTemplate 通过名称指定对应的vhost,此处未使用回调
         */
        @Bean(name = "testRabbitTemplate")
        public RabbitTemplate testRabbitTemplate(
                @Qualifier(value = "testConnectionFactory") ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }
    
        // ----------------------------------------------------------------------------------------引入:死信队列
        /**
         * 所谓死信:即(1)消息被拒绝(basic.reject 或者 basic.nack),并且requeue=false;(2)消息的过期时间到期了;
         * (3)队列长度限制超过了 等三个因素造成。
         * 我们会将以上原因造成的队列存入死信队列,死信队列其实也是一个普通的队列,我们可以根据自身需要,可以对死信进行操作。
         * 以下为死信队列的演示(将正常队列监听关闭并设置超时):首先声明一个正常的队列,并设置死信队列的相关声明【死信交换器(与正常队列一致即可),死信路由Key等】
         * 设置完后,准备一个新的队列,此队列用于接收上一个正常队列发生死信后,将由此队列代替(即候补队列),然后将新队列通过上一个交换器以及正常队列中声明的死信路由Key进行绑定
         * 该操作与正常声明一致(声明交换器(可使用正常队列的交换器,无需另外声明),队列,将队列绑定到交换器)
         */
    
        /**
         * 声明交换器(此处正常的与死信的交换器一致)
         */
        @Bean
        public Exchange testExchange() {
            return new DirectExchange("test_exchange", true, false);
        }
    
        /**
         * 声明一个正常的队列,并设置死信相关信息(交换器,路由Key),确保发生死信后会将死信存入交换器
         */
        @Bean
        public Queue testQueue() {
            Map<String, Object> args = new HashMap<>(4);
            // x-dead-letter-exchange 声明 死信交换机
            args.put("x-dead-letter-exchange", "test_exchange");
            // x-dead-letter-routing-key 声明死信路由键
            args.put("x-dead-letter-routing-key", "test_dead_rout");
            return new Queue("test_queue", true, false, false, args);
        }
    
        /**
         * 将队列绑定到指定交换器并设置路由
         */
        @Bean
        public Binding testBinding() {
            return BindingBuilder.bind(testQueue()).to(testExchange()).with("test_rout").noargs();
        }
    
        /**
         * 死信队列(候补队列) 若上面的正常队列发生死信时,需将发生死信的队列信息路由到此队列中
         * 路由过程:正常队列发送->信息到交换器->交换器路由到正常队列->监听,发生死信->死信回到指定的交换器->再由交换器路由到死信队列->死信监听
         */
        @Bean
        public Queue testDeadQueue() {
            return new Queue("test_dead_queue", true, false, false);
        }
    
        /**
         * 绑定死信的队列到候补队列
         */
        @Bean
        public Binding testDeadBinding() {
            return BindingBuilder.bind(testDeadQueue()).to(testExchange()).with("test_dead_rout").noargs();
        }
    
        // ----------------------------------------------------------------------------------------第二步,声明队列信息,Fanout模式
        /**
         * 此处使用第一个正常队列来示例完整队列过程 创建队列 参数name:队列的名称,不能为空;设置为“”以使代理生成该名称。
         * 参数durable:true表示为持久队列,该队列将在服务器重新启动后继续存在
         * 参数exclusive:如果声明独占队列,则为true,该队列将仅由声明者的连接使用
         * 参数autoDelete:如果服务器不再使用队列时应将其删除,则自动删除为true 参数arguments:用于声明队列的参数
         */
        @Bean
        public Queue testFanoutQueue() {
            /*
             * 1.new Queue(name); return new Queue("test_fanout_queue");
             */
    
            /*
             * 2.new Queue(name,durable);
             */
            return new Queue("test_fanout_queue", true, false, false);
    
            /*
             * 3.new Queue(name,durable,exclusive,autoDelete); return new
             * Queue("test_fanout_queue", true, false, false);
             */
    
            /*
             * 4.new Queue(name,durable,exclusive,autoDelete,arguments); return new
             * Queue("test_fanout_queue", true, true, true, null);
             */
        }
    
        /**
         * 创建交换机 1.fanout:扇形交换器,它会把发送到该交换器的消息路由到所有与该交换器绑定的队列中,如果使用扇形交换器,则不会匹配路由Key
         * 白话:一个交换机可以绑定N个队列,此模式会将写入的队列发送到一个交换机,由此交换机发送到N个队列中,那么监听该队列的消费者都能收到对应的消息
         */
        @Bean
        @Primary
        public Exchange testFanoutExchange() {
            return new FanoutExchange("test_fanout_exchange");
        }
    
        /**
         * 绑定队列到交换机 Fanout模式不需要RoutingKey
         */
        @Bean
        public Binding testFanoutBinding() {
            return BindingBuilder.bind(testFanoutQueue()).to(testFanoutExchange()).with("").noargs();
        }
    
        // ----------------------------------------------------------------------------------------Direct模式
        /**
         * 创建队列
         */
        @Bean
        public Queue testDirectQueue() {
            return new Queue("test_direct_queue", true, false, false);
        }
    
        /**
         * 创建交换机 2.direct交换器 直连模式,会把消息路由到RoutingKey与BindingKey完全匹配的队列中。
         * 白话:直连模式在绑定队列到交换机的时候,RoutingKey与发送队列的RoutingKey要完全保持一致
         */
        @Bean
        public Exchange testDirectExchange() {
            return new TopicExchange("test_direct_exchange");
        }
    
        /**
         * 绑定队列到交换机并指定一个路由,此处的RoutingKey为test,发送队列时也必须使用test
         */
        @Bean
        public Binding testDirectBinding() {
            return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("test").noargs();
        }
    
        // ----------------------------------------------------------------------------------------Topic模式
        /**
         * 创建队列
         */
        @Bean
        public Queue testTopicQueue() {
            return new Queue("test_topic_queue", true, false, false);
        }
    
        /**
         * 创建交换机 2.topic 匹配模式(个人)与直连模式区别:RoutingKey可以模糊匹配,两种匹配风格: *匹配 #匹配
         * 我们的RoutingKey和BindKey为一个点分隔的字符串,例:test.routing.client
         * 那么我们的模糊匹配,*可以匹配一个单词,即:*.routing.* 可以匹配到 test.routing.client,
         * #可以匹配多个单词,即:#.client 可以匹配到 test.routing.client,以此类推
         */
        @Bean
        public Exchange testTopicExchange() {
            return new TopicExchange("test_topic_exchange");
        }
    
        /**
         * 绑定队列到交换机并指定一个路由
         */
        @Bean
        public Binding testTopicBinding() {
            return BindingBuilder.bind(testTopicQueue()).to(testTopicExchange()).with("test.*").noargs();
        }
    
        // ----------------------结束强调:第一步创建连接,第二步声明队列,交换器,路由Key信息,第三步发送队列,第四步监听队列
    }

    4.发送队列

    package com.rabbit.send;
    
    import java.util.UUID;
    
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Component;
    
    /**
     * RabbitSend
     */
    @Component
    public class RabbitSend {
    
        @Autowired
        @Qualifier(value = "devRabbitTemplate")
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 发送死信队列
         */
        public void sendDeadMsg(String msg) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            // 声明消息处理器 这个对消息进行处理 可以设置一些参数 对消息进行一些定制化处理 我们这里 来设置消息的编码 以及消息的过期时间
            // 因为在.net 以及其他版本过期时间不一致 这里的时间毫秒值 为字符串
            MessagePostProcessor messagePostProcessor = message -> {
                MessageProperties messageProperties = message.getMessageProperties();
                // 设置编码
                messageProperties.setContentEncoding("utf-8");
                // 设置过期时间10*1000毫秒
                messageProperties.setExpiration("10000");
                return message;
            };
            // 向test_queue 发送消息 10*1000毫秒后过期 形成死信,具体的时间可以根据自己的业务指定
            rabbitTemplate.convertAndSend("test_exchange", "test_rout", msg, messagePostProcessor, correlationData);
        }
    
        /**
         * 发送一条Fanout扇形队列
         */
        public void sendTestFanoutMsg(String msg) {
            rabbitTemplate.convertAndSend("test_fanout_exchange", "", msg, new CorrelationData("2"));
        }
    
        /**
         * 发送一条Direct直连队列 若有开启回调机制,必须传此参数new CorrelationData("1"),用于声明ID
         */
        public void sendTestDirectMsg(String msg) {
            rabbitTemplate.convertAndSend("test_direct_exchange", "test", msg, new CorrelationData("1"));
        }
    
        /**
         * 发送一条Topic消息队列
         */
        public void sendTestMsg(String msg) {
            rabbitTemplate.convertAndSend("test_topic_exchange", "test.mq", msg);
        }
    }

    5.监听队列

    package com.rabbit.receiver;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.Channel;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * RabbitReceiver
     */
    @Component
    public class RabbitReceiver {
    
        /**
         * 若死信队列监听到信息,表示我们的死信队列设置是没有问题的
         */
        @RabbitHandler
        @RabbitListener(queues = "test_dead_queue")
        public void redirect(Message message, Channel channel) throws IOException {
            System.out.println("监听到死信队列有消息进来");
        }
    
    /**
    * 为了测试死信队列效果,此处注销监听
    */ //@RabbitHandler //@RabbitListener(queues = "test_queue") //public void handlerTestQueue(Message message, Channel channel) throws IOException { // System.out.println("监听到正常队列有消息进来"); //} @RabbitHandler @RabbitListener(queues = "test_fanout_queue") public void handlerFanout(String msg) { System.out.println("RabbitReceiver:" + msg + "test_fanout_queue"); } @RabbitHandler @RabbitListener(queues = "test_direct_queue") public void handlerDirect(String msg) { System.out.println("RabbitReceiver:" + msg + "test_direct_queue"); } @RabbitHandler @RabbitListener(queues = "test_topic_queue") public void handlerTopic(String msg) { System.out.println("RabbitReceiver:" + msg + "test_topic_queue"); } }

      

      

  • 相关阅读:
    Markdown基本语法学习
    gauge自动化测试框架(二)
    初识gauge自动化测试框架
    火狐浏览器历代版本下载地址
    selenium + python实现截图并且保存图片
    Python搭建简易HTTP服务(3.x版本和2.x版本的)
    ((0.1+0.7)*10) = ?
    从apache mod_php到php-fpm[转]
    Web API文档生成工具apidoc
    微信支付获取 prepay id 偶尔失败问题【转】
  • 原文地址:https://www.cnblogs.com/joelan0927/p/11143408.html
Copyright © 2011-2022 走看看