zoukankan      html  css  js  c++  java
  • 用redis解决订单超发问题的4种方法

    用redis解决订单超发问题的4种方法

    # -*- coding: utf-8 -*-
    # 300~5000人抢100张票,保证不超发
    import redis
    import time
    import threading
    from redis import WatchError
    from redis_lock import synchronized
    
    REDIS_DATABASE = {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0
    }
    TICKET_NUM = 100  # 票数
    PEOPLE_NUM = 3000  # 人数
    
    rds = redis.Redis(host=REDIS_DATABASE['HOST'], port=REDIS_DATABASE['PORT'], db=REDIS_DATABASE['DB'])
    rds.delete('ticket_num')
    rst = rds.incr('ticket_num', amount=TICKET_NUM)
    
    rds.delete('tickets')
    values = ['' for _ in xrange(TICKET_NUM)]
    tickets = rds.lpush('tickets', *values)
    
    
    class TestRedis(threading.Thread):
        def __init__(self, t_num):
            self.t_num = t_num
            super(TestRedis, self).__init__()
    
        def run(self):
            # self.error_examples()  # 错误示范,多线程下会超发
            # self.optimistic_lock()  # 利用redis自带事务(乐观锁)
            # self.pessimistic_lock  # 自实现的悲观锁,比乐观锁快一丢丢
            # self.redis_list()  # 利用redis单线程特性,队列操作
            self.redis_incr()  # 推荐方法!利用redis单线程特性,计数器操作
    
        def error_examples(self):
            """
            错误示范,多线程下会超发
            :return:
            """
            ticket_num = int(rds.get('ticket_num'))
            time.sleep(0.1)  # 加上sleep效果更明显
            if ticket_num > 0:
                print('t_num=%s, ticket_num=%s' % (self.t_num, ticket_num))
                rds.set('ticket_num', ticket_num-1)
    
        def optimistic_lock(self):
            """
            乐观锁
            :return:
            """
            while 1:
                with rds.pipeline(transaction=True) as r_pip:
                    r_pip.watch('ticket_num')
                    try:
                        r_pip.multi()
                        ticket_num = int(rds.get('ticket_num'))
                        if ticket_num > 0:
                            r_pip.decr('ticket_num')
                        r_pip.execute()
                        return
                    except WatchError:
                        r_pip.unwatch()
    
        @synchronized(rds, "lock", 1000)
        def pessimistic_lock(self):
            """
            悲观锁
            :return:
            """
            ticket_num = int(rds.get('ticket_num'))
            if ticket_num > 0:
                rds.decr('ticket_num')
    
        def redis_list(self):
            """
            减列表方式,防止超发。利用redis单线程特性
            缺点:消耗内存
            :return:
            """
            ticket = rds.lpop('tickets')
            if ticket is not None:
                rds.decr('ticket_num')
    
        def redis_incr(self):
            """
            利用redis单线程特性。
            :return:
            """
            time.sleep(0.1)
            if int(rds.get('ticket_num')) > 0:
                de_num = rds.decr('ticket_num')  # 当只剩最后一张票时,多个线程都取到1,同时减后会成负数即“超发”
                if de_num < 0:  # “超发”后补回。不继续操作
                    print('Overshoot callback %s' % de_num)
                    rds.incr('ticket_num')
    
    
    tests = []
    for i in xrange(PEOPLE_NUM):
        t = TestRedis(i+1)
        tests.append(t)
    
    s = time.time()
    for t in tests:
        t.start()
    for t in tests:
        t.join()
    
    print('result ticket_num=%s, time=%s' % (rds.get('ticket_num'), (time.time()-s)*1000))

    redis_lock.py

    # coding: utf-8
    """
        redis 分布式悲观锁,需要解决以下几个问题
        1、A获取锁后崩溃,需要能将锁释放
        2、A获取锁后处理时间过长,导致锁过期,被B获取,A处理完后错误的将B锁释放
        
        redis.Redis()会有些问题,连接最好使用redis.StrictRedis()
    """
    
    import math
    import time
    import uuid
    from contextlib import contextmanager
    from functools import wraps
    
    from redis import WatchError
    
    
    def acquire_lock(conn, lock_name, acquire_timeout=1, lock_timeout=1):
        """
        获取锁
        :param conn: redis连接
        :param lock_name: 锁名称
        :param acquire_timeout: 获取锁最长等待时间,-1为永久阻塞等待
        :param lock_timeout: 锁超时时间
        :return: 
        """
    
        def should_acquire():
            if acquire_timeout == -1:
                return True
            acquire_end = time.time() + acquire_timeout
            return time.time() < acquire_end
    
        identity = str(uuid.uuid1())
        lock_timeout = int(math.ceil(lock_timeout))
        while should_acquire():
            if conn.set(lock_name, identity, ex=lock_timeout, nx=True):
                return identity
            else:
                pttl = conn.pttl(lock_name)
                # Redis or StrictRedis
                # 如果使用的是Redis , 可能会存在pttl为0 但是显示为None的情况
                if pttl is None or pttl == -1:
                    conn.expire(lock_name, lock_timeout)
            time.sleep(.1)
        return None
    
    
    def release_lock(conn, lock_name, identity):
        pipe = conn.pipeline(True)
        while True:
            try:
                pipe.watch(lock_name)
                if pipe.get(lock_name) == identity:
                    pipe.delete(lock_name)
                    return True
                pipe.unwatch()
                break
            except WatchError:
                pass
        return False
    
    
    @contextmanager
    def lock(conn, lock_name, lock_timeout):
        """
        with lock(conn, "lock", 10):
            do something
        """
        id_ = None
        try:
            id_ = acquire_lock(conn, lock_name, -1, lock_timeout)
            yield id_
        finally:
            release_lock(conn, lock_name, id_)
    
    
    def synchronized(conn, lock_name, lock_timeout):
        """
        @synchronized(conn, "lock", 10)
        def fun():
            counter = int(r.get("counter"))
            counter += 1
            r.set("counter", counter)
        """
    
        def decorator(func):
            @wraps(func)
            def wrap(*args, **kwargs):
                with lock(conn, lock_name, lock_timeout):
                    return func(*args, **kwargs)
    
            return wrap
    
        return decorator
    
    
    if __name__ == '__main__':
        import redis
    
        r = redis.Redis("localhost", db=5)
    
        id_ = acquire_lock(r, "lock", acquire_timeout=1, lock_timeout=10)
        release_lock(r, "lock", id_)
        with lock(r, "lock", 1):
            print("do something")
    
    
        @synchronized(r, "lock", 10)
        def fun():
            counter = int(r.get("counter"))
            counter += 1
            r.set("counter", counter)
    
    
        for i in range(10000):
            fun()
  • 相关阅读:
    图片垂直居中的方法(适合只有一行文字和图片)
    微信小程序 this.setData() 详解
    phpstorm 2017激活码(方法)
    WebSocket 协议 详解
    容器装不下内容时,显示滚动条
    Restore IP Addresses
    Reverse Linked List II
    Subsets II
    Decode Ways
    Gray Code
  • 原文地址:https://www.cnblogs.com/aaron-agu/p/12269623.html
Copyright © 2011-2022 走看看