zoukankan      html  css  js  c++  java
  • 延时队列

    一、什么是延时队列

      队列是存储消息的载体,延时队列存储的对象是延时消息。

      所谓的延时消息,是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费。

      和定时任务的区别:

        1)定时任务有明确的触发时间,延时任务没有

        2)定时任务有执行周期,而延时任务在某时间触发一段时间内执行,没有执行周期

        3)定时任务一般执行的批处理操作是多个任务,而延时任务一般是单个任务

    二、延时队列使用场景

      1)在订单系统中,订单如果30分钟之内没有支付成功,那么这个订单将被关闭;生成订单60s之后给用户发通知短信;这时就可以使用延时队列来处理这些订单。

      2)订单完成1小时后通知用户进行评价

      3)用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时就可以将用户指令发送到延时队列,当指令的时间到了之后再将它推送到智能设备

    三、延时队列的实现

     

     1、Redis key过期通知

      https://www.cnblogs.com/yangyongjie/p/14399707.html

     2、Redis zset

      Redis的zset(有序集合)也能实现延时队列。主要利用它的score属性,redis通过score来为集合中的成员进行从小到大的排序

      我们将消息序列化成一个字符串作为zset的value,这个消息的到期处理时间作为score,然后用多个线程轮询zset获取到期的任务进行处理(多个线程是为了保证可用性,万一挂了一个线程还有其他线程可以继续处理。同时需要考虑多个线程的并发问题,确保任务不会被多次执行)。

      步骤:

      1)将延时消息的score 设置为到期的时间戳,消息内容序列化为value,调用zdd命令将此条延迟消息保存在key为delayqueue 的zset中

      2)另起线程,循环从 delayqueue 中获取score小于等于当前时间戳的消息元素(zrangebyscore命令)

      3)zrem删除获取到的元素

      

     3、Kafka实现延时队列

      原生的Kafka并不具备延时队列的功能,不过可以改造来实现延时队列(不建议使用Kafka来实现的延时队列)。

      方案1:(延时精度较低)

      1)在发送延时消息时,先将消息投递到延时队列(delay_topic)中(headers中设置延时时间,timestamp存消息发送初始发送时间戳)

      2)定义一个服务去消费延时队列中的消息,将满足条件的消息再投递到目标队列(target_topic)中。

        延时一般以秒来计,若要支持2小时(2*60*60=7200)之内的延时时间的消息,显然不能按照延时时间来创建7200个 delay_topic,一般是按照延时等级来划分 delay_topic,如设定5s,10s,30s,1min,5min,30min,1h,2h这些递增的延时等级,

      延时消息只支持这些等级内的延时,然后延时的消息按照延时时间投递到不同等级的topic中(RocketMQ的延时实现与此类似)

      时间轮

        Kafka的延时操作,并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer),基于时间轮的插入和删除操作的事件复杂度都是O(1)。

       时间轮结构:

        Kafka中的时间轮(TimingWheel) 是一个存储定时任务的环形队列,底层采用数组实现,数组内的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

        时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格的个数(wheelSize)是固定的,整个时间轮的时间跨=跨度(interval=wheelSize*tickMS)。

        时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,表盘指针可以将整个时间轮划分为到期部分和未到期部分,表盘指针当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。

        

        若时间轮的 tickMs 为 1ms且 wheelSize 等于20,那么可以计算得出总体时间跨度 interval 为 20ms。

        初始情况下表盘指针 currentTime 指向时间格0,此时有一个延时为2ms的任务插进来会存放到时间格为2的TimerTaskList中。随着时间不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2对应的TimerTaskList中的任务进行相应的到期操作。此时若又有一个延时为8ms的任务插进来,则会存放到时间格为10中。

        若延时的时长大于时间轮的总体时间跨度20ms,那该怎么办,不能无限扩容wheelSize的大小,kafka为此引入了时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。

      Netty的HashedWheelTimer时间轮实现:

       

  • 相关阅读:
    部署方法
    MIME
    设置下载文件位置
    访问端口
    什么是Linq表达式?什么是Lambda表达式?
    彻底理解js中this的指向,不必硬背。
    获取checkbox状态
    checkbox复选框,如何让其勾选时触发一个事件,取消勾选时不触发
    leetcode — reverse-nodes-in-k-group
    leetcode — swap-nodes-in-pairs
  • 原文地址:https://www.cnblogs.com/yangyongjie/p/14403098.html
Copyright © 2011-2022 走看看