zoukankan      html  css  js  c++  java
  • 90

    图数据库文章总目录:
    整理所有图相关文章,请移步(超链):图数据库系列-文章总目录
    地址:https://liyangyang.blog.csdn.net/article/details/111031257
    源码分析相关可查看github(码文不易,求个star~): https://github.com/YYDreamer/janusgraph

    下述流程高清大图地址:https://www.processon.com/view/link/5f471b2e7d9c086b9903b629

    版本:JanusGraph-0.5.2

    转载文章请保留以下声明:

    作者:洋仔聊编程
    微信公众号:匠心Java
    原文地址:https://liyangyang.blog.csdn.net/

    在分布式系统中,难免涉及到对同一数据的并发操作,如何保证分布式系统中数据的并发安全呢?分布式锁!

    一:分布式锁
    常用的分布式锁实现方式:

    1、基于数据库实现分布式锁

    ​ 针对于数据库实现的分布式锁,如mysql使用使用for update共同竞争一个行锁来实现; 在JanusGraph中,也是基于数据库实现的分布式锁,这里的数据库指的是我们当前使用的第三方backend storage,具体的实现方式也和mysql有所不同,具体我们会在下文分析

    2、基于Redis实现的分布式锁

    ​ 基于lua脚本+setNx实现

    3、基于zk实现的分布式锁

    ​ 基于znode的有序性和临时节点+zk的watcher机制实现

    4、MVCC多版本并发控制乐观锁实现

    本文主要介绍Janusgraph的锁机制,其他的实现机制就不在此做详解了

    下面我们来分析一下JanusGraph的锁机制实现~

    二:JanusGraph锁机制
    在JanusGraph中使用的锁机制是:本地锁 + 分布式锁来实现的;

    2.1 一致性行为
    在JanusGraph中主要有三种一致性修饰词(Consistency Modifier)来表示3种不同的一致性行为,来控制图库使用过程中的并发问题的控制程度;

    public enum ConsistencyModifier {
    DEFAULT,
    LOCK,
    FORK
    }
    源码中ConsistencyModifier枚举类主要作用:用于控制JanusGraph在最终一致或其他非事务性后端系统上的一致性行为!其作用分别为:

    DEFAULT:默认的一致性行为,不使用分布式锁进行控制,对配置的存储后端使用由封闭事务保证的默认一致性模型,一致性行为主要取决于存储后端的配置以及封闭事务的(可选)配置;无需显示配置即可使用
    LOCK:在存储后端支持锁的前提下,显示的获取分布式锁以保证一致性!确切的一致性保证取决于所配置的锁实现;需management.setConsistency(element, ConsistencyModifier.LOCK);语句进行配置
    FORK:只适用于multi-edges和list-properties两种情况下使用;使JanusGraph修改数据时,采用先删除后添加新的边/属性的方式,而不是覆盖现有的边/属性,从而避免潜在的并发写入冲突;需management.setConsistency(element, ConsistencyModifier.FORK);进行配置
    LOCK
    在查询或者插入数据时,是否使用分布式锁进行并发控制,在图shcema的创建过程中,如上述可以通过配置schema元素为ConsistencyModifier.LOCK方式控制并发,则在使用过程中就会用分布式锁进行并发控制;

    为了提高效率,JanusGraph默认不使用锁定。 因此,用户必须为定义一致性约束的每个架构元素决定是否使用锁定。

    使用JanusGraphManagement.setConsistency(element,ConsistencyModifier.LOCK)显式启用对架构元素的锁定

    代码如下所示:

    mgmt = graph.openManagement()
    name = mgmt.makePropertyKey(‘consistentName’).dataType(String.class).make()
    index = mgmt.buildIndex(‘byConsistentName’, Vertex.class).addKey(name).unique().buildCompositeIndex()
    mgmt.setConsistency(name, ConsistencyModifier.LOCK) // Ensures only one name per vertex
    mgmt.setConsistency(index, ConsistencyModifier.LOCK) // Ensures name uniqueness in the graph
    mgmt.commit()
    FORK
    由于边缘作为单个记录存储在基础存储后端中,因此同时修改单个边缘将导致冲突。

    FORK就是为了代替LOCK,可以将边缘标签配置为使用ConsistencyModifier.FORK。

    下面的示例创建一个新的edge label,并将其设置为ConsistencyModifier.FORK

    mgmt = graph.openManagement()
    related = mgmt.makeEdgeLabel(‘related’).make()
    mgmt.setConsistency(related, ConsistencyModifier.FORK)
    mgmt.commit()
    经过上述配置后,修改标签配置为FORK的edge时,操作步骤为:

    首先,删除该边
    将修改后的边作为新边添加
    因此,如果两个并发事务修改了同一边缘,则提交时将存在边缘的两个修改后的副本,可以在查询遍历期间根据需要解决这些副本。

    注意edge fork仅适用于MULTI edge。 具有多重性约束的边缘标签不能使用此策略,因为非MULTI的边缘标签定义中内置了一个唯一性约束,该约束需要显式锁定或使用基础存储后端的冲突解决机制

    下面我们具体来看一下janusgrph的锁机制的实现:

    2.2 LoackID
    在介绍锁机制之前,先看一下锁应该锁什么东西呢?

    我们都知道在janusgraph的底层存储中,vertexId作为Rowkey,属性和边存储在cell中,由column+value组成

    当我们修改节点的属性和边+边的属性时,很明显只要锁住对应的Rowkey + Column即可;

    在Janusgraph中,这个锁的标识的基础部分就是LockID:

    LockID = RowKey + Column

    源码如下:

    KeyColumn lockID = new KeyColumn(key, column);
    2.3 本地锁
    本地锁是在任何情况下都需要获取的一个锁,只有获取成功后,才会进行下述分布式锁的获取!

    本地锁是基于图实例维度存在的;主要作用是保证当前图实例下的操作中无冲突!

    本地锁的实现是通过ConcurrentHashMap数据结构来实现的,在图实例维度下唯一;

    基于当前事务+lockId来作为锁标识;

    获取的主要流程:

    image-20200810170411991

    结合源码如下:

    上述图建议依照源码一块分析,源码在LocalLockMediator类中的下述方法,下面源码分析模块会详细分析

    public boolean lock(KeyColumn kc, T requester, Instant expires) {
    }
    
    • 1
    • 2

    引入本地锁机制,主要目的: 在图实例维度来做一层锁判断,减少分布式锁的并发冲突,减少分布式锁带来的性能消耗

    2.4 分布式锁
    在本地锁获取成功之后才会去尝试获取分布式锁;

    分布式锁的获取整体分为两部分流程:

    分布式锁信息插入
    分布式锁信息状态判断
    分布式锁信息插入
    该部分主要是通过lockID来构造要插入的Rowkey和column并将数据插入到hbase中;插入成功即表示这部分处理成功!

    具体流程如下:

    2

    分布式锁信息状态判断
    该部分在上一部分完成之后才会进行,主要是判断分布式锁是否获取成功!

    查询出当前hbase中对应Rowkey的所有column,过滤未过期的column集合,比对集合的第一个column是否等于当前事务插入的column;

    等于则获取成功!不等于则获取失败!

    具体流程如下:

    3

    三:源码分析 与 整体流程
    源码分析已经push到github:https://github.com/YYDreamer/janusgraph

    1、获取锁的入口

    public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException {
        // locker是一个一致性key锁对象
        if (locker != null) {
            // 获取当前事务对象
            ExpectedValueCheckingTransaction tx = (ExpectedValueCheckingTransaction) txh;
            // 判断:当前的获取锁操作是否当前事务的操作中存在增删改的操作
            if (tx.isMutationStarted())
                throw new PermanentLockingException("Attempted to obtain a lock after mutations had been persisted");
            // 使用key+column组装为lockID,供下述加锁使用!!!!!
            KeyColumn lockID = new KeyColumn(key, column);
            log.debug("Attempting to acquireLock on {} ev={}", lockID, expectedValue);
            // 获取本地当前jvm进程中的写锁(看下述的 1:写锁获取分析)
            // (此处的获取锁只是将对应的KLV存储到Hbase中!存储成功并不代表获取锁成功)
            // 1. 获取成功(等同于存储成功)则继续执行
            // 2. 获取失败(等同于存储失败),会抛出异常,抛出到最上层,打印错误日志“Could not commit transaction ["+transactionId+"] due to exception” 并抛出对应的异常,本次插入数据结束
            locker.writeLock(lockID, tx.getConsistentTx());
            // 执行前提:上述获取锁成功!
            // 存储期望值,此处为了实现当相同的key + value + tx多个加锁时,只处理第一个
            // 存储在事务对象中,标识在commit判断锁是否获取成功时,当前事务插入的是哪个锁信息
            tx.storeExpectedValue(this, lockID, expectedValue);
        } else {
            // locker为空情况下,直接抛出一个运行时异常,终止程序
            store.acquireLock(key, column, expectedValue, unwrapTx(txh));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    2、执行 locker.writeLock(lockID, tx.getConsistentTx()) 触发锁获取

    public void writeLock(KeyColumn lockID, StoreTransaction tx) throws TemporaryLockingException, PermanentLockingException {
    
        if (null != tx.getConfiguration().getGroupName()) {
            MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_CALLS).inc();
        }
    
        // 判断当前事务是否在图实例的维度 已经占据了lockID的锁
        // 此处的lockState在一个事务成功获取本地锁+分布式锁后,以事务为key、value为map,其中key为lockID,value为加锁状态(开始时间、过期时间等)
        if (lockState.has(tx, lockID)) {
            log.debug("Transaction {} already wrote lock on {}", tx, lockID);
            return;
        }
    
        // 当前事务没有占据lockID对应的锁
        // 进行(lockLocally(lockID, tx) 本地加锁锁定操作,
        if (lockLocally(lockID, tx)) {
            boolean ok = false;
            try {
                // 在本地锁获取成功的前提下:
                // 尝试获取基于Hbase实现的分布式锁;
                // 注意!!!(此处的获取锁只是将对应的KLV存储到Hbase中!存储成功并不代表获取锁成功)
                S stat = writeSingleLock(lockID, tx);
                // 获取锁分布式锁成功后(即写入成功后),更新本地锁的过期时间为分布式锁的过期时间
                lockLocally(lockID, stat.getExpirationTimestamp(), tx); // update local lock expiration time
                // 将上述获取的锁,存储在标识当前存在锁的集合中Map<tx,Map<lockID,S>>,  key为事务、value中的map为当前事务获取的锁,key为lockID,value为当前获取分布式锁的ConsistentKeyStatus(一致性密匙状态)对象
                lockState.take(tx, lockID, stat);
                ok = true;
            } catch (TemporaryBackendException tse) {
                // 在获取分布式锁失败后,捕获该异常,并抛出该异常
                throw new TemporaryLockingException(tse);
            } catch (AssertionError ae) {
                // Concession to ease testing with mocks & behavior verification
                ok = true;
                throw ae;
            } catch (Throwable t) {
                // 出现底层存储错误! 则直接加锁失败!
                throw new PermanentLockingException(t);
            } finally {
                // 判断是否成功获取锁,没有获分布式锁的,则释放本地锁
                if (!ok) {
                    // 没有成功获取锁,则释放本地锁
                    // lockState.release(tx, lockID); // has no effect
                    unlockLocally(lockID, tx);
                    if (null != tx.getConfiguration().getGroupName()) {
                        MetricManager.INSTANCE.getCounter(tx.getConfiguration().getGroupName(), M_LOCKS, M_WRITE, M_EXCEPTIONS).inc();
                    }
                }
            }
        } else {
            // 如果获取本地锁失败,则直接抛出异常,不进行重新本地争用
    
            // Fail immediately with no retries on local contention
            throw new PermanentLockingException("Local lock contention");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    包含两个部分:

    本地锁的获取lockLocally(lockID, tx)
    分布式锁的获取writeSingleLock(lockID, tx) 注意此处只是将锁信息写入到Hbase中,并不代表获取分布式锁成功,只是做了上述介绍的第一个阶段分布式锁信息插入
    3、本地锁获取 lockLocally(lockID, tx)

    public boolean lock(KeyColumn kc, T requester, Instant expires) {
    assert null != kc;
    assert null != requester;

        final StackTraceElement[] acquiredAt = log.isTraceEnabled() ?
                new Throwable("Lock acquisition by " + requester).getStackTrace() : null;
    
        // map的value,以事务为核心
        final AuditRecord<T> audit = new AuditRecord<>(requester, expires, acquiredAt);
        //  ConcurrentHashMap实现locks, 以lockID为key,事务为核心value
        final AuditRecord<T> inMap = locks.putIfAbsent(kc, audit);
    
        boolean success = false;
    
        // 代表当前map中不存在lockID,标识着锁没有被占用,成功获取锁
        if (null == inMap) {
            // Uncontended lock succeeded
            if (log.isTraceEnabled()) {
                log.trace("New local lock created: {} namespace={} txn={}",
                    kc, name, requester);
            }
            success = true;
        } else if (inMap.equals(audit)) {
            // 代表当前存在lockID,比对旧value和新value中的事务对象是否是同一个
            // requester has already locked kc; update expiresAt
            // 上述判断后,事务对象为同一个,标识当前事务已经获取这个lockID的锁;
            // 1. 这一步进行cas替换,作用是为了刷新过期时间
            // 2. 并发处理,如果因为锁过期被其他事务占据,则占用锁失败
            success = locks.replace(kc, inMap, audit);
            if (log.isTraceEnabled()) {
                if (success) {
                    log.trace("Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
                        kc, name, requester, inMap.expires, audit.expires);
                } else {
                    log.trace("Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
                        kc, name, requester, inMap.expires, audit.expires);
                }
            }
        } else if (0 > inMap.expires.compareTo(times.getTime())) {
            // 比较过期时间,如果锁已经过期,则当前事务可以占用该锁
    
            // the recorded lock has expired; replace it
            // 1. 当前事务占用锁
            // 2. 并发处理,如果因为锁过期被其他事务占据,则占用锁失败
            success = locks.replace(kc, inMap, audit);
            if (log.isTraceEnabled()) {
                log.trace("Discarding expired lock: {} namespace={} txn={} expired={}",
                    kc, name, inMap.holder, inMap.expires);
            }
        } else {
            // 标识:锁被其他事务占用,并且未过期,则占用锁失败
            // we lost to a valid lock
            if (log.isTraceEnabled()) {
                log.trace("Local lock failed: {} namespace={} txn={} (already owned by {})",
                    kc, name, requester, inMap);
                log.trace("Owner stacktrace:
            {}", Joiner.on("
            ").join(inMap.acquiredAt));
            }
        }
    
        return success;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    如上述介绍,本地锁的实现是通过ConcurrentHashMap数据结构来实现的,在图实例维度下唯一!

    4、分布式锁获取第一个阶段:分布式锁信息插入

    protected ConsistentKeyLockStatus writeSingleLock(KeyColumn lockID, StoreTransaction txh) throws Throwable {
    
        // 组装插入hbase数据的Rowkey
        final StaticBuffer lockKey = serializer.toLockKey(lockID.getKey(), lockID.getColumn());
        StaticBuffer oldLockCol = null;
    
        // 进行尝试插入 ,默认尝试次数3次
        for (int i = 0; i < lockRetryCount; i++) {
            // 尝试将数据插入到hbase中;oldLockCol表示要删除的column代表上一次尝试插入的数据
            WriteResult wr = tryWriteLockOnce(lockKey, oldLockCol, txh);
            // 如果插入成功
            if (wr.isSuccessful() && wr.getDuration().compareTo(lockWait) <= 0) {
                final Instant writeInstant = wr.getWriteTimestamp(); // 写入时间
                final Instant expireInstant = writeInstant.plus(lockExpire);// 过期时间
                return new ConsistentKeyLockStatus(writeInstant, expireInstant); // 返回插入对象
            }
            // 赋值当前的尝试插入的数据,要在下一次尝试时删除
            oldLockCol = wr.getLockCol();
            // 判断插入失败原因,临时异常进行尝试,非临时异常停止尝试!
            handleMutationFailure(lockID, lockKey, wr, txh);
        }
        // 处理在尝试了3次之后还是没插入成功的情况,删除最后一次尝试插入的数据
        tryDeleteLockOnce(lockKey, oldLockCol, txh);
        // TODO log exception or successful too-slow write here
        // 抛出异常,标识导入数据失败
        throw new TemporaryBackendException("Lock write retry count exceeded");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    上述只是将锁信息插入,插入成功标识该流程结束

    5、分布式锁获取第一个阶段:分布式锁锁定是否成功判定

    这一步,是在commit阶段进行的验证

    public void commit() throws BackendException {
        // 此方法内调用checkSingleLock 检查分布式锁的获取结果
        flushInternal();
        tx.commit();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    最终会调用checkSingleLock方法,判断获取锁的状态!

    protected void checkSingleLock(final KeyColumn kc, final ConsistentKeyLockStatus ls,
                                   final StoreTransaction tx) throws BackendException, InterruptedException {
    
        // 检查是否被检查过
        if (ls.isChecked())
            return;
    
        // Slice the store
        KeySliceQuery ksq = new KeySliceQuery(serializer.toLockKey(kc.getKey(), kc.getColumn()), LOCK_COL_START,
            LOCK_COL_END);
        // 此处从hbase中查询出锁定的行的所有列! 默认查询重试次数3
        List<Entry> claimEntries = getSliceWithRetries(ksq, tx);
    
        // 从每个返回条目的列中提取timestamp和rid,然后过滤出带有过期时间戳的timestamp对象
        final Iterable<TimestampRid> iterable = Iterables.transform(claimEntries,
            e -> serializer.fromLockColumn(e.getColumnAs(StaticBuffer.STATIC_FACTORY), times));
        final List<TimestampRid> unexpiredTRs = new ArrayList<>(Iterables.size(iterable));
        for (TimestampRid tr : iterable) { // 过滤获取未过期的锁!
            final Instant cutoffTime = now.minus(lockExpire);
            if (tr.getTimestamp().isBefore(cutoffTime)) {
                ...
            }
            // 将还未过期的锁记录存储到一个集合中
            unexpiredTRs.add(tr);
        }
        // 判断当前tx是否成功持有锁! 如果我们插入的列是读取的第一个列,或者前面的列只包含我们自己的rid(因为我们是在第一部分的前提下获取的锁,第一部分我们成功获取了基于当前进程的锁,所以如果rid相同,代表着我们也成功获取到了当前的分布式锁),那么我们持有锁。否则,另一个进程持有该锁,我们无法获得锁
        // 如果,获取锁失败,抛出TemporaryLockingException异常!!!! 抛出到顶层的mutator.commitStorage()处,最终导入失败进行事务回滚等操作
        checkSeniority(kc, ls, unexpiredTRs);
        // 如果上述步骤未抛出异常,则标识当前的tx已经成功获取锁!
        ls.setChecked();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    四:整体流程
    总流程如下图:

    4

    整体流程为:

    获取本地锁
    获取分布式锁
    插入分布式锁信息
    commit阶段判断分布式锁获取是否成功
    获取失败,则重试
    五:总结
    JanusGraph的锁机制主要是通过本地锁+分布式锁来实现分布式系统下的数据一致性;

    分布式锁的控制维度为:property、vertex、edge、index都可以;

    JanusGraph支持在数据导入时通过前面一致性行为部分所说的LOCK来开关分布式锁:

    LOCK:数据导入时开启分布式锁保证分布式一致性
    DEFAULT、FORK:数据导入时关闭分布式锁
    是否开启分布式锁思考:

    在开启分布式锁的情况下,数据导入开销非常大;如果是数据不是要求很高的一致性,并且数据量比较大,我们可以选择关闭分布式锁相关,来提高导入速度;

    然后,针对于小数据量的要求高一致性的数据,单独开启分布式锁来保证数据安全;

    另外,我们在不开启分布式锁定的情况下,可以通过针对于导入的数据的充分探查来减少冲突!

    针对于图schema的元素开启还是关闭分布式锁,还是根据实际业务情况来决定。

    本文有任何问题,可加博主微信或评论指出,感谢!

    码文不易,给个赞和star吧~

    本文由博客群发一文多发等运营工具平台 OpenWrite 发布

  • 相关阅读:
    归并排序(Merge Sort)
    AtCoder AGC035D Add and Remove (状压DP)
    AtCoder AGC034D Manhattan Max Matching (费用流)
    AtCoder AGC033F Adding Edges (图论)
    AtCoder AGC031F Walk on Graph (图论、数论)
    AtCoder AGC031E Snuke the Phantom Thief (费用流)
    AtCoder AGC029F Construction of a Tree (二分图匹配)
    AtCoder AGC029E Wandering TKHS
    AtCoder AGC039F Min Product Sum (容斥原理、组合计数、DP)
    AtCoder AGC035E Develop (DP、图论、计数)
  • 原文地址:https://www.cnblogs.com/gd11/p/14227811.html
Copyright © 2011-2022 走看看