zoukankan      html  css  js  c++  java
  • RabbitMq集成springboot

    Config配置文件中,生产者和消费者都自己声明下需要的exchange和queue,否则如果服务器没有创建对应的信息,则自己启动会报错。
     
    一、生产者属性文件配置
     
    spring:
      rabbitmq:
        host: 192.168.112.131
        port: 5672
        username: admin
        password: admin
        publisher-confirms: true
        virtual-host: my_vhost
        publisher-returns: true

    二、生产者配置类的声明

    @Configuration
    @Slf4j
    public class RabbitmqConfig {
    
    
        /**
         * @param
         * @return
         * @throws Exception
         * @date 2020/10/9
         * @version 2、durable 持久化消息队列,默认true
         * 3、auto-delete 消息队列没有在使用时自动删除,默认false
         * 4、exclusive 是否有排他性,就是是否只允许一个消费者消费,默认false
         */
        @Bean(name = "message")
        public Queue queueMesage() {
            Map<String,Object> arguments = new HashMap<>();
            //声明死信exchange 和路由,后面在讲死信队列绑定到这个路由上即可
            arguments.put("x-dead-letter-exchange","tony.dead.ex");
            arguments.put("x-dead-letter-routing-key","tony.dead.rk");
            return new Queue("tony.queue", true, false, false,arguments);
        }
        //声明业务exchange
        @Bean(name = "exchange")
        public DirectExchange exchange() {
            return new DirectExchange("tony.exchange",true,false);
        }
    
    
        //声明死信队列-被拒绝的信息放到这个队列
        @Bean(name = "deadqueueMesage")
        public Queue deadqueueMesage() {
            return new Queue("tony.dead.queue", true);
        }
        //声明死信exchange
        @Bean(name = "deadexchange")
        public DirectExchange deadexchange() {
            return new DirectExchange("tony.dead.ex",true,false);
        }
        //绑定队列和交换机
        @Bean
        Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage,
                                       @Qualifier("exchange") DirectExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).
                    with("tony.routeKey");
        }
        //绑定死信队列和死信交换机
        @Bean
        Binding bindingDeadExchangeMessage(@Qualifier("deadqueueMesage") Queue queueMessage,
                                       @Qualifier("deadexchange") DirectExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).
                    with("tony.dead.rk");
        }
    
    
    /*    @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    //        rabbitTemplate.setMessageConverter(converter());
            // 消息是否成功发送到Exchange
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息成功发送到Exchange" + correlationData.getId());
                } else {
                    //重新发送到mq
                    log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
                }
            });
    
    
            // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
            rabbitTemplate.setMandatory(true);
            // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
    
    
            });
    
    
            return rabbitTemplate;
        }*/
    }
    View Code

    三、消费者属性文件的配置

    spring:
      rabbitmq:
        host: 192.168.112.131
        port: 5672
        username: admin
        password: admin
        publisher-confirms: true
        publisher-returns: true
        virtual-host: my_vhost
        listener:
          simple:
            acknowledge-mode: manual
            retry:
              enabled: true
              max-attempts: 3
              initial-interval: 5000ms

    或者

    配置可以通过配置文件指定,也可以通过配置类指定:
     
    @Configuration
    public class RabbitConfig {
    
    
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrentConsumers(2);
            factory.setPrefetchCount(10);
            /*
             * AcknowledgeMode.NONE:自动确认
             * AcknowledgeMode.AUTO:根据情况确认
             * AcknowledgeMode.MANUAL:手动确认
             */
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }
    }
    View Code

    四、消费者消费消息

    @Slf4j
    @Component
    public class MessageListener {
    
        @RabbitListener(queues = "tony.queue")
        public void process(@Payload String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {
            log.info(Thread.currentThread().getName() + "=============接收到消息=============" + message + "--" + deliveryTag);
            try {
                if ("tony18".equals(message)) {
                    int i = 1 / 0;
                    System.out.println( i );
                }
            } catch (Exception e) {
                //丢死信队列
                try {
                    channel.basicNack(deliveryTag, false, false);
                } catch (IOException ioException) {
                    ioException.printStackTrace();
                }
            }
    
            try {
                channel.basicAck(deliveryTag, true);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    View Code
    五、带有返回值的生产者和消费者
     1)生产者
     
    public class RabbitmqConfig {
    
     
        @Bean
        public Queue queueKillMessage() {
            return new Queue("tony.queue.back", true, false, false);
        }
    
    
        @Bean
        public DirectExchange queuekillExchangeBack() {
            return new DirectExchange("tony.exchange.back", true, false);
        }
    
    
        @Bean
        public Binding bindingSecKillExchangeMessage() {
            return BindingBuilder
                    .bind(queueKillMessage())
                    .to(queuekillExchangeBack())
                    .with("tony.routeKey.back");
        }
    
    
    }
    View Code

    2)发送消息

       public String sendAndReceive(String msg1) {
    
    
            //设置消息唯一id
            CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
            //直接发送message对象
            MessageProperties messageProperties = new MessageProperties();
            //过期时间10秒,也是为了减少消息挤压的可能
    //        messageProperties.setExpiration("10000");
            messageProperties.setCorrelationId(correlationId.getId());
            Message message = new Message(msg1.getBytes(), messageProperties);
            log.info("TopicSender send the 1st : " + msg1);
            //设置消息唯一id
            Message message1 = amqpTemplate.sendAndReceive("tony.exchange.back", "tony.routeKey.back", message, correlationId);
            System.out.println("返回值:"+message1);
            if ( message1==null){
                return null;
            }else{
                return new String(message1.getBody());
            }
        }
    View Code

    3)消费者

    @Configuration
    public class RabbitConfig {
    
    
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrentConsumers(2);
            factory.setPrefetchCount(10);
            /*
             * AcknowledgeMode.NONE:自动确认
             * AcknowledgeMode.AUTO:根据情况确认
             * AcknowledgeMode.MANUAL:手动确认
             */
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }
    
    
        @Bean
        public Queue queueKillMessage() {
            return new Queue("tony.queue.back", true, false, false);
        }
    
    
        @Bean
        public DirectExchange queuekillExchangeBack() {
            return new DirectExchange("tony.exchange.back", true, false);
        }
    
    
        @Bean
        public Binding bindingSecKillExchangeMessage() {
            return BindingBuilder
                    .bind(queueKillMessage())
                    .to(queuekillExchangeBack())
                    .with("tony.routeKey.back");
        }
    }
    View Code

    4)消费者监听器声明

    @Component
    public class MessageListener {
    
     
        @RabbitListener(queues = "tony.queue.back"/*,errorHandler = "rabbitConsumerListenerErrorHandler"*/)
        @RabbitHandler // 此注解加上之后可以接受对象型消息
        public String processCallback(Message message, Channel channel, @Headers Map<String, Object> headers) throws Exception {
            try {
                String msg = new String(message.getBody());
                log.info("UserReceiver>>>>>>>接收到消息:" + msg);
                try {
    
    
                    log.info("UserReceiver>>>>>>消息已消费");
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//手工确认,可接下一条        
                    return UUID.randomUUID().toString();
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);//失败,则直接忽略此订单
                    log.info("UserReceiver>>>>>>拒绝消息,直接忽略");
                    throw e;
                }
            } catch (Exception e) {
                log.info(e.getMessage());
            }
            return "";
        }
    
    
    }
    View Code

    六、带有确认机制的生产者

     @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(addresses+":"+port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            /** 如果要进行消息回调,则这里必须要设置为true */
            connectionFactory.setPublisherConfirms(publisherConfirms);
            connectionFactory.setPublisherReturns(publisherReturns);
            return connectionFactory;
        }
    
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            return new RabbitAdmin(connectionFactory);
        }
    
    
        @Bean
        public RabbitTemplate newRabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            template.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息成功发送到Exchange,messageId:" + correlationData.getId());
                    //修改日志表状态,状态改成 1,投递成功且未确认
                    if(updateMessageLog(1,Long.valueOf(correlationData.getId())) == 1) {
                        log.info("------modify status 1 ok--------");
                    }
                } else {
                    log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
                }
            });
            template.setMandatory(true);
            template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
            });
            //不使用临时队列
    //        template.setUseTemporaryReplyQueues(false);
    //        template.setReplyAddress("amq.rabbitmq.reply-to");
    //        template.setUserCorrelationId(true);
    //        template.setReplyTimeout(10000);
            return template;
        }
    View Code
    七、官网参考
     
  • 相关阅读:
    bzoj3505 数三角形 组合计数
    cogs2057 殉国 扩展欧几里得
    cogs333 荒岛野人 扩展欧几里得
    bzoj1123 BLO tarjan求点双连通分量
    poj3352 road construction tarjan求双连通分量
    cogs1804 联合权值 dp
    cogs2478 简单的最近公共祖先 树形dp
    cogs1493 递推关系 矩阵
    cogs2557 天天爱跑步 LCA
    hdu4738 Caocao's Bridge Tarjan求割边
  • 原文地址:https://www.cnblogs.com/lean-blog/p/14150928.html
Copyright © 2011-2022 走看看