zoukankan      html  css  js  c++  java
  • Python 下redis 批量操作操作

    方法一:使用 pipeline

      使用pipelining 发送命令时,redis server必须部分请求放到队列中(使用内存)执行完毕后一次性发送结果,在 pipeline 使用期间,将“独占”链接,无法进行非“管道”类型的其他操作,直至 pipeline 关闭;如果 pipeline 的指令集很多很庞大,为了不影响其他操作(redis 最大时间lua-time-limit默认是5s),可以使用其他新建新链接操作。批量操作如下:

    import redis
    
    r = redis.Redis(host='127.0.0.1', port=6379, password='1234567890')
    with r.pipeline() as ctx:
        a = time.time()
        ctx.hset('current', "time2", a)
        ctx.hset('current', "time3", a)
        res = ctx.execute()
        print("result: ", res)

    使用 pipe line 以乐观锁的形式执行事务操作,详情:redis事务

    # -*- coding:utf-8 -*-
    
    import redis
    from redis import WatchError
    from concurrent.futures import ProcessPoolExecutor
    
    r = redis.Redis(host='127.0.0.1', port=6379)
    
    # 减库存函数, 循环直到减库存完成
    # 库存充足, 减库存成功, 返回True
    # 库存不足, 减库存失败, 返回False
    def decr_stock():
        # python中redis事务是通过pipeline的封装实现的
        with r.pipeline() as pipe:
            while True:
                try:
                    # watch库存键, multi后如果该key被其他客户端改变, 事务操作会抛出WatchError异常
                    pipe.watch('stock:count')
                    count = int(pipe.get('stock:count'))
                    if count > 0:  # 有库存
                        # 事务开始
                        pipe.multi()  # multi 判断 watch 监控的 key 是否被其他客户端改变
                        pipe.decr('stock:count')
                        # 把命令推送过去
                        # execute返回命令执行结果列表, 这里只有一个decr返回当前值
                        result = pipe.execute()[0]
                        print("result: ", result)
                        return True
                    else:
                        return False
                except WatchError as e:
                    # 打印WatchError异常, 观察被watch锁住的情况
                    print(e.args)
                finally:
                    pipe.unwatch()
    
    
    def worker():
        while True:
            # 没有库存就退出
            if not decr_stock():
                break
    
    
    # 实验开始
    # 设置库存为100
    r.set("stock:count", 100)
    
    # 多进程模拟多个客户端提交
    with ProcessPoolExecutor(max_workers=2) as pool:
        for _ in range(10):
            pool.submit(worker)

    方法二:使用 register_script 

    分布执行,发送脚本到redis服务器,获取一个本次连接的一个调用句柄,根据此句柄可以无数次执行不同参数调用

        import redis
        import time
    
        r = redis.Redis(host='127.0.0.1', port=31320, password='12345678')
        
        lua = """
        local key = KEYS[1]
        local field = ARGV[1]
        local timestamp_new = ARGV[2]
        
        -- get timestamp of the key in redis
        local timestamp_old = redis.call('hget', key, field)
        -- if timestamp_old == nil, it means the key is not exist
        if timestamp_old == nil or timestamp_old == false or timestamp_new > timestamp_old then
            redis.call('hset', key, field .. 1, timestamp_new)
            -- timestamp_new > timestamp_old
            return redis.pcall('hset', key, field, timestamp_new)
        end
        
        """
    
        cmd = r.register_script(lua)
    
        cur_time = time.time()
        cmd(keys=['current'], args=["time", cur_time])

    register_script 调用 lua 来实现,需要注意 redis.call(method, key, field) 的返回值(nil,false,1),此处没有键值返回的是false。如果中间有错误,所有的语句不时不生效。

    方法三:使用 script_load 和 evalsha

    简而言之,通过 script_load 发送给redis服务器,使加载 lua 脚本,并常驻内存,返回标志,通过 evalsha 按标志进行执行,此连接脱离本次redis 客户端。

        import redis
        import time
    
        r = redis.Redis(host='127.0.0.1', port=31320, password='12345678')
        
        lua = """
        local key = KEYS[1]
        local field = ARGV[1]
        local timestamp_new = ARGV[2]
        
        -- get timestamp of the key in redis
        local timestamp_old = redis.call('hget', key, field)
        -- if timestamp_old == nil, it means the key is not exist
        if timestamp_old == nil or timestamp_old == false or timestamp_new > timestamp_old then
            redis.call('hset', key, field .. 1, timestamp_new)
            -- timestamp_new > timestamp_old
            return redis.pcall('hset', key, field, timestamp_new)
        end
        
        """
        sha = r.script_load(lua)
        print(r.evalsha(sha, 1, 'current', 'time', time.time()))

    Redis 管理Lua脚本:(Python下为 script_... )

    • script load
      此命令用于将Lua脚本加载到Redis内存中
    • script exists
      scripts exists sha1 [sha1 …]  
      此命令用于判断sha1是否已经加载到Redis内存中
    • script flush 
      此命令用于清除Redis内存已经加载的所有Lua脚本,在执行script flush后,所有 sha 不复存在。
    • script kill 
      此命令用于杀掉正在执行的Lua脚本。

    方法四:eval

    使用方法与方法三类似,但是eval是一次性请求,每次请求,必须携带 lua 脚本

     
  • 相关阅读:
    微软外服 AlI In One
    js 循环多次和循环一次的时间的性能对比 All In One
    vue inject All In One
    Excel 表格数据倒置 All In One
    SVG tickets All In One
    OH MY ZSH All In One
    js array for loop performance compare All In One
    mac terminal show You have new mail All In one
    新闻视频 26 制作母版页
    转自牛腩 母版页和相对路径
  • 原文地址:https://www.cnblogs.com/spaceapp/p/12175975.html
Copyright © 2011-2022 走看看