zoukankan      html  css  js  c++  java
  • DolphinScheduler 源码分析之 DAG类

      1 /*
      2  * Licensed to the Apache Software Foundation (ASF) under one or more
      3  * contributor license agreements.  See the NOTICE file distributed with
      4  * this work for additional information regarding copyright ownership.
      5  * The ASF licenses this file to You under the Apache License, Version 2.0
      6  * (the "License"); you may not use this file except in compliance with
      7  * the License.  You may obtain a copy of the License at
      8  *
      9  *    http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  * Unless required by applicable law or agreed to in writing, software
     12  * distributed under the License is distributed on an "AS IS" BASIS,
     13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  * See the License for the specific language governing permissions and
     15  * limitations under the License.
     16  */
     17 package org.apache.dolphinscheduler.common.graph;
     18 
     19 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
     20 import org.slf4j.Logger;
     21 import org.slf4j.LoggerFactory;
     22 
     23 import java.util.*;
     24 import java.util.concurrent.locks.ReadWriteLock;
     25 import java.util.concurrent.locks.ReentrantReadWriteLock;
     26 
     27 /**
     28  * analysis of DAG
     29  * 对DAG图的分析
     30  * Node: node 指代一个节点,其实一般是Int类型的数字,比如 1
     31  * NodeInfo:node description information 指代节点的描述信息,其实一般是String类型 ,比如 “v(1)”
     32  * EdgeInfo: edge description information 其实一般是String类型 比如 “edge(1 -> 2)”
     33  */
     34 public class DAG<Node, NodeInfo, EdgeInfo> {
     35 
     36 
     37   private static final Logger logger = LoggerFactory.getLogger(DAG.class);
     38 
     39   private final ReadWriteLock lock = new ReentrantReadWriteLock();
     40 
     41   /**
     42    * node map, key is node, value is node information
     43    * 节点映射,键是节点,值是节点信息
     44    */
     45   private volatile Map<Node, NodeInfo> nodesMap;
     46 
     47   /**
     48    * edge map. key is node of origin;value is Map with key for destination node and value for edge
     49    * 边的映射。key是起始节点,value是一个Map,这个Map又是以目标节点作为Key,边的信息作为value的。
     50    */
     51   private volatile Map<Node, Map<Node, EdgeInfo>> edgesMap;
     52 
     53   /**
     54    * reversed edge set,key is node of destination, value is Map with key for origin node and value for edge
     55    * 反转的边的映射。key是目标点,value是一个Map。这个Map又是以起始点为Key,边的信息作为value的。
     56    */
     57   private volatile Map<Node, Map<Node, EdgeInfo>> reverseEdgesMap;
     58 
     59 
     60   public DAG() {
     61     nodesMap = new HashMap<>();
     62     edgesMap = new HashMap<>();
     63     reverseEdgesMap = new HashMap<>();
     64   }
     65 
     66 
     67   /**
     68    * add node information
     69    * 添加一个节点
     70    *
     71    * @param node          node
     72    * @param nodeInfo      node information
     73    */
     74   public void addNode(Node node, NodeInfo nodeInfo) {
     75     lock.writeLock().lock();
     76 
     77     try{
     78       nodesMap.put(node, nodeInfo);
     79     }finally {
     80       lock.writeLock().unlock();
     81     }
     82 
     83   }
     84 
     85 
     86   /**
     87    * add edge
     88    * 添加边
     89    * @param fromNode node of origin 起始点
     90    * @param toNode   node of destination 目标点
     91    * @return The result of adding an edge. returns false if the DAG result is a ring result
     92    * 返回值是一个波尔类型的值,如果添加边之后会形成一个环图,那么就会返回false,并且不会添加这条边,如果添加边之后不会形成一个环图,那么就会返回true。
     93    */
     94   public boolean addEdge(Node fromNode, Node toNode) {
     95     return addEdge(fromNode, toNode, false);
     96   }
     97 
     98 
     99   /**
    100    * add edge
    101    * 添加边
    102    * @param fromNode        node of origin 起始点
    103    * @param toNode          node of destination 目标点
    104    * @param createNode      whether the node needs to be created if it does not exist
    105    *                        如果我添加新的边的时候,起始点或者目标点不存在。
    106    *                        那么我需要看这个createNode参数,如果这里传入true,代表同时可以建立Node,如果是false,代表不可以擅自新建Node。
    107    * @return The result of adding an edge. returns false if the DAG result is a ring result
    108    * 返回值是一个波尔类型的值,如果添加边之后会形成一个环图,那么就会返回false,并且不会添加这条边,如果添加边之后不会形成一个环图,那么就会返回true。
    109    */
    110   private boolean addEdge(Node fromNode, Node toNode, boolean createNode) {
    111     return addEdge(fromNode, toNode, null, createNode);
    112   }
    113 
    114 
    115   /**
    116    * add edge
    117    * 添加边
    118    *
    119    * @param fromNode        node of origin 起始节点
    120    * @param toNode          node of destination 目标节点
    121    * @param edge            edge description 边描述
    122    * @param createNode      whether the node needs to be created if it does not exist
    123    *                        如果我添加新的边的时候,起始点或者目标点不存在。
    124    *                        那么我需要看这个createNode参数,如果这里传入true,代表同时可以建立Node,如果是false,代表不可以擅自新建Node。
    125    * @return The result of adding an edge. returns false if the DAG result is a ring result
    126    * 返回值是一个波尔类型的值,如果添加边之后会形成一个环图,那么就会返回false,并且不会添加这条边,如果添加边之后不会形成一个环图,那么就会返回true。
    127    */
    128   public boolean addEdge(Node fromNode, Node toNode, EdgeInfo edge, boolean createNode) {
    129     lock.writeLock().lock();
    130 
    131     try{
    132 
    133       // Whether an edge can be successfully added(fromNode -> toNode)
    134       // 判断边是否可以成功被添加(起始点-->目标点)
    135       if (!isLegalAddEdge(fromNode, toNode, createNode)) {
    136         logger.error("serious error: add edge({} -> {}) is invalid, cause cycle!", fromNode, toNode);
    137         return false;
    138       }
    139 
    140       addNodeIfAbsent(fromNode, null);
    141       addNodeIfAbsent(toNode, null);
    142 
    143       addEdge(fromNode, toNode, edge, edgesMap);
    144       addEdge(toNode, fromNode, edge, reverseEdgesMap);
    145 
    146       return true;
    147     }finally {
    148       lock.writeLock().unlock();
    149     }
    150 
    151   }
    152 
    153 
    154   /**
    155    * whether this node is contained
    156    * 检查node的map中是否存在这个节点
    157    *
    158    * @param node node 节点
    159    * @return true if contains 返回true,如果存在的话
    160    */
    161   public boolean containsNode(Node node) {
    162     lock.readLock().lock();
    163 
    164     try{
    165       return nodesMap.containsKey(node);
    166     }finally {
    167       lock.readLock().unlock();
    168     }
    169   }
    170 
    171 
    172   /**
    173    * whether this edge is contained
    174    * 检查是否存在边
    175    *
    176    * @param fromNode node of origin 起始点
    177    * @param toNode   node of destination 目标点
    178    * @return true if contains 返回true如果存在的话
    179    */
    180   public boolean containsEdge(Node fromNode, Node toNode) {
    181     lock.readLock().lock();
    182     try{
    183       Map<Node, EdgeInfo> endEdges = edgesMap.get(fromNode);
    184       if (endEdges == null) {
    185         return false;
    186       }
    187 
    188      return endEdges.containsKey(toNode);
    189     }finally {
    190       lock.readLock().unlock();
    191     }
    192   }
    193 
    194 
    195   /**
    196    * get node description
    197    * 获取node的nodeInfo
    198    *
    199    * @param node node 要查询的node
    200    * @return node description 返回节点的描述信息
    201    */
    202   public NodeInfo getNode(Node node) {
    203     lock.readLock().lock();
    204 
    205     try{
    206       return nodesMap.get(node);
    207     }finally {
    208       lock.readLock().unlock();
    209     }
    210   }
    211 
    212 
    213   /**
    214    * Get the number of nodes
    215    * 获取node的数量
    216    *
    217    * @return the number of nodes 返回node的数量
    218    */
    219   public int getNodesCount() {
    220     lock.readLock().lock();
    221 
    222     try{
    223       return nodesMap.size();
    224     }finally {
    225       lock.readLock().unlock();
    226     }
    227   }
    228 
    229   /**
    230    * Get the number of edges
    231    * 获取边的数量
    232    * @return the number of edges 返回边的数量
    233    */
    234   public int getEdgesCount() {
    235     lock.readLock().lock();
    236     try{
    237       int count = 0;
    238 
    239       for (Map.Entry<Node, Map<Node, EdgeInfo>> entry : edgesMap.entrySet()) {
    240         count += entry.getValue().size();
    241       }
    242 
    243       return count;
    244     }finally {
    245       lock.readLock().unlock();
    246     }
    247   }
    248 
    249 
    250   /**
    251    * get the start node of DAG
    252    * 获取一幅图中只有出度,没有入度的节点的集合
    253    *
    254    * @return the start node of DAG 返回一幅图中只有出度,没有入度的节点的集合
    255    */
    256   public Collection<Node> getBeginNode() {
    257     lock.readLock().lock();
    258 
    259     try{
    260       return CollectionUtils.subtract(nodesMap.keySet(), reverseEdgesMap.keySet());
    261     }finally {
    262       lock.readLock().unlock();
    263     }
    264 
    265   }
    266 
    267 
    268   /**
    269    * get the end node of DAG
    270    * 获取一幅图中只有入度,没有出度的节点的集合
    271    *
    272    * @return the end node of DAG 返回一幅图中只有入度,没有出度的节点的集合
    273    */
    274   public Collection<Node> getEndNode() {
    275 
    276     lock.readLock().lock();
    277 
    278     try{
    279       return CollectionUtils.subtract(nodesMap.keySet(), edgesMap.keySet());
    280     }finally {
    281       lock.readLock().unlock();
    282     }
    283 
    284   }
    285 
    286 
    287   /**
    288    * Gets all previous nodes of the node
    289    * 传入一个node,返回所有指向这个node的node集合
    290    *
    291    * @param node node id to be calculated 传入要查询的node
    292    * @return all previous nodes of the node 返回所有指向这个node的node集合
    293    */
    294   public Set<Node> getPreviousNodes(Node node) {
    295     lock.readLock().lock();
    296 
    297     try{
    298       return getNeighborNodes(node, reverseEdgesMap);
    299     }finally {
    300       lock.readLock().unlock();
    301     }
    302   }
    303 
    304 
    305   /**
    306    * Get all subsequent nodes of the node
    307    * 传入一个node,返回所有的该节点指向的node集合
    308    *
    309    * @param node node id to be calculated 传入要查询的node的id
    310    * @return all subsequent nodes of the node 返回的该节点指向的所有node
    311    */
    312   public Set<Node> getSubsequentNodes(Node node) {
    313     lock.readLock().lock();
    314 
    315     try{
    316       return getNeighborNodes(node, edgesMap);
    317     }finally {
    318       lock.readLock().unlock();
    319     }
    320   }
    321 
    322 
    323   /**
    324    * Gets the degree of entry of the node
    325    * 计算一个节点的入度
    326    *
    327    * @param node node id 被计算的节点
    328    * @return the degree of entry of the node 节点的入度
    329    */
    330   public int getIndegree(Node node) {
    331     lock.readLock().lock();
    332 
    333     try{
    334       return getPreviousNodes(node).size();
    335     }finally {
    336       lock.readLock().unlock();
    337     }
    338   }
    339 
    340 
    341   /**
    342    * whether the graph has a ring
    343    * 判断这个图是否存在环
    344    *
    345    * @return true if has cycle, else return false.
    346    * 如果存在环,返回true。没有形成环状,返回false
    347    */
    348   public boolean hasCycle() {
    349     lock.readLock().lock();
    350     try{
    351         return !topologicalSortImpl().getKey();
    352     }finally {
    353       lock.readLock().unlock();
    354     }
    355   }
    356 
    357 
    358   /**
    359    * Only DAG has a topological sort
    360    * 返回一个DAG图的拓扑排序
    361    * @return topologically sorted results, returns false if the DAG result is a ring result
    362    * 返回一个DAG图的拓扑排序,如果这个DAG其实是环图,那么返回false,也就是非DAG图不存在拓扑排序。
    363    * @throws Exception errors
    364    */
    365   public List<Node> topologicalSort() throws Exception {
    366     lock.readLock().lock();
    367 
    368     try{
    369       Map.Entry<Boolean, List<Node>> entry = topologicalSortImpl();
    370 
    371       if (entry.getKey()) {
    372         return entry.getValue();
    373       }
    374 
    375       throw new Exception("serious error: graph has cycle ! ");
    376     }finally {
    377       lock.readLock().unlock();
    378     }
    379   }
    380 
    381 
    382   /**
    383    *  if tho node does not exist,add this node
    384    * 如果节点不存在,则添加该节点
    385    * @param node    node 节点id
    386    * @param nodeInfo node information 节点描述信息
    387    */
    388   private void addNodeIfAbsent(Node node, NodeInfo nodeInfo) {
    389     if (!containsNode(node)) {
    390       addNode(node, nodeInfo);
    391     }
    392   }
    393 
    394 
    395   /**
    396    * add edge
    397    * 添加边
    398    *
    399    * @param fromNode node of origin 起始点
    400    * @param toNode   node of destination 目标点
    401    * @param edge  edge description 边的描述信息
    402    * @param edges edge set 所有边的集合
    403    */
    404   private void addEdge(Node fromNode, Node toNode, EdgeInfo edge, Map<Node, Map<Node, EdgeInfo>> edges) {
    405     edges.putIfAbsent(fromNode, new HashMap<>());
    406     Map<Node, EdgeInfo> toNodeEdges = edges.get(fromNode);
    407     toNodeEdges.put(toNode, edge);
    408   }
    409 
    410 
    411   /**
    412    * Whether an edge can be successfully added(fromNode -> toNode)
    413    * 判断是否新的 边 可以被成功添加进现有的图中(起始点-->目标点)
    414    * need to determine whether the DAG has cycle
    415    * 需要判断是否会形成一个环状图
    416    * @param fromNode     node of origin 起始点
    417    * @param toNode       node of destination 目标点
    418    * @param createNode whether to create a node
    419    *                   如果我添加新的边的时候,起始点或者目标点不存在。
    420    *                   那么我需要看这个createNode参数,如果这里传入true,代表同时可以建立Node,如果是false,代表不可以擅自新建Node。
    421    * @return true if added
    422    * 返回true,如果节点被成功添加,否则返回false。
    423    */
    424   private boolean isLegalAddEdge(Node fromNode, Node toNode, boolean createNode) {
    425       if (fromNode.equals(toNode)) {
    426           logger.error("edge fromNode({}) can't equals toNode({})", fromNode, toNode);
    427           return false;
    428       }
    429 
    430       if (!createNode) {
    431           if (!containsNode(fromNode) || !containsNode(toNode)){
    432               logger.error("edge fromNode({}) or toNode({}) is not in vertices map", fromNode, toNode);
    433               return false;
    434           }
    435       }
    436 
    437       // Whether an edge can be successfully added(fromNode -> toNode),need to determine whether the DAG has cycle!
    438       // 判断是否新的边能被成功添加进图中(起始点-->目标点),需要判断是否会形成有环图。
    439       int verticesCount = getNodesCount();
    440 
    441       Queue<Node> queue = new LinkedList<>();
    442 
    443       queue.add(toNode);
    444 
    445       // if DAG doesn't find fromNode, it's not has cycle!
    446       // 把目标点添加到队列,然后不断循环检查,目标节点的下一个节点会不会是起始点
    447       while (!queue.isEmpty() && (--verticesCount > 0)) {
    448           Node key = queue.poll();
    449 
    450           for (Node subsequentNode : getSubsequentNodes(key)) {
    451               if (subsequentNode.equals(fromNode)) {
    452                   return false;
    453               }
    454 
    455               queue.add(subsequentNode);
    456           }
    457       }
    458 
    459       return true;
    460   }
    461 
    462 
    463   /**
    464    * Get all neighbor nodes of the node
    465    * 获取待查询节点的下一个节点的集合,一个节点可能指向多个节点
    466    *
    467    * @param node   Node id to be calculated 需要查询的node id
    468    * @param edges neighbor edge information 现有的所有边的信息
    469    * @return all neighbor nodes of the node 返回相邻的节点的集合
    470    */
    471   private Set<Node> getNeighborNodes(Node node, final Map<Node, Map<Node, EdgeInfo>> edges) {
    472     final Map<Node, EdgeInfo> neighborEdges = edges.get(node);
    473 
    474     if (neighborEdges == null) {
    475       return Collections.EMPTY_MAP.keySet();
    476     }
    477 
    478     return neighborEdges.keySet();
    479   }
    480 
    481 
    482 
    483   /**
    484    * Determine whether there are ring and topological sorting results
    485    * 确定是否有环,返回拓扑排序结果
    486    * Directed acyclic graph (DAG) has topological ordering
    487    *  有向无环有向无环图(DAG)具有拓扑序
    488    * Breadth First Search:广度优先搜索:
    489    *    1、Traversal of all the vertices in the graph, the degree of entry is 0 vertex into the queue
    490    *    1、遍历图中的所有顶点,进入队列的度为0
    491    *    2、Poll a vertex in the queue to update its adjacency (minus 1) and queue the adjacency if it is 0 after minus 1
    492    *    2、轮询队列中的一个顶点,以更新其邻接点(减一) ,如果该顶点在减一之后是0,则对其邻接点进行排队
    493    *    3、Do step 2 until the queue is empty
    494    *    3、执行步骤2,直到队列是空的
    495    * If you cannot traverse all the nodes, it means that the current graph is not a directed acyclic graph.
    496    * 如果你不能遍历所有的节点,这意味着当前的图不是一个有向无环图。
    497    * There is no topological sort.
    498    * 没有拓扑排序。
    499    *
    500    *
    501    * @return key Returns the state
    502    * if success (acyclic) is true, failure (acyclic) is looped,
    503    * and value (possibly one of the topological sort results)
    504    * 如果成功(非循环)为 true,返回true和拓扑排序序列。如果是循环图,则返回false。
    505    */
    506   private Map.Entry<Boolean, List<Node>> topologicalSortImpl() {
    507     // node queue with degree of entry 0
    508     //入度为0的所有节点
    509     Queue<Node> zeroIndegreeNodeQueue = new LinkedList<>();
    510     // save result
    511     //保存结果
    512     List<Node> topoResultList = new ArrayList<>();
    513     // save the node whose degree is not 0
    514     //保存所有的入度不为0的节点
    515     Map<Node, Integer> notZeroIndegreeNodeMap = new HashMap<>();
    516 
    517     // Scan all the vertices and push vertexs with an entry degree of 0 to queue
    518     //扫描所有顶点并将入度为0的节点推入队列
    519     for (Map.Entry<Node, NodeInfo> vertices : nodesMap.entrySet()) {
    520       Node node = vertices.getKey();
    521       int inDegree = getIndegree(node);
    522 
    523       if (inDegree == 0) {
    524         zeroIndegreeNodeQueue.add(node);
    525         topoResultList.add(node);
    526       } else {
    527         notZeroIndegreeNodeMap.put(node, inDegree);
    528       }
    529     }
    530 
    531     /**
    532      * After scanning, there is no node with 0 degree of entry,
    533      * 如果这幅图连一个入度为0的节点都没有的话,说明整个图就是一个环了
    534      * indicating that there is a ring, and return directly
    535      * 这种情况 就直接返回false就好了
    536      */
    537     if(zeroIndegreeNodeQueue.isEmpty()){
    538       return new AbstractMap.SimpleEntry(false, topoResultList);
    539     }
    540 
    541     // The topology algorithm is used to delete nodes with 0 degree of entry and its associated edges
    542     //拓扑算法用于删除具有0入度的节点及其关联边
    543     while (!zeroIndegreeNodeQueue.isEmpty()) {
    544       Node v = zeroIndegreeNodeQueue.poll();
    545       // Get the neighbor node
    546       //获取相邻节点
    547       Set<Node> subsequentNodes = getSubsequentNodes(v);
    548 
    549       for (Node subsequentNode : subsequentNodes) {
    550 
    551         Integer degree = notZeroIndegreeNodeMap.get(subsequentNode);
    552 
    553         if(--degree == 0){
    554           topoResultList.add(subsequentNode);
    555           zeroIndegreeNodeQueue.add(subsequentNode);
    556           notZeroIndegreeNodeMap.remove(subsequentNode);
    557         }else{
    558           notZeroIndegreeNodeMap.put(subsequentNode, degree);
    559         }
    560 
    561       }
    562     }
    563 
    564     // if notZeroIndegreeNodeMap is empty,there is no ring!
    565     //如果非0入度的Map是空的,说明没有环,返回拓扑排序
    566     AbstractMap.SimpleEntry resultMap = new AbstractMap.SimpleEntry(notZeroIndegreeNodeMap.size() == 0 , topoResultList);
    567     return resultMap;
    568 
    569   }
    570 
    571 }
  • 相关阅读:
    webApp 开发技术要点总结
    前端好难
    webApp前端开发技巧总结
    WAP、触屏版网站及APP的区别
    ajax 200 4 parseerror 的错误
    .Net Windows Service(服务) 调试安装及System.Timers.Timer 使用
    引用:WebAPI中的定时处理-使用Quartz.Net
    Redis 应该是存放的数据超出了范围
    easyui Dialog 去边框
    MSSQL SQL Server代理 作业 设置(调用存储过程)
  • 原文地址:https://www.cnblogs.com/lukairui/p/12522576.html
Copyright © 2011-2022 走看看