zoukankan      html  css  js  c++  java
  • RabbitMq集成springboot

    Config配置文件中,生产者和消费者都自己声明下需要的exchange和queue,否则如果服务器没有创建对应的信息,则自己启动会报错。
     
    一、生产者属性文件配置
     
    spring:
      rabbitmq:
        host: 192.168.112.131
        port: 5672
        username: admin
        password: admin
        publisher-confirms: true
        virtual-host: my_vhost
        publisher-returns: true

    二、生产者配置类的声明

    @Configuration
    @Slf4j
    public class RabbitmqConfig {
    
    
        /**
         * @param
         * @return
         * @throws Exception
         * @date 2020/10/9
         * @version 2、durable 持久化消息队列,默认true
         * 3、auto-delete 消息队列没有在使用时自动删除,默认false
         * 4、exclusive 是否有排他性,就是是否只允许一个消费者消费,默认false
         */
        @Bean(name = "message")
        public Queue queueMesage() {
            Map<String,Object> arguments = new HashMap<>();
            //声明死信exchange 和路由,后面在讲死信队列绑定到这个路由上即可
            arguments.put("x-dead-letter-exchange","tony.dead.ex");
            arguments.put("x-dead-letter-routing-key","tony.dead.rk");
            return new Queue("tony.queue", true, false, false,arguments);
        }
        //声明业务exchange
        @Bean(name = "exchange")
        public DirectExchange exchange() {
            return new DirectExchange("tony.exchange",true,false);
        }
    
    
        //声明死信队列-被拒绝的信息放到这个队列
        @Bean(name = "deadqueueMesage")
        public Queue deadqueueMesage() {
            return new Queue("tony.dead.queue", true);
        }
        //声明死信exchange
        @Bean(name = "deadexchange")
        public DirectExchange deadexchange() {
            return new DirectExchange("tony.dead.ex",true,false);
        }
        //绑定队列和交换机
        @Bean
        Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage,
                                       @Qualifier("exchange") DirectExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).
                    with("tony.routeKey");
        }
        //绑定死信队列和死信交换机
        @Bean
        Binding bindingDeadExchangeMessage(@Qualifier("deadqueueMesage") Queue queueMessage,
                                       @Qualifier("deadexchange") DirectExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).
                    with("tony.dead.rk");
        }
    
    
    /*    @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    //        rabbitTemplate.setMessageConverter(converter());
            // 消息是否成功发送到Exchange
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息成功发送到Exchange" + correlationData.getId());
                } else {
                    //重新发送到mq
                    log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
                }
            });
    
    
            // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
            rabbitTemplate.setMandatory(true);
            // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
    
    
            });
    
    
            return rabbitTemplate;
        }*/
    }
    View Code

    三、消费者属性文件的配置

    spring:
      rabbitmq:
        host: 192.168.112.131
        port: 5672
        username: admin
        password: admin
        publisher-confirms: true
        publisher-returns: true
        virtual-host: my_vhost
        listener:
          simple:
            acknowledge-mode: manual
            retry:
              enabled: true
              max-attempts: 3
              initial-interval: 5000ms

    或者

    配置可以通过配置文件指定,也可以通过配置类指定:
     
    @Configuration
    public class RabbitConfig {
    
    
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrentConsumers(2);
            factory.setPrefetchCount(10);
            /*
             * AcknowledgeMode.NONE:自动确认
             * AcknowledgeMode.AUTO:根据情况确认
             * AcknowledgeMode.MANUAL:手动确认
             */
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }
    }
    View Code

    四、消费者消费消息

    @Slf4j
    @Component
    public class MessageListener {
    
        @RabbitListener(queues = "tony.queue")
        public void process(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {
            log.info(Thread.currentThread().getName() + "=============接收到消息=============" + message + "--" + deliveryTag);
            try {
                if ("tony18".equals(message)) {
                    int i = 1 / 0;
                    System.out.println( i );
                }
            } catch (Exception e) {
                //丢死信队列
                try {
                    channel.basicNack(deliveryTag, false, false);
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }
            }
    
            try {
                channel.basicAck(deliveryTag, true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    View Code
    五、带有返回值的生产者和消费者
     1)生产者
     
    public class RabbitmqConfig {
    
     
        @Bean
        public Queue queueKillMessage() {
            return new Queue("tony.queue.back", true, false, false);
        }
    
    
        @Bean
        public DirectExchange queuekillExchangeBack() {
            return new DirectExchange("tony.exchange.back", true, false);
        }
    
    
        @Bean
        public Binding bindingSecKillExchangeMessage() {
            return BindingBuilder
                    .bind(queueKillMessage())
                    .to(queuekillExchangeBack())
                    .with("tony.routeKey.back");
        }
    
    
    }
    View Code

    2)发送消息

       public String sendAndReceive(String msg1) {
    
    
            //设置消息唯一id
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            //直接发送message对象
            MessageProperties messageProperties = new MessageProperties();
            //过期时间10秒,也是为了减少消息挤压的可能
    //        messageProperties.setExpiration("10000");
            messageProperties.setCorrelationId(correlationId.getId());
            Message message = new Message(msg1.getBytes(), messageProperties);
            log.info("TopicSender send the 1st : " + msg1);
            //设置消息唯一id
            Message message1 = amqpTemplate.sendAndReceive("tony.exchange.back", "tony.routeKey.back", message, correlationId);
            System.out.println("返回值:"+message1);
            if ( message1==null){
                return null;
            }else{
                return new String(message1.getBody());
            }
        }
    View Code

    3)消费者

    @Configuration
    public class RabbitConfig {
    
    
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrentConsumers(2);
            factory.setPrefetchCount(10);
            /*
             * AcknowledgeMode.NONE:自动确认
             * AcknowledgeMode.AUTO:根据情况确认
             * AcknowledgeMode.MANUAL:手动确认
             */
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }
    
    
        @Bean
        public Queue queueKillMessage() {
            return new Queue("tony.queue.back", true, false, false);
        }
    
    
        @Bean
        public DirectExchange queuekillExchangeBack() {
            return new DirectExchange("tony.exchange.back", true, false);
        }
    
    
        @Bean
        public Binding bindingSecKillExchangeMessage() {
            return BindingBuilder
                    .bind(queueKillMessage())
                    .to(queuekillExchangeBack())
                    .with("tony.routeKey.back");
        }
    }
    View Code

    4)消费者监听器声明

    @Component
    public class MessageListener {
    
     
        @RabbitListener(queues = "tony.queue.back"/*,errorHandler = "rabbitConsumerListenerErrorHandler"*/)
        @RabbitHandler // 此注解加上之后可以接受对象型消息
        public String processCallback(Message message, Channel channel, @Headers Map<String, Object> headers) throws Exception {
            try {
                String msg = new String(message.getBody());
                log.info("UserReceiver>>>>>>>接收到消息:" + msg);
                try {
    
    
                    log.info("UserReceiver>>>>>>消息已消费");
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工确认,可接下一条        
                    return UUID.randomUUID().toString();
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//失败,则直接忽略此订单
                    log.info("UserReceiver>>>>>>拒绝消息,直接忽略");
                    throw e;
                }
            } catch (Exception e) {
                log.info(e.getMessage());
            }
            return "";
        }
    
    
    }
    View Code

    六、带有确认机制的生产者

     @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(addresses+":"+port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            /** 如果要进行消息回调,则这里必须要设置为true */
            connectionFactory.setPublisherConfirms(publisherConfirms);
            connectionFactory.setPublisherReturns(publisherReturns);
            return connectionFactory;
        }
    
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }
    
    
        @Bean
        public RabbitTemplate newRabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            template.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息成功发送到Exchange,messageId:" + correlationData.getId());
                    //修改日志表状态,状态改成 1,投递成功且未确认
                    if(updateMessageLog(1,Long.valueOf(correlationData.getId())) == 1) {
                        log.info("------modify status 1 ok--------");
                    }
                } else {
                    log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
                }
            });
            template.setMandatory(true);
            template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
            });
            //不使用临时队列
    //        template.setUseTemporaryReplyQueues(false);
    //        template.setReplyAddress("amq.rabbitmq.reply-to");
    //        template.setUserCorrelationId(true);
    //        template.setReplyTimeout(10000);
            return template;
        }
    View Code
    七、官网参考
     
  • 相关阅读:
    新内容记录:
    一个用于提取简体中文字符串中省,市和区并能够进行映射,检验和简单绘图的python模块
    django后台获取相同name名的数据
    python 使用qqwry.dat获取ip物理地址:速度快
    laydate设置起始时间,laydate设置开始时间和结束时间
    评论抓取:Python爬取微信在APPStore上的评论内容及星级
    H5上传压缩图片
    Django自带的加密算法及加密模块
    git入门
    WPF
  • 原文地址:https://www.cnblogs.com/lean-blog/p/14150928.html
Copyright © 2011-2022 走看看