zoukankan      html  css  js  c++  java
  • redis实现队列消息的ack

    由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列。

    原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满足需求的(没有ack功能),所以需要自己对redis的list(队列)做个小小的调整。

    大体思路为在POP时将pop出的数据放到备份的地方,当有ACK请求(确认消息被消耗)后将备份的信息删除掉;每次在pop前需要检查备份队列中有没有过期的数据没有ack的,如果有则PUSH到list中后再从list中POP出来。

    以下脚本使用lua实现,只需要在执行前加载到redis中即可。

    消息本身需要包含id属性

    push没什么问题,原生即可(此处以LPUSH为例)

    pop时脚本

     1 local not_empty = function(x)
     2     return (type(x) == "table") and (not x.err) and (#x ~= 0)
     3 end
     4 
     5 local qName = ARGV[1] --队列名称
     6 local currentTime = ARGV[2] --当前时间,这个需要从外部传入,不能使用redis自身时间,如果使用自身时间可能导致redis本身的backup在重放请求时出现不一致性
     7 local considerAsFailMaxTimeSpan = ARGV[3] --超时时间设定,当消息超过一定时间还没有ack则认为此消息需要再次入队
     8 
     9 local zsetName= qName ..'BACKUP'
    10 local hashName= qName ..'CONTEXT'
    11 
    12 local tmp = redis.call('ZRANGEBYSCORE',zsetName , '-INF', tonumber(currentTime) - tonumber(considerAsFailMaxTimeSpan), 'LIMIT', 0, 1)
    13 if (not_empty(tmp)) then
    14     redis.call('ZREM', zsetName, tmp[1]) --此处拿出的为消息的唯一id
    15     redis.call('LPUSH', qName, redis.call('HGET', hashName, tmp[1]))
    16 end
    17 tmp = redis.call('RPOP', qName)
    18 if (tmp) then
    19     local msg = cjson.decode(tmp)
    20     local id = msg['id']
    21     redis.call('ZADD', zsetName, tonumber(currentTime), id)
    22     redis.call('HSET',hashName , id, tmp)
    23 end
    24 return tmp

    ack时候比较简单,只需要将指定id从set和hash中删除即可

    1 local key = ARGV[1]
    2 local qName=ARGV[2]
    3 redis.call('ZREM', qName..'BACKUP', key)
    4 redis.call('HDEL', qName..'CONTEXT', key)

    在程序中使用前需要显示load这两个脚本,后面直接调用这两个脚本的sha值即可执行。

  • 相关阅读:
    js将url转换二维码
    百度地图api使用
    js字符串转日期兼容性
    Object.keys的使用
    Web App和Native App的比较
    数组转为对象
    常用meta整理
    git merge和git rebase的区别
    GitHub 翻译之 'Hello-world' 翻译
    js数据类型
  • 原文地址:https://www.cnblogs.com/ViewState/p/6662191.html
Copyright © 2011-2022 走看看