zoukankan      html  css  js  c++  java
  • 延迟任务调度系统—技术选型与设计(上篇)

    本文来自网易云社区

      • 延迟任务的场景是?
      • 现有的解决方案是?
      • 存在的问题是什么?
      • 希望达到的目标是?
      • 可以实现的方案有?
        • RabbitMQ实现
          • 通过死信和死信路由实现
          • 通过延迟消息插件来实现
        • Redis实现
        • DelayQueue实现
        • 时间轮实现
          • 单表时间轮
          • 分层时间轮
        • 之前的设计(DB/DelayQueue/ZooKeeper)
        • 另一种方案(DB/DelayQueue/ZooKeeper/MQ

    延迟任务的场景是?

    • 习题考试截止前3天,给未提交用户发送消
    • 学习项目开课前2小时,给参与用户发送通知
    • 问卷开始收集时,才对用户可见
    • 问卷结束收集时,触发一些操作
    • 指定时间发布课件
    • 课程结束时,开始计算用户结业信息
    • 直播时间到了,给用户发送消息
    • 用户下单后,30分钟内未付款,关闭订单
    • 用户付款后,24小时内未发货,提示发货
    • 用户打车后,48小时后自动评价为5星
    • 这类业务的特点是:延迟执行。一种比较简单的方法是使用后台线程扫描符合条件的业务数据,逐一处理。 这种方法扫描间隔时间不好设置,间隔时间过大影响精确度,过小则影响效率和性能。

    现有的解决方案是?

    • 通过linux的crontab触发定时任务
    • 扫描业务表,筛选出符合条件的数据对其进行操作

    存在的问题是什么?

    • 由于每种类型的任务都设有扫描间隔,任务不能精确处理
    • 扫描业务库,影响业务正常操作
    • 任务的执行过于密集,容易导致服务器间隔性压力
    • 存在系统单点,触发定时调度的服务挂了,所有任务都不会执行
    • 系统不具容错能力,一旦错过了,任务就不会再被执行
    • 没有统一的视图来查看任务的执行情况
    • 没有告警来提示失败的任务

    希望达到的目标是?

    • 精确性(可在指定时间触发任务处理)
    • 通用性
    • 高性能(集群能力不少于1000TPS)
    • 高可用(支持多实例部署)
    • 可伸缩(增加和减少服务时,任务会重新分配)
    • 可重试(任务失败可重试)
    • 多协议(支持httpdubbo调用)
    • 可管理(业务使用方可修改、删除任务)
    • 能告警(失败次数达到阈值可触发告警)
    • 统一视图(方便查看任务执行情况,可手动干预任务执行)

    下面所讨论技术方案的前提是精确触发,所以我们不讨论目前业界的一些分布式调度系统如:elastic-job,xxl-job,tbschedule等, 这些系统解决不了延迟任务精确触发问题。

    可以实现的方案有?

    RabbitMQ实现

    通过死信和死信路由实现

    原理如下:

    何为死信:

    • 消息被拒绝
    • 消息已过期
    • 队列达到最大长度

    RabbitMQ可以对队列和消息设置x-message-tt、expiration来控制消息的存活时间,如果超时,消息变为死信。

    何为死信路由:

    RabbitMQ可以对队列设置x-dead-letter-exchange和x-dead-letter-routing-key两个参数。
    当消息在一个队列中变成死信后会按这两个参数路由,消息就可以重新被消费。

    实例操作:

    1. 创建延迟队列(设置死信路由)
    2. 创建就绪队列
    3. 创建死信路由
    4. 绑定死信路由与就绪队列
    5. 发送延迟消息
    6. 消息过期后进入就绪队列

    优点:

    • 高效,可以利用RabbitMQ的分布式特性轻易进行横向扩展,且支持持久化

    缺点:

    • 不支持对已发送的消息进行管理
    • 一个消息比在同一队列中的其他消息提前过期,提前过期的消息也不会优先进入死信队列。

    所以需要确保业务上每个任务的延迟时间是一致的。如果有不同延时的任务,需要为每种不同延迟的任务单独创建消息队列,缺乏灵活性。

    通过延迟消息插件来实现

    原理如下:

    核心代码流程:

    其原理是延迟消息会被保存到Mnesia表,在Exchange中根据每个message头设置的延迟时间x-delay,消息过期后才路由到对应队列。

    实例操作:

    1. 下载插件
    2. 安装插件
    docker-compose.xml(将插件安装到容器中) version: '2' services:   rabbitmq:     hostname: rabbitmq     image: rabbitmq:3.6.8-management     mem_limit: 200m     ports:       - "5672:5672"       - "15672:15672"     volumes:       - ~/dockermapping/rabbitmq/lib:/var/lib/rabbitmq/
           - /Users/oldlu/workspace/document/docker-compose/rabbitmq/rabbitmq_delayed_message_exchange-0.0.1.ez:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.8/plugins/rabbitmq_delayed_message_exchange-0.0.1.ez
    
     启用插件
     rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    • 创建类型为x-delayed-message的路由
    • 创建就绪队列
    • 绑定队列和路由
    • 发布延迟消息(设置x-delay=延迟的毫秒数)

    源码分析

    核心函数
    消息入队:internal_delay_message
    启动Timer:maybe_delay_first
    消息处理:handle_info
    

    优点:

    • 一个消息比其他消息提前过期,提前过期的消息会被提前路由到队列,不需要为不同延迟的消息创建单独的消息队列。

    缺点:

    • 不支持对已发送的消息进行管理
    • 集群中只有一个数据副本(保存在当前节点下的Mnesia表中),如果节点不可用或关闭插件会丢失消息。
    • 目前该插件只支持disk节点,不支持RAM节点
    • 性能比原生差一点(普通的Exchange收到消息后直接路由到队列,而延迟队列需要判断消息是否过期,未过期的需要保存在表中,时间到了再捞出来路由)

    Redis实现

    有序集合(Sorted Set)是Redis提供的一种数据结构,具有set和hash的特点。
    其中每个元素都关联一个score,并以这个score来排序。
    其内部实现用到了两个数据结构:hash table和 skip list(跳跃表)

    skip list的特点

    • 由很多层结构组成,level是通过一定的概率随机产生的
    • 每一层都是一个有序的链表,默认是升序
    • 最底层的链表包含所有元素
    • 如果一个元素出现在Level i的链表中,则它在Level i之下的链表也都会出现
    • 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素
    • 插入和删除的时间复杂度是O(logn),当达到了一定的数据规模之后,它的效率与红黑树差不多

    主要命令

    • zadd:向Sorted Set中添加元素
    • zrem:删除Sorted Set中的指定元素
    • zrange:按照从小到大的顺序返回指定区间内的元素

    实现延迟队列

    1. 将延迟任务加到Sorted Set,将延迟时间设为score
    2. 启动一个线程不断判断Sorted Set中第一个元素的score是否大于当前时间
    3. 如果大于,从Sorted Set中移除任务并添加到执行队列中
    4. 如果小于,进行短暂休眠后重试

    实例操作

    root@redis:/usr/local/bin# redis-cli127.0.0.1:6379> zadd delayqueue 1 task1
    (integer) 1127.0.0.1:6379> zadd delayqueue 2 task2
    (integer) 1127.0.0.1:6379> zadd delayqueue 4 task4
    (integer) 1127.0.0.1:6379> zadd delayqueue 3 task3
    (integer) 1127.0.0.1:6379>127.0.0.1:6379> zrange delayqueue 0 0 withscores1) "task1"
    

    优点:

    • 实现简单
    • 任务可管理(可删除、修改任务)

    缺点:

    • 需要有短轮询线程不断判断第一个元素是否过期,造成CPU空耗
    • 分布式场景中,容易引起多个节点读取到相同任务

    DelayQueue实现

    DelayQueue是一个使用优先队列实现的BlockingQueue,优先队列比较的是时间,内部存储的是实现Delayed接口的对象。 只有在对象过期后才能从队列中获取对象。

    内部结构

    • 可重入锁
    • 用于根据delay时间排序的优先级队列
    • 用于优化阻塞通知的线程leader
    • 用于实现阻塞和通知的Condition对象

    Leader/Followers
    Leader/Followers是多个工作线程轮流进行事件监听、分发、处理的一种模式。 该模式最大的优点在于,它是自己监听事件并处理客户请求,从接收到处理都是在同一线程中完成, 所以不需要在线程之间传递数据,解决线程频繁切换带来的开销。

    该模式工作的任何时间点,只有一个线程成为Leader ,负责事件监听,而其他线程都是Follower,在休眠中等待成为Leader。 该模式的工作线程存在三种状态,工作线程同一时间只能处于一种状态,这三种状态为

    • Leading:线程处于领导者状态,负责事件监听。Leader监听到事件后,有两种处理方式:
      • 可以转移至Processing状态,自己处理该事件,并调用方法推选新领导者。
      • 也可以指定其他Follower来处理事件,此时Leader状态不变。
    • Processing:线程正在处理事件,处理完事件如果当前线程集中没有领导者,它将成为新领导者,否则转为追随者。
    • Following:线程处于追随者状态,等待成为新的领导者也可能被领导者指定来处理新的事件。

    核心源码分析:

    入队public boolean offer(E e) {
     final ReentrantLock lock = this.lock;
      lock.lock();
     try {
          q.offer(e);
     if (q.peek() == e) {//入队对象延迟时间是队列中最短的          leader = null;//重置leader          available.signal();//唤醒一个线程去监听新加入的对象      }
     return true;
      } finally {
          lock.unlock();
      }
    }
    
    出队public E take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
     try {
     for (;;) {
              E first = q.peek();
     if (first == null)
                  available.await();//队列为空,无限等待 else {
     long delay = first.getDelay(TimeUnit.NANOSECONDS);
     if (delay <= 0)//延迟时间已过,直接返回 return q.poll();
     else if (leader != null)//已有leader在监听了,无限等待                  available.await();
     else {
                      Thread thisThread = Thread.currentThread();
                      leader = thisThread;//当前线程成为leader try {
                          available.awaitNanos(delay);//在delay纳秒后唤醒                  } finally {
     if (leader == thisThread)// 入队一个最小延迟时间的对象时leader会被清空                          leader = null;
                      }
                  }
              }
          }
      } finally {
     if (leader == null && q.peek() != null)//leader不存在且队列不为空,唤醒一个follower去成为leader去监听          available.signal();
          lock.unlock();
      }
    }
     
    

    优点:

    • 效率高,任务触发时间延迟低

    缺点:

    • 数据是保存在内存,需要自己实现持久化
    • 不具备分布式能力,需要自己实现高可用

    未完待续。

    本文来自网易云社区,经作者陈志良授权发布。

    原文:延迟任务调度系统(技术选型与设计)

  • 相关阅读:
    给元素查索引
    数组的反转和排序
    本地修改文件到git
    +new Date()
    文字双层投影
    表格组件 自定义宽度 设置 span 内容 超出省略号
    iframe的使用
    分页获取列表进行索引累加
    日期时间格式化
    柱状echarts 自定义图例设置
  • 原文地址:https://www.cnblogs.com/163yun/p/9485363.html
Copyright © 2011-2022 走看看