zoukankan      html  css  js  c++  java
  • 实现简单延迟队列和分布式延迟队列

      在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列。我们本文的梗概如下,同学们可以选择性阅读。

      1. 实现一个简单的延迟队列。

      2.使用Redis的list实现分布式延迟队列。

      3.使用Redis的zSet实现分布式延迟队列。

      4. 总结一下,另外还有哪些可以延迟队列。

    1.  实现一个简单的延迟队列。

      我们知道目前JAVA可以有DelayedQueue,我们首先开一个DelayQueue的结构类图。DelayQueue实现了Delay、BlockingQueue接口。也就是DelayQueue是一种阻塞队列。

      

      我们在看一下Delay的类图。Delayed接口也实现了Comparable接口,也就是我们使用Delayed的时候需要实现CompareTo方法。因为队列中的数据需要排一下先后,根据我们自己的实现。Delayed接口里边有一个方法就是getDelay方法,用于获取延迟时间,判断是否时间已经到了延迟的时间,如果到了延迟的时间就可以从队列里边获取了。

      我们创建一个Message类,实现了Delayed接口,我们主要把getDelay和compareTo进行实现。在Message的构造方法的地方传入延迟的时间,单位是毫秒,计算好触发时间fireTime。同时按照延迟时间的升序进行排序。我重写了里边的toString方法,用于将Message按照我写的方法进行输出。

    package com.hqs.delayQueue.bean;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author huangqingshi
     * @Date 2020-04-18
     */
    public class Message implements Delayed {
    
        private String body;
        private long fireTime;
    
        public String getBody() {
            return body;
        }
    
        public long getFireTime() {
            return fireTime;
        }
    
        public Message(String body, long delayTime) {
            this.body = body;
            this.fireTime = delayTime + System.currentTimeMillis();
        }
    
        public long getDelay(TimeUnit unit) {
    
            return unit.convert(this.fireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
        }
    
        @Override
        public String toString() {
            return System.currentTimeMillis() + ":" + body;
        }
    
        public static void main(String[] args) throws InterruptedException {
            System.out.println(System.currentTimeMillis() + ":start");
            BlockingQueue<Message> queue = new DelayQueue<>();
            Message message1 = new Message("hello", 1000 * 5L);
            Message message2 = new Message("world", 1000 * 7L);
    
            queue.put(message1);
            queue.put(message2);
    
            while (queue.size() > 0) {
                System.out.println(queue.take());
            }
        }
    }

      里边的main方法里边声明了两个Message,一个延迟5秒,一个延迟7秒,时间到了之后会将接取出并且打印。输出的结果如下,正是我们所期望的。

    1587218430786:start
    1587218435789:hello
    1587218437793:world

      这个方法实现起来真的非常简单。但是缺点也是很明显的,就是数据在内存里边,数据比较容易丢失。那么我们需要采用Redis实现分布式的任务处理。

      2. 使用Redis的list实现分布式延迟队列。

      本地需要安装一个Redis,我自己是使用Docker构建一个Redis,非常快速,命令也没多少。我们直接启动Redis并且暴露6379端口。进入之后直接使用客户端命令即可查看和调试数据。

    docker pull redis
    docker run -itd --name redisLocal -p 6379:6379 redis
    docker exec -it redisLocal /bin/bash
    redis-cli

      我本地采用spring-boot的方式连接redis,pom文件列一下,供大家参考。

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.6.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.hqs</groupId>
        <artifactId>delayQueue</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>delayQueue</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

      加上Redis的配置放到application.properties里边即可实现Redis连接,非常的方便。

    # redis
    redis.host=127.0.0.1
    redis.port=6379
    redis.password=
    redis.maxIdle=100
    redis.maxTotal=300
    redis.maxWait=10000
    redis.testOnBorrow=true
    redis.timeout=100000

      接下来实现一个基于Redis的list数据类型进行实现的一个类。我们使用RedisTemplate操作Redis,这个里边封装好我们所需要的Redis的一些方法,用起来非常方便。这个类允许延迟任务做多有10W个,也是避免数据量过大对Redis造成影响。如果在线上使用的时候也需要考虑延迟任务的多少。太多几百万几千万的时候可能数据量非常大,我们需要计算Redis的空间是否够。这个代码也是非常的简单,一个用于存放需要延迟的消息,采用offer的方法。另外一个是启动一个线程, 如果消息时间到了,那么就将数据lpush到Redis里边。

    package com.hqs.delayQueue.cache;
    
    import com.hqs.delayQueue.bean.Message;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.data.redis.core.RedisTemplate;
    
    import java.util.concurrent.BlockingQueue;
    
    /**
     * @author huangqingshi
     * @Date 2020-04-18
     */
    @Slf4j
    public class RedisListDelayedQueue{
    
        private static final int MAX_SIZE_OF_QUEUE = 100000;
        private RedisTemplate<String, String> redisTemplate;
        private String queueName;
        private BlockingQueue<Message> delayedQueue;
    
        public RedisListDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) {
            this.redisTemplate = redisTemplate;
            this.queueName = queueName;
            this.delayedQueue = delayedQueue;
            init();
        }
    
        public void offerMessage(Message message) {
            if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) {
                throw new IllegalStateException("超过队列要求最大值,请检查");
            }
            try {
                log.info("offerMessage:" + message);
                delayedQueue.offer(message);
            } catch (Exception e) {
                log.error("offMessage异常", e);
            }
        }
    
        public void init() {
            new Thread(() -> {
                while(true) {
                    try {
                        Message message = delayedQueue.take();
                        redisTemplate.opsForList().leftPush(queueName, message.toString());
                    } catch (InterruptedException e) {
                        log.error("取消息错误", e);
                    }
                }
            }).start();
        }
    }

      接下来我们看一下,我们写一个测试的controller。大家看一下这个请求/redis/listDelayedQueue的代码位置。我们也是生成了两个消息,然后把消息放到队列里边,另外我们在启动一个线程任务,用于将数据从Redis的list中获取。方法也非常简单。

    package com.hqs.delayQueue.controller;
    
    import com.hqs.delayQueue.bean.Message;
    import com.hqs.delayQueue.cache.RedisListDelayedQueue;
    import com.hqs.delayQueue.cache.RedisZSetDelayedQueue;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    import java.util.Set;
    import java.util.concurrent.*;
    
    /**
     * @author huangqingshi
     * @Date 2020-04-18
     */
    @Slf4j
    @Controller
    public class DelayQueueController {
    
        private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors();
    
        //注意RedisTemplate用的String,String,后续所有用到的key和value都是String的
        @Autowired
        RedisTemplate<String, String> redisTemplate;
    
        private static ThreadPoolExecutor taskExecPool = new ThreadPoolExecutor(CORE_SIZE, CORE_SIZE, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>());
    
        @GetMapping("/redisTest")
        @ResponseBody
        public String redisTest() {
            redisTemplate.opsForValue().set("a","b",60L, TimeUnit.SECONDS);
            System.out.println(redisTemplate.opsForValue().get("a"));
            return "s";
        }
    
        @GetMapping("/redis/listDelayedQueue")
        @ResponseBody
        public String listDelayedQueue() {
    
            Message message1 = new Message("hello", 1000 * 5L);
            Message message2 = new Message("world", 1000 * 7L);
    
            String queueName = "list_queue";
    
            BlockingQueue<Message> delayedQueue = new DelayQueue<>();
    
            RedisListDelayedQueue redisListDelayedQueue = new RedisListDelayedQueue(redisTemplate, queueName, delayedQueue);
    
            redisListDelayedQueue.offerMessage(message1);
            redisListDelayedQueue.offerMessage(message2);
            asyncListTask(queueName);
    
            return "success";
        }
    
        @GetMapping("/redis/zSetDelayedQueue")
        @ResponseBody
        public String zSetDelayedQueue() {
    
            Message message1 = new Message("hello", 1000 * 5L);
            Message message2 = new Message("world", 1000 * 7L);
    
            String queueName = "zset_queue";
    
            BlockingQueue<Message> delayedQueue = new DelayQueue<>();
    
            RedisZSetDelayedQueue redisZSetDelayedQueue = new RedisZSetDelayedQueue(redisTemplate, queueName, delayedQueue);
    
            redisZSetDelayedQueue.offerMessage(message1);
            redisZSetDelayedQueue.offerMessage(message2);
            asyncZSetTask(queueName);
    
            return "success";
        }
    
        public void asyncListTask(String queueName) {
            taskExecPool.execute(() -> {
                for(;;) {
                    String message = redisTemplate.opsForList().rightPop(queueName);
                    if(message != null) {
                        log.info(message);
                    }
                }
            });
        }
    
        public void asyncZSetTask(String queueName) {
            taskExecPool.execute(() -> {
                for(;;) {
                    Long nowTimeInMs = System.currentTimeMillis();
                    System.out.println("nowTimeInMs:" + nowTimeInMs);
                    Set<String> messages = redisTemplate.opsForZSet().rangeByScore(queueName, 0, nowTimeInMs);
                    if(messages != null && messages.size() != 0) {
                        redisTemplate.opsForZSet().removeRangeByScore(queueName, 0, nowTimeInMs);
                        for (String message : messages) {
                            log.info("asyncZSetTask:" + message + " " + nowTimeInMs);
                        }
                        log.info(redisTemplate.opsForZSet().zCard(queueName).toString());
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
    }

      我就不把运行结果写出来了,感兴趣的同学自己自行试验。当然这个方法也是从内存中拿出数据,到时间之后放到Redis里边,还是会存在程序启动的时候,任务进行丢失。我们继续看另外一种方法更好的进行这个问题的处理。

      3. 使用Redis的zSet实现分布式延迟队列。

      我们需要再写一个ZSet的队列处理。下边的offerMessage主要是把消息直接放入缓存中。采用Redis的ZSET的zadd方法。zadd(key, value, score) 即将key=value的数据赋予一个score, 放入缓存中。score就是计算出来延迟的毫秒数。

    package com.hqs.delayQueue.cache;
    
    import com.hqs.delayQueue.bean.Message;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.data.redis.core.RedisTemplate;
    
    import java.util.concurrent.BlockingQueue;
    
    /**
     * @author huangqingshi
     * @Date 2020-04-18
     */
    @Slf4j
    public class RedisZSetDelayedQueue {
    
        private static final int MAX_SIZE_OF_QUEUE = 100000;
        private RedisTemplate<String, String> redisTemplate;
        private String queueName;
        private BlockingQueue<Message> delayedQueue;
    
        public RedisZSetDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) {
            this.redisTemplate = redisTemplate;
            this.queueName = queueName;
            this.delayedQueue = delayedQueue;
        }
    
        public void offerMessage(Message message) {
            if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) {
                throw new IllegalStateException("超过队列要求最大值,请检查");
            }
            long delayTime = message.getFireTime() - System.currentTimeMillis();
            log.info("zset offerMessage" + message + delayTime);
            redisTemplate.opsForZSet().add(queueName, message.toString(), message.getFireTime());
        }
    
    }

      上边的Controller方法已经写好了测试的方法。/redis/zSetDelayedQueue,里边主要使用ZSet的zRangeByScore(key, min, max)。主要是从score从0,当前时间的毫秒数获取。取出数据后再采用removeRangeByScore,将数据删除。这样数据可以直接写到Redis里边,然后取出数据后直接处理。这种方法比前边的方法稍微好一些,但是实际上还存在一些问题,因为依赖Redis,如果Redis内存不足或者连不上的时候,系统将变得不可用。

      4. 总结一下,另外还有哪些可以延迟队列。

      上面的方法其实还是存在问题的,比如系统重启的时候还是会造成任务的丢失。所以我们在生产上使用的时候,我们还需要将任务保存起来,比如放到数据库和文件存储系统将数据存储起来,这样做到double-check,双重检查,最终达到任务的99.999%能够处理。

      其实还有很多东西可以实现延迟队列。

      1) RabbitMQ就可以实现此功能。这个消息队列可以把数据保存起来并且进行处理。

      2)Kafka也可以实现这个功能。

      3)Netty的HashedWheelTimer也可以实现这个功能。

      

      有兴趣的同学可以进一步研究这些内容的实现。

      最后放上我的代码: https://github.com/stonehqs/delayQueue  

       

  • 相关阅读:
    怎样让jquery mobile 的footer/header 固定?
    smarty-smarty模板中类似for循环功能的实现代码
    mysql-MySql Update与case when
    PHP-获取用户所有定义的常量
    PHP-去掉php中var_dump()函数输出的省略号,让它完整显示0.0
    PHP--判断是否为时间戳
    Chrome/360极速/猎豹/枫树/浏览器去除视频网站广告利器插件——【切糕】广告视频屏蔽专家下载
    bootstrap使用modal动态对话框时,按回车键无法确认,反而取消对话框
    【bootstrap】常用bootstrap类库插件
    Thinkphp-开发技巧
  • 原文地址:https://www.cnblogs.com/huangqingshi/p/12728831.html
Copyright © 2011-2022 走看看