zoukankan      html  css  js  c++  java
  • 索引管理

    Reindexing

    前文描述了如何通过创建graph index和vertex-centric index来提高性能。如果索引的label或key如果与创建索引操作在同一个事务中,这些索引可以立即生效,也就没有必要进行reindex了。反之,如果需要索引的key和label已经提前创建了,则需要重新索引整张图来使索引生效。

    Overview

    Janusgraph在索引定义完之后就可以进行增量更新,但在索引完整和可用前,JansuGraph必须要对索引schema相关的且已存在的所有元素执行一次全量读。一旦reindexing工作完成,索引即进入可以被使用的状态。然后通过设置索引为enable状态启用。

    Prior to Reindexing

    reindexing过程从索引构建完毕就开始了,需要注意的是,global graph index由其名称唯一定义,vertex-centric index由名称及edgg label或属性key唯一定义。
    在对已经存在的schema元素定义了新的索引后,推荐用户等待几分钟,以使得索引能够通知到整个集群。注意索引名称,因在reindex时该名称是必须的。

    Preparing to Reindex

    reindex作业有两种执行框架可选。
    • MapReduce
    • JanusGraphManagement
     
    在Mapreduce框架上进行reindex支持大的,横向分布的数据库。JanusGraphManagement适用于单机OPLP作业,能够为单节点数据库提供便利性和速度。
     
    reindex需要:
    • index name(用户构建索引时提供的名称)
    • index type(vertex-centric index的edge label或property key的名称),仅适用于vertex-centric label

    Executing a Reindex Job on MapReduce

    推荐使用MapReduceManagement类进行reindex,下面是大略过程:
    • 开启一个JanusGraph实例
    • 将图的实例传给MapReduceIndexManagement构造器
    • 在MapReduceManagement实例上调用updateIndex(<index>, SchemaAction.REINDEX)
    • 如果该索引还么有enable,通过JanusGraphManagement启用
     
    该类实现了一个updateIndex方法,且仅支持REINDEX和REMOVE_INDEX操作。该类使用classpath上的Hadoop配置和lib启动了一个Hadoop Mapreduce作业。同时支持Hadoop1和Hadoop2。该类通过传入其中的JanusGraph实例获取索引和后端存储的元数据。
    graph = JanusGraphFactory.open(...)
    mgmt = graph.openManagement()
    mr = new MapReduceIndexManagement(graph)
    mr.updateIndex(mgmt.getRelationIndex(mgmt.getRelationType("battled"), "battlesByTime"), SchemaAction.REINDEX).get()
    mgmt.commit()

    Reindex Example on MapReduce:

    基于cassandra提供的一个完整的例子。
    // Open a graph
    graph = JanusGraphFactory.open("conf/janusgraph-cassandra-es.properties")
    g = graph.traversal()
    
    // Define a property
    mgmt = graph.openManagement()
    desc = mgmt.makePropertyKey("desc").dataType(String.class).make()
    mgmt.commit()
    
    // Insert some data
    graph.addVertex("desc", "foo bar")
    graph.addVertex("desc", "foo baz")
    graph.tx().commit()
    
    // Run a query -- note the planner warning recommending the use of an index
    g.V().has("desc", containsText("baz"))
    
    // Create an index
    mgmt = graph.openManagement()
    
    desc = mgmt.getPropertyKey("desc")
    mixedIndex = mgmt.buildIndex("mixedExample", Vertex.class).addKey(desc).buildMixedIndex("search")
    mgmt.commit()
    
    // Rollback or commit transactions on the graph which predate the index definition
    graph.tx().rollback()
    
    // Block until the SchemaStatus transitions from INSTALLED to REGISTERED
    report = mgmt.awaitGraphIndexStatus(graph, "mixedExample").call()
    
    // Run a JanusGraph-Hadoop job to reindex
    mgmt = graph.openManagement()
    mr = new MapReduceIndexManagement(graph)
    mr.updateIndex(mgmt.getGraphIndex("mixedExample"), SchemaAction.REINDEX).get()
    
    // Enable the index
    mgmt = graph.openManagement()
    mgmt.updateIndex(mgmt.getGraphIndex("mixedExample"), SchemaAction.ENABLE_INDEX).get()
    mgmt.commit()
    
    // Block until the SchemaStatus is ENABLED
    mgmt = graph.openManagement()
    report = mgmt.awaitGraphIndexStatus(graph, "mixedExample").status(SchemaStatus.ENABLED).call()
    mgmt.rollback()
    
    // Run a query -- JanusGraph will use the new index, no planner warning
    g.V().has("desc", containsText("baz"))
    
    // Concerned that JanusGraph could have read cache in that last query, instead of relying on the index?
    // Start a new instance to rule out cache hits.  Now we're definitely using the index.
    graph.close()
    graph = JanusGraphFactory.open("conf/janusgraph-cassandra-es.properties")
    g.V().has("desc", containsText("baz"))

    Executing a Reindex job on JanusGraphManagement

    使用JanusGraphManagement.updateIndex()方法触发reindex作业,并携带参数SchemaAction.REINDEX参数,如:
    m = graph.openManagement()
    i = m.getGraphIndex('indexName')
    m.updateIndex(i, SchemaAction.REINDEX).get()
    m.commit()

    Example for JanusGraphManagement

    下面的例子使用了BerkeleyDB作为存储后端。
    import org.janusgraph.graphdb.database.management.ManagementSystem
    
    // Load some data from a file without any predefined schema
    graph = JanusGraphFactory.open('conf/janusgraph-berkeleyje.properties')
    g = graph.traversal()
    m = graph.openManagement()
    m.makePropertyKey('name').dataType(String.class).cardinality(Cardinality.LIST).make()
    m.makePropertyKey('lang').dataType(String.class).cardinality(Cardinality.LIST).make()
    m.makePropertyKey('age').dataType(Integer.class).cardinality(Cardinality.LIST).make()
    m.commit()
    graph.io(IoCore.gryo()).readGraph('data/tinkerpop-modern.gio')
    graph.tx().commit()
    
    // Run a query -- note the planner warning recommending the use of an index
    g.V().has('name', 'lop')
    graph.tx().rollback()
    
    // Create an index
    m = graph.openManagement()
    m.buildIndex('names', Vertex.class).addKey(m.getPropertyKey('name')).buildCompositeIndex()
    m.commit()
    graph.tx().commit()
    
    // Block until the SchemaStatus transitions from INSTALLED to REGISTERED
    ManagementSystem.awaitGraphIndexStatus(graph, 'names').status(SchemaStatus.REGISTERED).call()
    
    // Reindex using JanusGraphManagement
    m = graph.openManagement()
    i = m.getGraphIndex('names')
    m.updateIndex(i, SchemaAction.REINDEX)
    m.commit()
    
    // Enable the index
    ManagementSystem.awaitGraphIndexStatus(graph, 'names').status(SchemaStatus.ENABLED).call()
    
    // Run a query -- JanusGraph will use the new index, no planner warning
    g.V().has('name', 'lop')
    graph.tx().rollback()
    
    // Concerned that JanusGraph could have read cache in that last query, instead of relying on the index?
    // Start a new instance to rule out cache hits.  Now we're definitely using the index.
    graph.close()
    graph = JanusGraphFactory.open("conf/janusgraph-berkeleyje.properties")
    g = graph.traversal()
    g.V().has('name', 'lop')

    Index Removal

    索引的移除是由多个步骤组成的手动过程,这些步骤需要仔细执行,以避免出现不一致的情况。

    Overview

    索引的删除是一个两段过程,在第一阶段,JanusGraph通知所有的存储后端该索引准备被删除,这个步骤索引的状态将会被设置为DISABLED。之后,JanusGraph停止使用该索引来响应查询请求,并停止索引的增量更新。与索引关联的数据在存储后端保持最新,但为索引所忽略。
     
    第二阶段依赖于索引是mixed还是composite,composite索引可以通过JansuGraph删除,类似于reindex过程,可以通过MapReduce或者JanusGraphManagement实现。但是mixed 索引必须手动在索引后端删除;JanusGraph没有提供自动删除机制。
     
    删除索引会同时移除索引关联的除了schema定义和DISABLE状态的一切,schema在索引删除后依然存在。

    Prepareing for Index Removal

    如果索引当前是enabled的,需要首先被disabled,这个操作通过ManagementSystem。
    mgmt = graph.openManagement()
    rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"), "battlesByTime")
    mgmt.updateIndex(rindex, SchemaAction.DISABLE_INDEX).get()
    gindex = mgmt.getGraphIndex("byName")
    mgmt.updateIndex(gindex, SchemaAction.DISABLE_INDEX).get()
    mgmt.commit()
    一旦当索引上的key的状态均变更为DISABLED,索引就准备被删除了。ManagementSystem中的一个工具可以等待DISABLE步骤。
    ManagementSystem.awaitGraphIndexStatus(graph, 'byName').status(SchemaStatus.DISABLED).call()
    当一个composite index设置为DISABLED状态后,可以通过以下两个框架进行索引的删除。
    • MapReduce
    • JanusGraphManagement
    通过MapReduce支持大型的,横向分布的数据库;通过JanusGraphMangement支持单节点OPAP作业,通常适用于单机节点。
     
    删除索引需要:
    • index name
    • index type(对vertex-centric索引适用)

    Executing an Index Removal Job on MapReduce

    与reindex步骤一样,推荐通过MapReduceManagment类执行索引移除作业。下面是大致执行步骤:
    • 打开一个JanusGraph实例
    • 如果索引不是禁用状态,通过JanusGraphManagement禁用
    • 将graph实例传入MapReduceManagment构造器
    • 执行updateIndex(<index>, SchemaAction.REMOVAL_INDEX)

    Example for MapReduce 

    import org.janusgraph.graphdb.database.management.ManagementSystem
    
    // Load the "Graph of the Gods" sample data
    graph = JanusGraphFactory.open('conf/janusgraph-cassandra-es.properties')
    g = graph.traversal()
    GraphOfTheGodsFactory.load(graph)
    
    g.V().has('name', 'jupiter')
    
    // Disable the "name" composite index
    m = graph.openManagement()
    nameIndex = m.getGraphIndex('name')
    m.updateIndex(nameIndex, SchemaAction.DISABLE_INDEX).get()
    m.commit()
    graph.tx().commit()
    
    // Block until the SchemaStatus transitions from INSTALLED to REGISTERED
    ManagementSystem.awaitGraphIndexStatus(graph, 'name').status(SchemaStatus.DISABLED).call()
    
    // Delete the index using MapReduceIndexJobs
    m = graph.openManagement()
    mr = new MapReduceIndexManagement(graph)
    future = mr.updateIndex(m.getGraphIndex('name'), SchemaAction.REMOVE_INDEX)
    m.commit()
    graph.tx().commit()
    future.get()
    
    // Index still shows up in management interface as DISABLED -- this is normal
    m = graph.openManagement()
    idx = m.getGraphIndex('name')
    idx.getIndexStatus(m.getPropertyKey('name'))
    m.rollback()
    
    // JanusGraph should issue a warning about this query requiring a full scan
    g.V().has('name', 'jupiter')

    Executing an Index Removal job on JanusGraphManagement

    使用JanusGraphManagement.updateIndex()方法使用参数SchemaAction.REMOVE_INDEX运行一个索引移除作业。
    m = graph.openManagement()
    i = m.getGraphIndex('indexName')
    m.updateIndex(i, SchemaAction.REMOVE_INDEX).get()
    m.commit()

    Example for JanusGraphManagement

    该例子使用BerkeleyDB后端。
    import org.janusgraph.graphdb.database.management.ManagementSystem
    
    // Load the "Graph of the Gods" sample data
    graph = JanusGraphFactory.open('conf/janusgraph-cassandra-es.properties')
    g = graph.traversal()
    GraphOfTheGodsFactory.load(graph)
    
    g.V().has('name', 'jupiter')
    
    // Disable the "name" composite index
    m = graph.openManagement()
    nameIndex = m.getGraphIndex('name')
    m.updateIndex(nameIndex, SchemaAction.DISABLE_INDEX).get()
    m.commit()
    graph.tx().commit()
    
    // Block until the SchemaStatus transitions from INSTALLED to REGISTERED
    ManagementSystem.awaitGraphIndexStatus(graph, 'name').status(SchemaStatus.DISABLED).call()
    
    // Delete the index using JanusGraphManagement
    m = graph.openManagement()
    nameIndex = m.getGraphIndex('name')
    future = m.updateIndex(nameIndex, SchemaAction.REMOVE_INDEX)
    m.commit()
    graph.tx().commit()
    
    future.get()
    
    m = graph.openManagement()
    nameIndex = m.getGraphIndex('name')
    
    g.V().has('name', 'jupiter')

    Common Problems with Index Management

    IllegalArgumentException when starting job

    通常出现如下错误:
    The index mixedExample is in an invalid state and cannot be indexed.
    The following index keys have invalid status: desc has status INSTALLED
    (status must be one of [REGISTERED, ENABLED])
    The index mixedExample is in an invalid state and cannot be indexed.
    The index has status INSTALLED, but one of [REGISTERED, ENABLED] is required
    当创建一个索引后,索引会被广播到所有的JanusGraph实例,JanusGraph集群的节点需要一段时间(可能是几分钟,视集群大小而定)才能感知到索引的存在。只有当所有节点均感知到索引存在后,reindexing才能被执行。因此用户在创建完索引后需要等待一段时间。
     
    但是需要注意的是,这种感知可能由于JanusGraph实例的错误而无法完成,换句话说,JansuGraph可能无限等待失效节点的响应。在这种情况下,用户必须手动剔除失效实例。参考http://docs.janusgraph.org/latest/failure-recovery.html。当状态重置后,需要在Management中手动重新注册索引。
    mgmt = graph.openManagement()
    rindex = mgmt.getRelationIndex(mgmt.getRelationType("battled"),"battlesByTime")
    mgmt.updateIndex(rindex, SchemaAction.REGISTER_INDEX).get()
    gindex = mgmt.getGraphIndex("byName")
    mgmt.updateIndex(gindex, SchemaAction.REGISTER_INDEX).get()
    mgmt.commit()

    Could not find index

    索引名称错误,在操作global index时,只需要提供索引名称即可,当操作vertex-centric index时还要指定label和索引属性。

    Cassandra Mappers Fail with "Too Many openfiles"

    错误栈类似于:
    java.net.SocketException: Too many open files
            at java.net.Socket.createImpl(Socket.java:447)
            at java.net.Socket.getImpl(Socket.java:510)
            at java.net.Socket.setSoLinger(Socket.java:988)
            at org.apache.thrift.transport.TSocket.initSocket(TSocket.java:118)
            at org.apache.thrift.transport.TSocket.<init>(TSocket.java:109)
     
    解决方案为:
    • Reduce the maximum size of the Cassandra connection pool. For example, consider setting the cassandrathrift storage backend’smax-activeandmax-idleoptions to 1 each, and settingmax-totalto -1. SeeChapter 12,Configuration Referencefor full listings of connection pool settings on the Cassandra storage backends.
    • Increase thenofileulimit. The ideal value depends on the size of the Cassandra dataset and the throughput of the reindex mappers; if starting at 1024, try an order of magnitude larger: 10000. This is just necessary to sustain lingering TIME_WAIT sockets. The reindex job won’t try to open nearly that many sockets at once.
    • Run the reindex task on a multi-node MapReduce cluster to spread out the socket load.
  • 相关阅读:
    react路由组件&&非路由组件
    react函数式组件(非路由组件)实现路由跳转
    react使用antd组件递归实现左侧菜单导航树
    【LeetCode】65. Valid Number
    【LeetCode】66. Plus One (2 solutions)
    【LeetCode】68. Text Justification
    【LeetCode】69. Sqrt(x) (2 solutions)
    【LeetCode】72. Edit Distance
    【LeetCode】73. Set Matrix Zeroes (2 solutions)
    【LeetCode】76. Minimum Window Substring
  • 原文地址:https://www.cnblogs.com/jiyuqi/p/7466263.html
Copyright © 2011-2022 走看看