// Package dag builds a directed acyclic graph from a list of tasks and their
// dependencies, rejects cyclic dependencies, and computes which tasks can be
// scheduled given the tasks that are already done.
package dag

import (
	"errors"
	"fmt"
	"sort"
	"strings"

	"github.com/tektoncd/pipeline/pkg/list"
	"k8s.io/apimachinery/pkg/util/sets"
)

// Task is an interface for all types that could be in a DAG.
// Each Task is wrapped by exactly one Node of the Graph.
type Task interface {
	// HashKey returns the key under which the task is stored in Graph.Nodes.
	HashKey() string
	// Deps is not called in this file; presumably it returns the HashKeys of
	// the tasks this task depends on — confirm against the implementers.
	Deps() []string
}

// Tasks is an interface for lists of types that could be in a DAG.
type Tasks interface {
	// Items returns the tasks that Build turns into graph nodes.
	Items() []Task
}

// Node represents a Task in a pipeline.
// It carries the concrete task through the Task field.
type Node struct {
	// Task represent the PipelineTask in Pipeline
	Task Task
	// Prev represent all the Previous task Nodes for the current Task
	Prev []*Node
	// Next represent all the Next task Nodes for the current Task
	Next []*Node
}

// Graph represents the Pipeline Graph.
// The graph is stored as a map from task name (HashKey) to Node.
type Graph struct {
	// Nodes represent map of PipelineTask name to Node in Pipeline Graph
	Nodes map[string]*Node
}

// newGraph returns an empty Pipeline Graph.
func newGraph() *Graph {
	return &Graph{Nodes: map[string]*Node{}}
}

// addPipelineTask adds a node wrapping t to the graph and returns it.
// It returns an error if a node with the same HashKey is already present.
func (g *Graph) addPipelineTask(t Task) (*Node, error) {
	if _, ok := g.Nodes[t.HashKey()]; ok {
		return nil, errors.New("duplicate pipeline task")
	}
	newNode := &Node{
		Task: t,
	}
	g.Nodes[t.HashKey()] = newNode
	return newNode, nil
}

// Build returns a valid pipeline Graph. Returns error if the pipeline is invalid.
//
// tasks supplies the nodes; deps maps a task's HashKey to the HashKeys of the
// tasks it depends on. An error is returned when a task is listed twice, when
// either end of a dependency is not among tasks, or when a dependency would
// introduce a cycle.
func Build(tasks Tasks, deps map[string][]string) (*Graph, error) {
	d := newGraph()

	// Register every task as a node before wiring any edge.
	for _, pt := range tasks.Items() {
		if _, err := d.addPipelineTask(pt); err != nil {
			return nil, fmt.Errorf("task %s is already present in Graph, can't add it again: %w", pt.HashKey(), err)
		}
	}

	// Map iteration order is random; link in sorted key order so that the
	// order of Prev/Next entries and the first reported error are stable
	// from run to run.
	names := make([]string, 0, len(deps))
	for pt := range deps {
		names = append(names, pt)
	}
	sort.Strings(names)

	// Connect each task to every task it depends on (each is appended to the
	// other's Prev/Next list).
	for _, pt := range names {
		for _, previousTask := range deps[pt] {
			if err := addLink(pt, previousTask, d.Nodes); err != nil {
				return nil, fmt.Errorf("couldn't add link between %s and %s: %w", pt, previousTask, err)
			}
		}
	}
	return d, nil
}

// GetSchedulable returns a set of PipelineTask names that can be scheduled,
// given a list of successfully finished doneTasks. It returns tasks which have
// all dependencies marked as done, and thus can be scheduled. If the specified
// doneTasks are invalid (i.e. if it is indicated that a Task is done, but the
// previous Tasks are not done), an error is returned.
func GetSchedulable(g *Graph, doneTasks ...string) (sets.String, error) {
	// Roots are the nodes without any dependency; there may be several.
	roots := getRoots(g)
	tm := sets.NewString(doneTasks...)
	d := sets.NewString()

	// visited is shared across all roots so every node is examined once.
	visited := sets.NewString()
	for _, root := range roots {
		for _, task := range findSchedulable(root, visited, tm) {
			d.Insert(task.HashKey())
		}
	}

	visitedNames := make([]string, 0, len(visited))
	for v := range visited {
		visitedNames = append(visitedNames, v)
	}

	// The traversal only descends through done tasks, so a done task that was
	// never visited cannot be reached from the roots through done tasks.
	notVisited := list.DiffLeft(doneTasks, visitedNames)
	if len(notVisited) > 0 {
		return nil, fmt.Errorf("invalid list of done tasks; some tasks were indicated completed without ancestors being done: %v", notVisited)
	}

	return d, nil
}

// linkPipelineTasks records that next depends on prev by appending each node
// to the other's Prev/Next list. It returns an error, and links nothing, if
// the new edge would make a task depend on itself or would close a cycle.
func linkPipelineTasks(prev *Node, next *Node) error {
	// Check for self cycle
	if prev.Task.HashKey() == next.Task.HashKey() {
		return fmt.Errorf("cycle detected; task %q depends on itself", next.Task.HashKey())
	}
	// Check if we are adding cycles: the edge closes a cycle exactly when
	// next is already an ancestor of prev.
	path := []string{next.Task.HashKey(), prev.Task.HashKey()}
	if err := lookForNode(prev.Prev, path, next.Task.HashKey()); err != nil {
		return fmt.Errorf("cycle detected: %w", err)
	}
	next.Prev = append(next.Prev, prev)
	prev.Next = append(prev.Next, next)
	return nil
}

// lookForNode searches nodes and, recursively, their Prev nodes for the task
// named next. path holds the task names walked so far. When next is found,
// the returned error's message is the path leading to it as rendered by
// getVisitedPath; otherwise nil is returned.
func lookForNode(nodes []*Node, path []string, next string) error {
	return searchAncestors(nodes, path, next, sets.NewString())
}

// searchAncestors is the recursive worker behind lookForNode. explored holds
// the names of nodes that were already entered; they are skipped so that an
// ancestor shared by several branches is expanded only once.
func searchAncestors(nodes []*Node, path []string, next string, explored sets.String) error {
	for _, n := range nodes {
		key := n.Task.HashKey()
		if explored.Has(key) {
			continue
		}
		explored.Insert(key)

		// Give every branch its own copy of the path so that siblings which
		// led nowhere do not show up in the reported cycle.
		branch := make([]string, len(path), len(path)+1)
		copy(branch, path)
		branch = append(branch, key)

		if key == next {
			return errors.New(getVisitedPath(branch))
		}
		if err := searchAncestors(n.Prev, branch, next, explored); err != nil {
			return err
		}
	}
	return nil
}

// getVisitedPath renders path in reverse order, joined by " -> ". The order is
// reversed because the graph is traversed using prev pointers. path itself is
// left unmodified.
func getVisitedPath(path []string) string {
	reversed := make([]string, len(path))
	for i, name := range path {
		reversed[len(path)-1-i] = name
	}
	return strings.Join(reversed, " -> ")
}

// addLink looks up the nodes named pt and previousTask in nodes and links them
// so that pt depends on previousTask. It returns an error if either name has
// no node or if the link is rejected by linkPipelineTasks.
func addLink(pt string, previousTask string, nodes map[string]*Node) error {
	prev, ok := nodes[previousTask]
	if !ok {
		return fmt.Errorf("task %s depends on %s but %s wasn't present in Pipeline", pt, previousTask, previousTask)
	}
	// The dependent task must exist as well; without this check a nil node
	// would be dereferenced while linking.
	next, ok := nodes[pt]
	if !ok {
		return fmt.Errorf("task %s depends on %s but %s wasn't present in Pipeline", pt, previousTask, pt)
	}
	if err := linkPipelineTasks(prev, next); err != nil {
		return fmt.Errorf("couldn't create link from %s to %s: %w", prev.Task.HashKey(), next.Task.HashKey(), err)
	}
	return nil
}

// getRoots returns the nodes of g that have no Prev nodes, i.e. the roots of
// the DAG. The order of the result follows map iteration and is unspecified.
func getRoots(g *Graph) []*Node {
	n := []*Node{}
	for _, node := range g.Nodes {
		if len(node.Prev) == 0 {
			n = append(n, node)
		}
	}
	return n
}

// findSchedulable walks the graph from n and returns the tasks that are not
// done yet but whose Prev nodes are all done. Every node it enters is added
// to visited, and nodes already in visited are not examined again. The walk
// only continues past nodes that are in doneTasks.
func findSchedulable(n *Node, visited sets.String, doneTasks sets.String) []Task {
	key := n.Task.HashKey()
	// This node was already handled through another path.
	if visited.Has(key) {
		return []Task{}
	}
	visited.Insert(key)

	if doneTasks.Has(key) {
		schedulable := []Task{}
		// This one is done! Take note of it and look at the next candidate
		for _, next := range n.Next {
			if !visited.Has(next.Task.HashKey()) {
				schedulable = append(schedulable, findSchedulable(next, visited, doneTasks)...)
			}
		}
		return schedulable
	}

	// This one isn't done! Return it if it's schedulable
	if isSchedulable(doneTasks, n.Prev) {
		// FIXME(vdemeester)
		return []Task{n.Task}
	}

	// This one isn't done, but it also isn't ready to schedule
	return []Task{}
}

// isSchedulable reports whether every node in prevs is in doneTasks. A node
// without any prevs is always schedulable.
func isSchedulable(doneTasks sets.String, prevs []*Node) bool {
	for _, n := range prevs {
		if !doneTasks.Has(n.Task.HashKey()) {
			return false
		}
	}
	return true
}