zoukankan      html  css  js  c++  java
  • RabbitMq 实现延时队列-Springboot版本

    rabbitmq本身没有实现延时队列,但是可以通过死信队列机制,自己实现延时队列;

    原理:当队列中的消息超时成为死信后,会把消息死信重新发送到配置好的交换机中,然后分发到真实的消费队列;

    步骤:

    1、创建带有时限的队列 dealLineQueue;

    2、创建死信Faout交换机dealLineExchange;

    3、创建消费队列realQueue,并和dealLineExchange绑定

    4、配置dealLineQueue 的过期时间,消息过期后的死信交换机,重发的routing-key;

    以下以springboot为例子贴出代码

    项目结构:

    基本值-DealConstant

    package com.eyjian.rabbitmq.dealline;
    
    public interface DealConstant {
    
        String DEAL_LINE_QUEUE = "dealLineQueue";
    
        String DEAL_LINE_EXCHANGE = "dealLineExchange";
    
        String REAL_QUEUE= "realQueue";
    
    
    }
    
    
    消费者Lister
    package com.eyjian.rabbitmq.dealline;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    
    /**
     * 死信队里模拟延时队列
     * @Author: yeyongjian
     * @Date: 2019-05-18 14:12
     */
    @Component
    public class Lister {
        @RabbitListener(queues = DealConstant.REAL_QUEUE)
        public void handle(Message message){
            byte[] body = message.getBody();
            String msg = new String(body);
            System.out.println(msg);
    
        }
    }
    
    

    配置类RabbitmqConfig

    
    
    package com.eyjian.rabbitmq.dealline;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    public class RabbitmqConfig {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //启动初始化删除绑定用的
    //    @PostConstruct
        public void delete() throws IOException {
            Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
            channel.queueUnbind(DealConstant.REAL_QUEUE,DealConstant.DEAL_LINE_EXCHANGE,"");
        }
        @Bean
        public Queue initDealLineQueue() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange", DealConstant.DEAL_LINE_EXCHANGE);
            args.put("x-dead-letter-routing-key", DealConstant.DEAL_LINE_QUEUE);//超时转发的队列
            args.put("x-message-ttl", 5000);//延时时间
            Queue queue = new Queue(DealConstant.DEAL_LINE_QUEUE,true,false,false,args);
            return queue;
        }
        @Bean
        FanoutExchange dealLineExchange() {
            return new FanoutExchange(DealConstant.DEAL_LINE_EXCHANGE);
        }
        @Bean
        Binding bindingiVewUgcTopicExchange(Queue initRealQueue, FanoutExchange dealLineExchange) {
            return BindingBuilder.bind(initRealQueue).to(dealLineExchange);
        }
        @Bean
        public Queue initRealQueue() {
            return new Queue(DealConstant.REAL_QUEUE);
        }
    
    
    }


    application.properties文件

    spring.rabbitmq.addresses=localhost
    spring.rabbitmq.host=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    
    



    项目启动后,rabbitmq控制台信息如下:

     
     

    test类发送消息

    package com.eyjian.rabbitmq;
    
    import com.eyjian.rabbitmq.dealline.DealConstant;
    import com.rabbitmq.client.Channel;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.io.IOException;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RabbitmqLearnApplicationTests {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
        @Test
        public void contextLoads() throws IOException {
            rabbitTemplate.convertAndSend(DealConstant.DEAL_LINE_QUEUE,"hell word");
        }
    
    
    }

    5秒后控制台打印消息

    源码地址:https://github.com/hd-eujian/rabbitmq-learn.git

  • 相关阅读:
    8张图理解Java
    PhotoShop切图
    Java中堆内存和栈内存详解【转】
    Java编程性能优化一些事儿【转】
    Java反射机制--笔记
    JUnit单元测试--IntelliJ IDEA
    深入理解Java:自定义java注解
    基于值函数的强化学习 小例子(策略退化)
    动态规划中 策略迭代 和 值迭代 的一个小例子
    爬格子问题(经典强化学习问题) Sarsa 与 Q-Learning 的区别
  • 原文地址:https://www.cnblogs.com/yeyongjian/p/10886347.html
Copyright © 2011-2022 走看看