zoukankan      html  css  js  c++  java
  • 基于redisson的延迟队列

    通常在一个jvm进程中,若想实现延迟逻辑,可以使用jdk自带的延迟队列DelayQueue来实现。DelayQueue中的元素PriorityQueue来实现的,DelayQueue中的元素会实现

    public interface Delayed extends Comparable<Delayed> {
    
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }

    即可在DelayQueue进行poll操作时候获取最近需要的元素。但是这种延时队列是保存在内存中,所以一旦进程关闭或崩溃,队列中的数据都会丢失,所以只有配合持久化才可以保证数据不丢失。

    那么如果在多进程条件下,如果要实现延迟队列,则需要一个统一的地方保存延迟元素,这个元素可以被称为任务,redis是一个不错的选择。Redisson实现了集群环境下延迟队列的实现。

    引入reddison依赖

    <dependency>
         <groupId>org.redisson</groupId>
         <artifactId>redisson</artifactId>
         <version>3.10.3</version>
    </dependency>

    redis基本配置

        private Config initRedissonConfig() {
            Config config = new Config();
            config.useSingleServer()
                    .setAddress("redis://" + host + ":" + port)
                    .setTimeout(timeout)
                    .setConnectionPoolSize(maxIdle)
                    .setConnectionMinimumIdleSize(minIdle);
            return config;
        }
    
        @Bean(destroyMethod = "shutdown")
        public RedissonClient redissonClient() {
            Config config = initRedissonConfig();
            return Redisson.create(config);
        }

    定义redisson阻塞队列,注册相关bean

    public class QueueConfig {

    private final String queueName = "queue";

    @Bean
    public RBlockingQueue<String> rBlockingQueue(@Qualifier("redissonClient") RedissonClient redissonClient) {
    return redissonClient.getBlockingQueue(queueName);
    }

    @Bean
    public RDelayedQueue<String> rDelayedQueue(@Qualifier("redissonClient") RedissonClient redissonClient,
    @Qualifier("rBlockingQueue") RBlockingQueue<String> blockQueue) {
    return redissonClient.getDelayedQueue(blockQueue);
    }
    }

    下面进行测试,TakeTask负责消费队列中的任务

    public class TakeTask {
    
        @Resource(name = "rBlockingQueue")
        private RBlockingQueue<String> rBlockingQueue;
    
        @PostConstruct
        public void take() {
    
            new Thread(() -> {
                while (true) {
                    try {
                        String s = rBlockingQueue.take();
                        System.out.println(s);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    在延时队列rDelayQueue中延迟添加任务,这里需要调用带参数的offerAsync的方法,延时添加。

    @RestController
    @RequestMapping("/")
    public class TestController {
    
        @Resource(name = "rDelayedQueue")
        private RDelayedQueue<String> rDelayedQueue;
    
        @GetMapping("/offer")
        public void offer() {
            for (int i = 1; i <= 2; i++) {
                rDelayedQueue.offerAsync("task: " + i, 1, TimeUnit.SECONDS);
            }
        }
    }

    由于延时队列持久化在redis中,所以机器宕机数据不会异常丢失,机器重启后,会正常消费队列中积累的任务。

    对于jdk中的DelayQueue延时队列是采用zset来实现,每次add,会立即将元素添加到队列中,zset会根据指定的字段进行排序,维护一个优先队列,当进行take操作时候,取到头节点的数据一定是最大或者最小的,但是此时头节点不一定能取出来,需要多一步判断,这一步其实就是  public long getDelay(TimeUnit unit);要实现的方法,只有返回值大于0才会真正被取出来。redission的延时队列是异步延时加入的,也就是说并没有立刻加入队列中,而是在指定的延时时间delay之后才会加入,所以在take的时候是一定可以直接取出来队列中的元素。

    
    
  • 相关阅读:
    C++ Operate FTP
    md /mdd /ml /mt/mtd
    从MySpace基于.NET平台的六次重构经历,来感受分布式系统。
    分布式缓存BeIT Memcached简介
    Asp.Net应用程序中为什么要MachineKey?如何生成MachineKey?
    马云飞机上写长贴:再一次和新同事们谈谈看法
    memcached完全剖析
    ESET ESS/EAV 5 正式版 中英文32/64位 (注册版) 下载
    Windows下的.NET+ Memcached安装
    在 ASP.NET 環境下使用 Memcached 快速上手指南
  • 原文地址:https://www.cnblogs.com/markytsai/p/13800743.html
Copyright © 2011-2022 走看看