zoukankan      html  css  js  c++  java
  • 以步步为营的风格解读 Redis分布式锁

    之前码甲哥写了两篇有关线程安全的文章:

    分布式锁是"线程同步"的延续

    最近首度应用"分布式锁",现在想想,分布式锁不是孤立的技能点,这其实就是跨主机的线程同步

    进程内 跨进程 跨主机
    Lock/Monitor、SemaphoreSlim Metux、Semaphore 分布式锁
    用户态线程安全 (命名对象是)内核态线程安全

    单机服务器可以通过共享某堆内存来标记上锁/解锁,线程同步说到底是建立在单机操作系统的用户态/内核态对共享内存的访问控制。

    而分布式服务器不是在同一台机器上:跨主机,因此需要将内存标记存储在所有机器进程都能看到的地方 。。。

    在开发很多业务场景会使用到锁,例如库存控制,抽奖等。
    例如库存只剩1个商品,有三个用户同时打算购买,谁先购买库存立即清零,不能让其他二人也购买成功。

    解读分布式锁

    我们常说的线程安全、线程同步方案,包括此次的分布式锁都是基于“多线程/多进程对特定资源有更新操作”。

    基本考量:

    1. 分布式系统,一个锁在同一时间只能被一个服务器获取 (这是分布式锁的基础)
    2. 具备锁失效机制,防止死锁 (防止某些意外,锁没有得到释放,那别人也无法得到锁)

    Redis SET resource-name anystring NX EX max-lock-time 是一种最简单的分布式锁实现方案。

    SET 命令支持多个参数:

    • EX seconds-- 设置过期时间(s)
    • NX -- 如果key不存在,则设置
      ......
      因为SET命令参数可以替代SETNX,SETEX,GETSET,这些命令在未来可能被废弃。

    上面的命令返回OK(或经过重试),客户端就获取到这个锁;
    使用DEL命令解锁;
    到达超时时间会自动释放锁。

    在解锁时,增加一些设计,让系统更加健壮:

    1. 不要使用固定的String值作为锁值,而是使用一个不易被猜中的随机值, 业内称为token
    2. 不使用DEL命令释放锁,而是发送script去移除key

    第3、4点是为了解决 :“锁提前过期,客户端A还没有执行完,然后客户端B获取了锁,这时客户端A执行完了,会不会再删锁的时候把B的锁给删掉” -- 4是3技术上的推荐实现。

    脚本如下:

    if redis.call("get",KEYS1] ==ARGV[1])
    then
       return  redis.call("DEL",KEYS[1])
    else
      return 0
    end
    

    下面使用StackExchange.Redis 写了基于以上考量的代码示例:

    
            /// <summary>
            /// Acquires the lock.
            /// </summary>
            /// <param name="key"></param>
            /// <param name="token">随机值</param>
            /// <param name="expireSecond"></param>
            /// <param name="waitLockSeconds">非阻塞锁</param>
            static bool Lock(string key, string token,int expireSecond=10, double waitLockSeconds = 0)
            {
                var waitIntervalMs = 50;
                bool isLock;
                
                DateTime begin = DateTime.Now;
                do
                {
                    isLock = Connection.GetDatabase().StringSet(key, token, TimeSpan.FromSeconds(expireSecond), When.NotExists);
                    if (isLock)
                        return true;
    
                    //不等待锁则返回
                    if (waitLockSeconds == 0) break;
                    //超过等待时间,则不再等待
                    if ((DateTime.Now - begin).TotalSeconds >= waitLockSeconds) break;
    
                    Thread.Sleep(waitIntervalMs);
                } while (!isLock);
                return false;
            }
    
            /// <summary>  
            /// Releases the lock.  
            /// </summary>  
            /// <returns><c>true</c>, if lock was released, <c>false</c> otherwise.</returns>  
            /// <param name="key">Key.</param>  
            /// <param name="value">value</param>  
            static bool UnLock(string key, string value)
            {
                string lua_script = @"  
                    if (redis.call('GET', KEYS[1]) == ARGV[1]) then  
                        redis.call('DEL', KEYS[1])  
                        return true  
                    else  
                        return false  
                    end  
                    ";
    
                try
                {
                    var res = Connection.GetDatabase().ScriptEvaluate(lua_script,
                                                               new RedisKey[] { key },
                                                               new RedisValue[] { value });
                    return (bool)res;
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"ReleaseLock lock fail...{ex.Message}");
                    return false;
                }
            }
            
            private static Lazy<ConnectionMultiplexer> lazyConnection = new Lazy<ConnectionMultiplexer>(() =>
            {
                ConfigurationOptions configuration = new ConfigurationOptions
                {
                    AbortOnConnectFail = false,
                    ConnectTimeout = 5000,
                };
    
                configuration.EndPoints.Add("10.100.219.9", 6379);
    
                return ConnectionMultiplexer.Connect(configuration.ToString());
            }); 
            public static ConnectionMultiplexer Connection => lazyConnection.Value;
    

    以上代码新增了第五点考量:

    1. 为避免无限制抢锁,增加了非阻塞锁: 轮询_s等待锁,未等到则不再抢锁

    使用方式:

    下面并行开启三个任务,减少库存:

      static void Main(string[] args)
            {
                // 尝试并行执行3个任务
                Parallel.For(0, 3, x =>
                {
                    string token = $"loki:{x}";
                    bool isLocked = Lock("loki", token, 5, 10);
                
                    if (isLocked)
                    {
                        Console.WriteLine($"{token} begin reduce stocks (with lock) at {DateTime.Now}.");
                        Thread.Sleep(1000);
                        Console.WriteLine($"{token} release lock {UnLock("loki", token)} at {DateTime.Now}. ");
                    }
                    else
                    {
                        Console.WriteLine($"Not acquire {token} lock at {DateTime.Now}.");
                    }
                });
            }
    

    输出总结

    回到原下单场景,我们针对这个商品Id,设置了分布式锁,高并发秒杀这个商品的时候,获取/释放这个特定的锁。

    该锁不是全局的,针对这个商品的分布式锁并不会影响其他商品的购物流程。

    本文从基础的线程安全,八卦文线程同步,延伸到跨主机的资源线程/进程安全, 其中演示了利用RedisSET命令做分布式锁的设计方案,虽然是面试八股文,我们依旧需要仔细揣摩Redis Lock的细节考量。

    分布式锁 一方面锁定资源,另一方面开发者要考虑多进程执行代码的推演,这在某种意义是也是对代码加锁。

  • 相关阅读:
    2.pt-table-checksum工具
    Mysql8.0新特性01
    12.redis 之阅读大佬文章随笔
    4.Mysql之Mysqldump命令
    5. 关于高负载服务器Kernel的TCP参数优化
    Mysql Oracle 备份表数据、批量删除表数据
    Mysql limit用法
    Java 字符串数组转字符串
    Layui 自定义年份下拉框并且可输入
    Mysql 生成UUID
  • 原文地址:https://www.cnblogs.com/JulianHuang/p/14888190.html
Copyright © 2011-2022 走看看