zoukankan      html  css  js  c++  java
  • 论分布式系统中单一锁控制的优化

    前言


    在分布式系统中,为了保持数据操作的一致性,我们会看到锁在里面会有广泛的应用。简单一点的,我们可能就用一个简单的对象锁来做线程安全的同步。再细粒度一些的,我们会用到读写锁,然后对具体操作的属性(Read Or Write)在进行读写锁的分离控制。但今天笔者想对此话题再展开细致的探讨,相比较于使用单一锁模式,我们还是可以做出更进一步的改善的。

    单一锁控制的弊端


    如前文所提到的,不管说是普通的object对象锁还是说ReadWriteLock来说,它们本质上还是Single Lock(单一锁)。这种lock在客户端请求巨大的情况下会有严重的锁竞争问题,一个典型的例子是HDFS NN里的FSN锁。一旦某个请求获得了这个锁,那么其它的请求就会被block住了。所以说单一锁的问题在于,它的粒度有些时候还是太粗了。所以这里就涉及到一个锁粒度的问题了。

    单一锁的优化:锁粒度的细化


    那么怎么取做锁粒度的细化呢?读写锁的分离是一种常见的手段。但更好的一种做法是在实际资源上进行锁控制。因为我们使用锁去做线程安全同步控制时的初衷,是为了避免同时又多个操作在修改某个共同的资源(比如元数据的更新)。但其实不同的资源的相互更新其实是不影响的,那这时我们可以将大的锁拆分为A资源锁以及B资源锁。

    用一个更具象的例子来解释这个问题:比如在分布式存储系统的元数据更新里,我们可以将DB级别的lock控制拆分为以table级别的lock。这里的Resource就从DB细粒度为Table。

    锁的管理


    如果锁的粒度已经按照对于实际的资源进行控制的话,这时我们可以考虑在上面包装出一个锁的管理器,由此管理器对外“借出”或“归还”锁。在这个管理类里,我们还能对当前的活跃锁做引用计数的更新。如果某个锁在某次unlock后没有再被外部引用了,我们就可以将其从锁管理器中移除出去了。

    锁的容量控制


    当然我们还需要对锁管理器做一个最大容量的控制,如果当前活跃锁达到容量阈值时,则后面的新锁申请也将会被Block住。这在一定程度上控制的是客户端的concurrency的行为度。

    锁的资源优先级问题


    这里还有一个隐蔽的问题:锁的资源优先级问题。比如说我们有3个资源,它们的资源范围关系如下:

    • A,资源最大(包含多个B类资源)
    • B,资源适中(包含多个C类资源)
    • C,最小粒度资源

    假设我们对上述3类资源做锁控制的话,可以遵循以下原则:

    在持有低优先级资源锁的情况下,不能继续再获取其上资源的锁。持有高优先级资源锁的情况下,还可以继续获取其下低一级资源的锁。

    这个原则对应上面ABC资源的情况就是,用户在获取A锁资源的情况下,可以继续获取B或C资源的锁,而假如说目前已经获取C锁的情况下,是无法获取B锁和A锁的。

    锁管理的demo样例


    以下是摘自Hadoop Ozone里的一个锁管理器的实现,里面涉及到了上文提到的一些要点,供大家学习参考。

    /**
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with this
     * work for additional information regarding copyright ownership.  The ASF
     * licenses this file to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     * <p>
     * http://www.apache.org/licenses/LICENSE-2.0
     * <p>
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    
    package org.apache.hadoop.ozone.lock;
    
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.HddsConfigKeys;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * Manages the locks on a given resource. A new lock is created for each
     * and every unique resource. Uniqueness of resource depends on the
     * {@code equals} implementation of it.
     */
    public class LockManager<T> {
    
      private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
    
      private final Map<T, ActiveLock> activeLocks = new ConcurrentHashMap<>();
      private final GenericObjectPool<ActiveLock> lockPool =
          new GenericObjectPool<>(new PooledLockFactory());
    
      /**
       * Creates new LockManager instance.
       *
       * @param conf Configuration object
       */
      public LockManager(Configuration conf) {
        int maxPoolSize = conf.getInt(HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY,
            HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY_DEFAULT);
        lockPool.setMaxTotal(maxPoolSize);
      }
    
    
      /**
       * Acquires the lock on given resource.
       *
       * <p>If the lock is not available then the current thread becomes
       * disabled for thread scheduling purposes and lies dormant until the
       * lock has been acquired.
       */
      public void lock(T resource) {
        activeLocks.compute(resource, (k, v) -> {
          ActiveLock lock;
          try {
            if (v == null) {
              // 锁为空,则在锁对象池中创建并借出
              lock = lockPool.borrowObject();
            } else {
              lock = v;
            }
            // 增加锁引用计数
            lock.incrementActiveCount();
          } catch (Exception ex) {
            LOG.error("Unable to obtain lock.", ex);
            throw new RuntimeException(ex);
          }
          return lock;
          //得到锁后进行锁的lock操作
        }).lock();
      }
    
      /**
       * Releases the lock on given resource.
       */
      public void unlock(T resource) {
        ActiveLock lock = activeLocks.get(resource);
        if (lock == null) {
          // Someone is releasing a lock which was never acquired. Log and return.
          LOG.warn("Trying to release the lock on {}, which was never acquired.",
              resource);
          return;
        }
        lock.unlock();
        activeLocks.computeIfPresent(resource, (k, v) -> {
          v.decrementActiveCount();
          if (v.getActiveLockCount() != 0) {
            return v;
          }
          // 如果锁没有被引用了,则在锁对象池中移出此锁
          lockPool.returnObject(v);
          return null;
        });
      }
    }
    

    上面的ActiveLock实质上是一个经过简单包装的ReentrantLock

    /**
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with this
     * work for additional information regarding copyright ownership.  The ASF
     * licenses this file to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     * <p>
     * http://www.apache.org/licenses/LICENSE-2.0
     * <p>
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
     * License for the specific language governing permissions and limitations under
     * the License.
     */
    
    package org.apache.hadoop.ozone.lock;
    
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Lock implementation which also maintains counter.
     */
    public final class ActiveLock {
    
      private Lock lock;
      private AtomicInteger count;
    
      /**
       * Use ActiveLock#newInstance to create instance.
       */
      private ActiveLock() {
        this.lock = new ReentrantLock();
        this.count = new AtomicInteger(0);
      }
    
      /**
       * Creates a new instance of ActiveLock.
       *
       * @return new ActiveLock
       */
      public static ActiveLock newInstance() {
        return new ActiveLock();
      }
    
      /**
       * Acquires the lock.
       *
       * <p>If the lock is not available then the current thread becomes
       * disabled for thread scheduling purposes and lies dormant until the
       * lock has been acquired.
       */
      public void lock() {
        lock.lock();
      }
    
      /**
       * Releases the lock.
       */
      public void unlock() {
        lock.unlock();
      }
    
      /**
       * Increment the active count of the lock.
       */
      void incrementActiveCount() {
        count.incrementAndGet();
      }
    
      /**
       * Decrement the active count of the lock.
       */
      void decrementActiveCount() {
        count.decrementAndGet();
      }
    
      /**
       * Returns the active count on the lock.
       *
       * @return Number of active leases on the lock.
       */
      int getActiveLockCount() {
        return count.get();
      }
    
      /**
       * Resets the active count on the lock.
       */
      void resetCounter() {
        count.set(0);
      }
    
      @Override
      public String toString() {
        return lock.toString();
      }
    }
    
    
  • 相关阅读:
    内网其他服务器节点连接Mysql数据库很慢的解决方案
    MongoDB分片技术原理和高可用集群配置方案
    Hive事务原理和Datax同步事务表问题解决
    Mysql使用存储过程创建测试数据
    Hive的原生部署方式
    ByteArray的操作总结(复制、打印、位运算)
    句柄
    C# 使用指针将不同值类型赋值到字节数组中
    对象、字节流转换
    ASP.NET Core学习日志1
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183551.html
Copyright © 2011-2022 走看看