zoukankan      html  css  js  c++  java
  • [Node.js] 基于Redis实现分布式锁

    分布式锁出应用场景有很多,比如库存扣减什么的,不再多说。

    我第一次接触这个是做数据同步服务。同步服务可能有多个实例,使用node.js开发,多个同步服务竟争为多个客户数据进行同步。

    在实现分布式锁的时候,有两个特别重要的点要注意:

    1. 具有失效机制,防止死锁(加锁的实例不排除中途释放)

    2. 高可用高性能的锁获取与释放

    下面直接上代码:

    ...
    
    enum LockState {
      wait,  // 等待中,空闲中
      execute // 正在执行
    }
    
    // 分布式部分的主要代码
    async execute(data: xxx) {
      // 获取当前实例的标识作为 owner,主要用来区分多个实例时,加锁的服务是否为当前服务
      let owner = this.getOwner(); 
      try {
        // 获取锁
        let lock = await this.getLock(data.id);
        // 如果锁正在执行,并且所有者不是本实例,则退出
        // 这种情况说明是其它实例正在处理这个data
        // 在我的项目中,实际情况是,另一个同步服务正在给这个客户同步数据,所以我们这个服务实例,就不需要再给它同步了,直接跳过,处理后继的客户
        if (item.state == LockState.execute && item.owner != owner)
          return;
          
        lock.owner = owner;
        lock.state = LockState.execute;
        // 加锁
        awiat this.setLock(data.id, lock);
        
        // 等待500毫秒
        await Utils.sleep(500);
        // 重新取一下同步状态, 防止并发冲突
        let lock = await this.getLock(data.id);
        // 如果所有者不为当前服务或状态不是执行中,则退出
        if (lock.owner != data.owner || lock.state != LockState.execute) {
          // 注意了,这里为什么要等个500ms,再来获取锁,判断锁的状态呢?
          // 那是因为,由于网络延时,我们上锁的时候,可能会有另一个实例也来给这个data上锁,我们不一定就成功了。
          // 但我们也不能马上去获取锁看是否成功,还是因为网络延时的原因。
          // 所以这里等待一小儿,则可以避免此情况的出现。
          return;
        }
          
        // 开一个定时器, 每隔5秒更新锁, 防止丢失
        // 为什么要加这个呢? 很简单,我们锁内的代码执行一次,不一定60秒就完成了,
        // 比如我们第一次给某个客户同步数据,可能要好几分钟
        // 那为什么又不能一直锁着,不让它超时呢?
        // 那是因为可能会造成死锁,比如我们的实例意外中止,半天起不起来的话,那这个data就会一直锁着
        let timer: Timer = setInterval(async () => {
            if (lock.state == LockState.execute) {
                await this.setLock(item.id, item.state);
            }
        }, 5000);  
        
        try {
          // 此处才真正开始执行对这个数据的处理
          await this.exec(data); 
        } finally {
          // 清除定时器
          if (timer) clearInterval(timer);
          // 同步完成,释放锁
          lock.state = LockState.wait;
          lock.lastSync = Utils.getTimeInMillis();  // 记录一个同步时间,比如我们可以按此来排序,优先处理等得最久的客户数据
          await this.setLock(data.id, lock);
        }
      } catch (e) {
        Logger.error(e);
      }
    }
    
    // 获取锁
    async getLock(key: string): Promise<Lock> {
      let v = await cache.get(key);
      return (new Lock()).load(v);
    }
    
    // 设置锁
    async setLock(key: string, state: Lock) {
      // 往redis中写入一个数据,有效期 60 秒
      await cache.setEx(key, 60, state.toString());
    }
    
    // 返回当前服务实例标识,用于和其它服务作区分
    getOwner(): string {
      // 比如这里用本机的ip地址加上缓存标识作为owner标识
      return `lock:${AppUtils.getIPAddress()}:${CacheKeys.flag}`;
    }
    
    
    export class Utils {
    
        /**
         * 等待指定的时间
         * @param ms
         */
        static async sleep(ms: number) {
            return new Promise((resolve) => {
                setTimeout(() => {
                    resolve('');
                }, ms)
            });
        }
        
        /**
         * 获取当前时间戳 (与java相同)
         */
        static getTimeInMillis(): number {
            return new Date().getTime() - AppUtils._time1970;
        }
    
    }
    ...
  • 相关阅读:
    django项目环境搭建备忘
    Python IDE的选择和安装
    MAC上python环境搭建
    hadoop1.2.1+hbase0.90.4+nutch2.2.1+elasticsearch0.90.5配置(伪分布式)
    ubuntu下hadoop完全分布式部署
    ubuntu下集群设置静态ip
    C语言调用库函数实现生产者消费者问题
    poj 1703(带权并查集)
    poj 1330
    poj1724
  • 原文地址:https://www.cnblogs.com/yangyxd/p/14505542.html
Copyright © 2011-2022 走看看