zoukankan      html  css  js  c++  java
  • redis实现简单延时队列(转)

       继之前用rabbitMQ实现延时队列,Redis由于其自身的Zset数据结构,也同样可以实现延时的操作

        Zset本质就是Set结构上加了个排序的功能,除了添加数据value之外,还提供另一属性score,这一属性在添加修改元素时候可以指定,每次指定后,Zset会自动重新按新的值调整顺序。可以理解为有两列字段的数据表,一列存value,一列存顺序编号。操作中key理解为zset的名字,那么对延时队列又有何用呢?试想如果score代表的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间前后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,如果当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就可以达到延时执行的目的, 注意不需要遍历整个Zset集合,以免造成性能浪费。

        Zset的排列效果如下图:

    java代码实现如下:

    package cn.chinotan.service.delayQueueRedis;
    
    import org.apache.commons.lang3.StringUtils;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.Tuple;
    
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.Set;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @program: test
     * @description: redis实现延时队列
     * @author: xingcheng
     * @create: 2018-08-19
     **/
    public class AppTest {
    
        private static final String ADDR = "127.0.0.1";
        private static final int PORT = 6379;
        private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
        private static CountDownLatch cdl = new CountDownLatch(10);
    
        public static Jedis getJedis() {
            return jedisPool.getResource();
        }
    
        /**
         * 生产者,生成5个订单
         */
        public void productionDelayMessage() {
            for (int i = 0; i < 5; i++) {
                Calendar instance = Calendar.getInstance();
                // 3秒后执行
                instance.add(Calendar.SECOND, 3 + i);
                AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1));
                System.out.println("生产订单: " + StringUtils.join("000000000", i + 1) + " 当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                System.out.println((3 + i) + "秒后执行");
            }
        }
    
        //消费者,取订单
        public static void consumerDelayMessage() {
            Jedis jedis = AppTest.getJedis();
            while (true) {
                Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);
                if (order == null || order.isEmpty()) {
                    System.out.println("当前没有等待的任务");
                    try {
                        TimeUnit.MICROSECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    continue;
                }
                Tuple tuple = (Tuple) order.toArray()[0];
                double score = tuple.getScore();
                Calendar instance = Calendar.getInstance();
                long nowTime = instance.getTimeInMillis() / 1000;
                if (nowTime >= score) {
                    String element = tuple.getElement();
                    Long orderId = jedis.zrem("orderId", element);
                    if (orderId > 0) {
                        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消费了一个任务:消费的订单OrderId为" + element);
                    }
                }
            }
        }
    
        static class DelayMessage implements Runnable{
            @Override
            public void run() {
                try {
                    cdl.await();
                    consumerDelayMessage();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
        public static void main(String[] args) {
            AppTest appTest = new AppTest();
            appTest.productionDelayMessage();
            for (int i = 0; i < 10; i++) {
                new Thread(new DelayMessage()).start();
                cdl.countDown();
            }
        }
    }

    实现效果如下:

    生产环境使用注意:

    由于这种实现方式简单,但在生产环境下大多是多实例部署,所以存在并发问题,即缓存的查找和删除不具有原子性(zrangeWithScores和zrem操作不是一个命令,不具有原子性),会导致消息的多次发送问题,这个问题的避免方法如下:

    1.可以采用单独一个实例部署解决(不具备高可用特性,容易单机出现故障后消息不能及时发送)

    2.采用redis的lua脚本进行原子操作,即原子操作查找和删除(实现难度大)

    因此,延时队列的实现最好采用rabbitMQ来实现,rabbitMQ天然具备分布式的特性,可以很好的用在多服务,多实例环境下,具体的实现参考https://my.oschina.net/u/3266761/blog/1926588

    转载地址:https://my.oschina.net/u/3266761/blog/1930360

  • 相关阅读:
    【SAS NOTE】OUTPUT
    【SAS NOTES】_NULL_
    【SAS NOTE】sas 9.2 安装
    【SAS NOTE】FREQ
    纯数学教程 Page 203 例XLI (1)
    纯数学教程 Page 203 例XLI (3)
    纯数学教程 Page 203 例XLI (2)
    Prove Cauchy's inequality by induction
    纯数学教程 Page 325 例LXVIII (15) 调和级数发散
    纯数学教程 Page 325 例LXVIII (15) 调和级数发散
  • 原文地址:https://www.cnblogs.com/yinliang/p/10731147.html
Copyright © 2011-2022 走看看