zoukankan      html  css  js  c++  java
  • 基于Redis实现延迟队列

    背景

    在后端服务中,经常有这样一种场景,写数据库操作在异步队列中执行,且这个异步队列是多进程运行的,这时如果对同一资源进行写库操作,很有可能产生数据被覆盖等问题,于是就需要业务层在更新数据库之前进行加锁,这样保证在更改同一资源时,没有其他更新操作干涉,保证数据一致性。

    但如果在更新前对数据库更新加锁,那此时又来了新的更新数据库的请求,但这个更新操作不能丢弃掉,需要延迟执行,那这就需要添加到延迟队列中,延迟执行。

    那么如何实现一个延迟队列?利用RedisSortedSetString这两种结构,就可以轻松实现。

     1 # coding: utf8  
     2 """Delay Queue"""  
     3 import json  
     4 import time  
     5 import uuid  
     6 import redis  
     7 class DelayQueue(object):  
     8     """延迟队列"""  
     9     QUEUE_KEY = 'delay_queue'  
    10     DATA_PREFIX = 'queue_data'  
    11     def __init__(self, conf):  
    12         host, port, db = conf['host'], conf['port'], conf['db']  
    13         self.client = redis.Redis(host=host, port=port, db=db)  
    14     def push(self, data):  
    15         """push 
    16         :param data: data 
    17         """  
    18         # 唯一ID  
    19         task_id = str(uuid.uuid4())  
    20         data_key = '{}_{}'.format(self.DATA_PREFIX, task_id)  
    21         # save string  
    22         self.client.set(data_key, json.dumps(data))  
    23         # add zset(queue_key=>data_key,ts)  
    24         self.client.zadd(self.QUEUE_KEY, data_key, int(time.time()))  
    25           
    26     def pop(self, num=5, previous=3):  
    27         """pop多条数据 
    28         :param num: pop多少个 
    29         :param previous: 获取多少秒前push的数据 
    30         """  
    31         # 取出previous秒之前push的数据  
    32         until_ts = int(time.time()) - previous  
    33         task_ids = self.client.zrangebyscore(  
    34             self.QUEUE_KEY, 0, until_ts, start=0, num=num)  
    35         if not task_ids:  
    36             return []  
    37         # 利用删除的原子性,防止并发获取重复数据  
    38         pipe = self.client.pipeline()  
    39         for task_id in task_ids:  
    40             pipe.zrem(self.QUEUE_KEY, task_id)  
    41         data_keys = [  
    42             data_key  
    43             for data_key, flag in zip(task_ids, pipe.execute())  
    44             if flag  
    45         ]  
    46         if not data_keys:  
    47             return []  
    48         # load data  
    49         data = [  
    50             json.loads(item)  
    51             for item in self.client.mget(data_keys)  
    52         ]  
    53         # delete string key  
    54         self.client.delete(*data_keys)  
    55         return data  

    实现思路

    push

    push数据时,执行如下几步:

    • 生成一个唯一key,这里使用uuid4生成(uuid4是根据随机数生成的,重复概率非常小,具体参考这里
    • 把数据序列化后存入这个唯一keyString结构中
    • 把这个唯一key加到SortedSet中,score是当前时间戳

    这里利用SortedSet记录添加数据的时间,便于在获取时根据时间获取之前的数据,达到延迟的效果。

    而真正的数据则存放在String结构中,等获取时先拿到数据的key再获取真正的数据。

    这里可能有人会疑问,为什么不把真正的数据放到SortedSetname中?

    • 把数据放入name中可能会产生瞬间写入相同数据导致数据多条变一条的情况
    • 把数据序列化放到SortedSetname中有些过大,不太符合使用习惯

    pop

    pop是可以获取多条数据的,上面的代码默认是获取延迟队列中3秒前的5条数据,具体思路如下:

    • 计算previous秒前的时间戳,使用SortedSetzrangebysocre方法获取previous秒之前添加的唯一key
    • 如果SortedSet中有数据,则利用Redis删除的原子性,使用zrem依次删除SortedSet的元素,如果删除成功,则使用,防止多进程并发执行此方法,拿到相同的数据
    • 那到可用的唯一key,从String中获取真正的数据即可

    这里最重要的是第二步,在拿出SortedSet的数据后,一定要防止其他进程并发获取到相同的数据,所以在这里使用zrem依次删除元素,保证只有删除成功的进程才能使用这条数据。

    使用

     1 # coding: utf8  
     2 import time  
     3 from delay import DelayQueue  
     4 redis_conf = {'host': '127.0.0.1', 'port': 6379, 'db': 0}  
     5 # 构造延迟队列对象  
     6 queue = DelayQueue(redis_conf)  
     7 # push 20条数据  
     8 for i in range(20):  
     9     item = {'user': 'user-{}'.format(i)}  
    10     queue.push(item)  
    11       
    12 # 从延迟队列中马上获取10条数据  
    13 data = queue.pop(num=10)  
    14 # 刚添加的马上获取是获取不到的  
    15 assert len(data) == 0  
    16 # 休眠10秒  
    17 time.sleep(10)  
    18 # 从延迟队列中获取10条数据  
    19 data = queue.pop(num=10)  
    20 assert len(data) == 10  
    21 # 从延迟队列中获取截止到5秒之前添加的10条数据  
    22 data = queue.pop(num=10, previous=5)  
    23 assert len(data) == 10  

    使用就比较简单了,在实际使用过程中,每次在处理正常队列时,通过上面的方法获取一下延迟队列的数据,如果延迟队列中有数据,那么按照业务正常处理就可以了,这样就达到了数据延迟处理的效果。

    转自:Kaito的博客

  • 相关阅读:
    GLASS产品下载
    Google earth engine批量下载MODIS数据并导出
    获取浏览器路径'?'开头,&开头的参数
    Js Vue全屏切换显示 指定div切换全屏显示
    关于Js 进一步应用遇到的小问题--总结
    自定义基于element UI 换行步骤条
    Vue 自定义封装树形表格组件
    js 解决在Vue中阻止重复提交、多次触发、节流函数、防止短时间多次触发问题
    Vue中使用 echarts 实现动态配置参数及类型显示不同形状的图表配置、可视化的echarts图表组件
    数据结构 -- 链表的整表创建
  • 原文地址:https://www.cnblogs.com/leijiangtao/p/4200314.html
Copyright © 2011-2022 走看看