zoukankan      html  css  js  c++  java
  • Redis学习笔记之延时队列

    一、业务场景

    所谓延时队列就是延时的消息队列,下面说一下一些业务场景比较好理解

    1.1 实践场景

    • 订单支付失败,每隔一段时间提醒用户
    • 用户并发量的情况,可以延时2分钟给用户发短信
    • ...

    1.2 实现方式

    这些情况都可以使用延时队列来做,实现延时队列比较场景的有使用消息队列MQ来实现,比如RocketMQ等等,也可以使用Redis来实现,本博客主要介绍一下Redis实现延时队列

    二、Redis延时队列

    2.1 Redis列表实现

    Redis实现延时队列可以通过其数据结构列表(list)来实现,顺便复习一下Redis的列表,实现列表,Redis可以通过队列和栈来实现:

    /* 队列:First in first out */
    
    //加两个value
    >rpush keynames key1 key2
    2
    
    //计算
    >llen keynames
    2
    
    >lpop keynames
    key1
    
    >lpop keynames
    key2
    
    //rpush会自动过期的
    >rpop keynames
    NULL
    
    /* 栈:First in last out */
    
    //同样,加两个元素
    >rpush keynames key1 key2
    2
    
    >rpop keynames
    key2
    
    >rpop keynames
    key1
    
    

    对于Redis的基本数据结构,可以参考我之前的博客:https://blog.csdn.net/u014427391/article/details/82860694

    然后怎么实现延时?Thread睡眠或者线程join?这种方法是可以实现,不过假如用户一多?10个请求就要延时10N了,这种情况系统性能不好的话就会出现线程阻塞了的情况。

    队列空了的情况?就会出现pop 的死循环,这种情况很可怕,很吃系统CPU,虽然可以通过线程睡眠方法来缓解,但不是最好的方法

    这时候就要介绍一下Redis的blpop/brpop来替换lpop/rpop,blpop/brpop阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零

    2.2 Redis集合实现

    Redis的有序集合(zset)也可以用于实现延时队列,消息作为value,时间作为score,这里顺便复习一下Redis的有序集合

    //9.0是score也就是权重
    >zadd keyname 9.0 math
    1
    
    >zadd keyname 9.2 history
    1
    
    //顺序
    >zrange keyname 0 -1
    1) history
    2) math
    
    //逆序
    >zrevrange keyname 0 -1
    1) math
    2) history
    
    //相当于count()
    >zcard keyname
    2
    
    获取指定key的score
    >zscore keyname math
    9
    
    
    

    然后多个线程的环境怎么保证任务不被多个线程抢了?这里可以使用Redis的zrem命令来实现

    Redis Zrem 命令用于移除有序集中的一个或多个成员,不存在的成员将被忽略。

    当 key 存在但不是有序集类型时,返回一个错误。

    注意: 在 Redis 2.4 版本以前, ZREM 每次只能删除一个元素。

    下面给出来自《Redis 深度历险:核心原理与应用实践》小册的例子:例子就是用有序集合和zrem来实现的

    import java.lang.reflect.Type;
    import java.util.Set;
    import java.util.UUID;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.TypeReference;
    
    import redis.clients.jedis.Jedis;
    
    public class RedisDelayingQueue<T> {
    
      static class TaskItem<T> {
        public String id;
        public T msg;
      }
    
      // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
      private Type TaskType = new TypeReference<TaskItem<T>>() {
      }.getType();
    
      private Jedis jedis;
      private String queueKey;
    
      public RedisDelayingQueue(Jedis jedis, String queueKey) {
        this.jedis = jedis;
        this.queueKey = queueKey;
      }
    
      public void delay(T msg) {
        TaskItem<T> task = new TaskItem<T>();
        task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
        task.msg = msg;
        String s = JSON.toJSONString(task); // fastjson 序列化
        jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
      }
    
      public void loop() {
        while (!Thread.interrupted()) {
          // 只取一条
          Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
          if (values.isEmpty()) {
            try {
              Thread.sleep(500); // 歇会继续
            } catch (InterruptedException e) {
              break;
            }
            continue;
          }
          String s = values.iterator().next();
          if (jedis.zrem(queueKey, s) > 0) { // 抢到了
            TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
            this.handleMsg(task.msg);
          }
        }
      }
    
      public void handleMsg(T msg) {
        System.out.println(msg);
      }
    
      public static void main(String[] args) {
        Jedis jedis = new Jedis();
        RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
        Thread producer = new Thread() {
    
          public void run() {
            for (int i = 0; i < 10; i++) {
              queue.delay("codehole" + i);
            }
          }
    
        };
        Thread consumer = new Thread() {
    
          public void run() {
            queue.loop();
          }
    
        };
        producer.start();
        consumer.start();
        try {
          producer.join();
          Thread.sleep(6000);
          consumer.interrupt();
          consumer.join();
        } catch (InterruptedException e) {
        }
      }
    }
    

    不过在多线程环境,是很难做控制的,上面例子也有缺陷,下面引用小册的说法:

    上面的算法中同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到的进程都是白取了一次任务,这是浪费。可以考虑使用 lua scripting 来优化一下这个逻辑,将 zrangebyscore 和 zrem 一同挪到服务器端进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了。

  • 相关阅读:
    AXI协议(一)
    System Verilog基础(一)
    AHB协议
    验证的概述
    简易APB4 slave实践
    APB协议
    指令跳转与预测
    HDU4405--Aeroplane chess(概率dp)
    BZOJ1419——Red is good(期望dp)
    BZOJ1426----收集邮票(期望dp)
  • 原文地址:https://www.cnblogs.com/mzq123/p/10426966.html
Copyright © 2011-2022 走看看