zoukankan      html  css  js  c++  java
  • 【延时任务】-基于redis的延时任务

    基于Redis的延时任务队列时间

    参考有赞延迟队列设计

    源码地址: https://gitee.com/A_yes/big-smart

    主要组成部分

    1. DelayJobBucket

    数据结构 redis的 zset(有序集合)用于存放任务的id 并按照执行时间排序

    2. JobPoll

    数据结构 redis的hash,以任务的id作为key,存放job的元信息

    3. ReadyQueue

    数据结构为redis的list,就绪队列,用于存放已经到期的任务,随时可以被取出去消费

    4. BucketTimer

    用于轮训对应delayJobBucket中到期的任务,并将任务从delayJobBucket中移除并加入对应ReadyQueue,一个线程对应一个bucket

    5. TopicTimer

    用于轮训ReadyQueue,并启动新线程去执行任务,一个线程对应一个readyQueue

    6. IWorker

    用于实现对不同任务不同的处理方式。

    Job数据结构

    • id : 任务的唯一标识,要保证同一个topic下唯一
    • topic : 主题,用来把任务分组
    • delay:延迟时间(单位毫秒)
    • ttr:超时时间,保证任务肯定被消费一次,当任务被从ReadyQueue中取出时,用ttr重新计算下次执行时间,并加入bucket,当任务被消费成功会删除jobPoll中的元数据
    • body:业务数据
    • status:任务状态
    • execTime:执行时间(绝对时间)
    • createTime:任务创建时间(绝对时间)

    使用方法

    内置里一个http处理worker,我们以它为例说明使用方法。

    这个worker处理逻辑是,当任务到期,像任务描述里的http接口发送一个请求,并比对返回值和给定的规则,判断是否通过。

    http任务业务数据结构:

    • url:到期需要调用的接口
    • method:get/post请求方法
    • headers:请求头
    • body:请求参数
    • passrule:成功规则

    1. 先添加任务

    • post /o/add

    参数示例:

    {
      "delay": 3000,
      "ttr": 10000,
      "topic": "http",
      "id": "2d56dc02-4aed-486w1-acea-bc1516b3959css",
      "body":"{"method":"get","headers":{"a":"asdad","b":123123},"passrule":{"code":1},"body":{"p1":"asd","p2":123123},"url":"http://192.168.200.250:7070/t/a"}"
    }
    

    对于topic为http的任务,只需要添加任务就行,到执行时间,会自动调用指定接口,其他类型的任务,可以通过实现IWorker接口在服务器端完成任务处理,或者通过http接口轮训在客户端完成任务处理,处理后要回报处理结果。

    2. 客户端轮询就绪任务

    • get /o/get

    参数示例:

    /o/get?topic=test
    

    3. 回报处理结果

    • get /o/complete

    参数示例:

     /o/complete?topic=test&id=123
    

    job流程

    1. 通过http添加job,job会被加入bucket,id作为值,执行时间作为score,job元信息会被加入jobPoll。

    2. 判断是否有此对应topic的BucketTimer线程启动,若无则启动一个开轮询对应的bucket,任务到期就将任务从bucket中取出,修改状态,根据ttr重新计算下次执行时间后重新入bucket(实现了超时重试机制),移入readyQueue

    3. 程序启动时会扫描redis中是否已有bucket,若有自动启动对应BucketTimer。程序启动时会根据配置文件,启动服务器端任务处理器。

    4. 服务端任务处理器轮询对应topic的readyQueue,当有任务时启动新线程去处理任务。

    5. 任务处理完成后会将元数据从jobPoll中删除

  • 相关阅读:
    LightOJ 1341 Aladdin and the Flying Carpet 数学
    NOIP2013 花匠 DP 线段树优化
    LightOJ 1370 Bi-shoe and Phi-shoe 欧拉函数+线段树
    BZOJ2818: Gcd 欧拉函数求前缀和
    SPOJ3267 D-query 离线+树状数组 在线主席树
    BZOJ 2588: Spoj 10628. Count on a tree 主席树+lca
    拓展欧几里得算法
    POJ1845Sumdiv(求所有因子和 + 唯一分解定理)
    UVA1635 Irrelevant Elements(唯一分解定理 + 组合数递推)
    codeforce 626E(二分)
  • 原文地址:https://www.cnblogs.com/A-yes/p/13501163.html
Copyright © 2011-2022 走看看