zoukankan      html  css  js  c++  java
  • Tekton DAG代码

    package dag
    
    import (
        "errors"
        "fmt"
        "strings"
    
        "github.com/tektoncd/pipeline/pkg/list"
        "k8s.io/apimachinery/pkg/util/sets"
    )
    
    // Task is the interface implemented by every type that can be placed in a
    // DAG. Each Node of the graph wraps exactly one Task.
    type Task interface {
        // HashKey returns the identifier of the task; it is used as the key of
        // the task's node in Graph.Nodes and when comparing tasks.
        HashKey() string
        // Deps is not called anywhere in this file.
        // NOTE(review): presumably returns the hash keys of the tasks this one
        // depends on — confirm against the implementations.
        Deps() []string
    }
    
    // Tasks is the interface implemented by collections of Task values that
    // can be turned into a DAG.
    type Tasks interface {
        // Items returns the tasks of the collection; Build registers each of
        // them as a node of the graph.
        Items() []Task
    }
    
    // Node represents a Task in a pipeline.
    // It is a vertex of the DAG; the concrete task is attached through the
    // Task field, and the edges are stored as Prev/Next pointers.
    type Node struct {
        // Task represent the PipelineTask in Pipeline
        Task Task
        // Prev represent all the Previous task Nodes for the current Task,
        // i.e. the nodes this task depends on.
        Prev []*Node
        // Next represent all the Next task Nodes for the current Task,
        // i.e. the nodes that depend on this task.
        Next []*Node
    }
    
    // Graph represents the Pipeline Graph.
    // The graph is stored as a map from node name (the task's hash key) to
    // its Node; edges live on the nodes themselves.
    type Graph struct {
        // Nodes represent map of PipelineTask name to Node in Pipeline Graph
        Nodes map[string]*Node
    }
    
    // newGraph returns an empty pipeline Graph. The node map is already
    // allocated, so nodes can be inserted without further initialisation.
    func newGraph() *Graph {
        g := new(Graph)
        g.Nodes = make(map[string]*Node)
        return g
    }
    
    // addPipelineTask registers t as a new node of the graph, keyed by the
    // task's hash key. It returns the freshly created node, or an error when a
    // node with the same key is already present.
    func (g *Graph) addPipelineTask(t Task) (*Node, error) {
        key := t.HashKey()
        if _, exists := g.Nodes[key]; exists {
            return nil, errors.New("duplicate pipeline task")
        }
        node := &Node{Task: t}
        g.Nodes[key] = node
        return node, nil
    }
    
    // Build constructs the pipeline Graph for tasks and the dependency map
    // deps, where deps maps a task name to the names of the tasks it depends
    // on. It returns an error if the pipeline is invalid: a task appears
    // twice, or a dependency link cannot be added (unknown task or a cycle).
    func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
        graph := newGraph()

        // First pass: register every task mentioned in the `PipelineSpec` as
        // a node, so that links can be resolved by name afterwards.
        for _, task := range tasks.Items() {
            _, err := graph.addPipelineTask(task)
            if err != nil {
                return nil, fmt.Errorf("task %s is already present in Graph, can't add it again: %w", task.HashKey(), err)
            }
        }

        // Second pass: turn every from/runAfter constraint into an edge by
        // wiring the two nodes into each other's Prev/Next lists.
        for name, prevNames := range deps {
            for i := range prevNames {
                err := addLink(name, prevNames[i], graph.Nodes)
                if err != nil {
                    return nil, fmt.Errorf("couldn't add link between %s and %s: %w", name, prevNames[i], err)
                }
            }
        }
        return graph, nil
    }
    
    // GetSchedulable returns the set of PipelineTask names that can be
    // scheduled next, given the names of the tasks that already finished
    // successfully. A task is schedulable when it is not done itself and all
    // of its dependencies are done; the returned tasks can run concurrently.
    //
    // The graph is walked from every root (a node without dependencies; there
    // may be several). If doneTasks names a task the walk never reaches, the
    // list is considered invalid (a task was marked done without its
    // ancestors being done) and an error is returned.
    func GetSchedulable(g *Graph, doneTasks ...string) (sets.String, error) {
        done := sets.NewString(doneTasks...)
        seen := sets.NewString()
        ready := sets.NewString()

        // seen is shared across roots so that every node is examined once.
        for _, root := range getRoots(g) {
            for _, task := range findSchedulable(root, seen, done) {
                ready.Insert(task.HashKey())
            }
        }

        var seenNames []string
        for name := range seen {
            seenNames = append(seenNames, name)
        }

        // NOTE(review): list.DiffLeft presumably yields the entries of
        // doneTasks that are missing from seenNames — confirm in pkg/list.
        if unreached := list.DiffLeft(doneTasks, seenNames); len(unreached) > 0 {
            return nil, fmt.Errorf("invalid list of done tasks; some tasks were indicated completed without ancestors being done: %v", unreached)
        }

        return ready, nil
    }
    
    // linkPipelineTasks adds the edge prev -> next to the graph, i.e. records
    // that next depends on prev. The edge is rejected with an error when it
    // would make a task depend on itself or would otherwise close a cycle.
    func linkPipelineTasks(prev *Node, next *Node) error {
        prevKey, nextKey := prev.Task.HashKey(), next.Task.HashKey()

        // A task depending on itself is the shortest possible cycle.
        if prevKey == nextKey {
            return fmt.Errorf("cycle detected; task %q depends on itself", nextKey)
        }

        // If next is already an ancestor of prev, the new edge would close a
        // cycle. The seed path lists both ends of the edge being added.
        err := lookForNode(prev.Prev, []string{nextKey, prevKey}, nextKey)
        if err != nil {
            return fmt.Errorf("cycle detected: %w", err)
        }

        // Safe to link: register each node as upstream/downstream of the other.
        prev.Next = append(prev.Next, next)
        next.Prev = append(next.Prev, prev)
        return nil
    }
    
    // lookForNode reports whether the task named next can be reached from any
    // of nodes by following Prev links, i.e. whether making next depend on
    // those nodes would close a cycle.
    //
    // nodes are the dependencies to search from, path holds the hash keys
    // walked so far (the caller seeds it with both ends of the link being
    // added) and next is the hash key being searched for. When next is found,
    // the returned error's message is the offending path as rendered by
    // getVisitedPath; otherwise nil is returned. The caller's path slice is
    // never modified.
    func lookForNode(nodes []*Node, path []string, next string) error {
        return lookForNodeFrom(nodes, path, next, map[*Node]struct{}{})
    }

    // lookForNodeFrom is the recursive worker behind lookForNode. explored
    // holds the nodes whose ancestors have already been searched, so that an
    // ancestor shared by several paths is walked only once instead of once
    // per path.
    func lookForNodeFrom(nodes []*Node, path []string, next string, explored map[*Node]struct{}) error {
        for _, n := range nodes {
            if _, seen := explored[n]; seen {
                continue
            }
            key := n.Task.HashKey()
            // The full slice expression caps the capacity, forcing append to
            // copy: each branch gets its own path, so siblings that are not
            // part of the cycle never leak into the reported path.
            branch := append(path[:len(path):len(path)], key)
            if key == next {
                return errors.New(getVisitedPath(branch))
            }
            // Mark before descending so the search terminates even if the
            // existing links already contain a loop.
            explored[n] = struct{}{}
            if err := lookForNodeFrom(n.Prev, branch, next, explored); err != nil {
                return err
            }
        }
        return nil
    }
    
    // getVisitedPath renders path as a human-readable chain, used to describe
    // a detected cycle. The graph is traversed through Prev pointers, so the
    // recorded keys are in reverse order: path is reversed in place and then
    // joined with " -> ".
    func getVisitedPath(path []string) string {
        for lo, hi := 0, len(path)-1; lo < hi; lo, hi = lo+1, hi-1 {
            path[lo], path[hi] = path[hi], path[lo]
        }
        return strings.Join(path, " -> ")
    }
    
    // addLink records that the task named pt depends on the task named
    // previousTask by linking their nodes in nodes (hash key -> node).
    //
    // It returns an error when either task is missing from nodes, or when
    // linkPipelineTasks rejects the link (self-dependency or cycle).
    func addLink(pt string, previousTask string, nodes map[string]*Node) error {
        prev, ok := nodes[previousTask]
        if !ok {
            return fmt.Errorf("task %s depends on %s but %s wasn't present in Pipeline", pt, previousTask, previousTask)
        }
        // The dependent task must exist as well: a missing key yields a nil
        // *Node, which would be dereferenced while linking.
        next, ok := nodes[pt]
        if !ok {
            return fmt.Errorf("task %s depends on %s but %s wasn't present in Pipeline", pt, previousTask, pt)
        }
        if err := linkPipelineTasks(prev, next); err != nil {
            return fmt.Errorf("couldn't create link from %s to %s: %w", prev.Task.HashKey(), next.Task.HashKey(), err)
        }
        return nil
    }
    
    // getRoots returns the roots of the DAG: every node of g that has no
    // dependencies (empty Prev). The result is never nil; its order follows
    // map iteration and is therefore unspecified.
    func getRoots(g *Graph) []*Node {
        roots := make([]*Node, 0)
        for _, candidate := range g.Nodes {
            if len(candidate.Prev) > 0 {
                continue
            }
            roots = append(roots, candidate)
        }
        return roots
    }
    
    // findSchedulable walks the graph downstream from n and returns the tasks
    // that can be scheduled now. visited collects the hash keys of the nodes
    // already examined (it is updated by this call) and doneTasks holds the
    // hash keys of the finished tasks.
    //
    // A node that was already visited yields nothing. A done node is skipped
    // and the search continues through its Next nodes. A node that is not
    // done is returned if all of its dependencies are done; its descendants
    // are not explored.
    func findSchedulable(n *Node, visited sets.String, doneTasks sets.String) []Task {
        key := n.Task.HashKey()
        if visited.Has(key) {
            // Everything reachable from here was handled by an earlier visit.
            return []Task{}
        }
        visited.Insert(key)

        if !doneTasks.Has(key) {
            // Not done yet: it is a candidate only if every dependency is done.
            if isSchedulable(doneTasks, n.Prev) {
                // FIXME(vdemeester)
                return []Task{n.Task}
            }
            // Neither done nor ready to be scheduled.
            return []Task{}
        }

        // This task is done: the candidates, if any, are further downstream.
        found := []Task{}
        for _, child := range n.Next {
            if visited.Has(child.Task.HashKey()) {
                continue
            }
            found = append(found, findSchedulable(child, visited, doneTasks)...)
        }
        return found
    }
    
    // isSchedulable reports whether a task whose dependencies are prevs can be
    // scheduled: every node in prevs must have its hash key in doneTasks. A
    // task without dependencies is always schedulable.
    func isSchedulable(doneTasks sets.String, prevs []*Node) bool {
        for _, dep := range prevs {
            if !doneTasks.Has(dep.Task.HashKey()) {
                return false
            }
        }
        return true
    }
  • 相关阅读:
    es6数组方法
    es5数组方法
    es6中的generator函数
    async、await
    CSS---文本属性及其属性值
    MySQL---查询某个字段内容中存不存在某个数据,与like不同(FIND_IN_SET(str,strlist))
    CSS---background属性及其属性值
    TP---时间查询(当日、本周、本月、本年)
    PHP---各种输出详解
    type=“text”只能输入数字!
  • 原文地址:https://www.cnblogs.com/orchidzjl/p/15765562.html
Copyright © 2011-2022 走看看