zoukankan      html  css  js  c++  java
  • conductor Kitchensink示例

    一个示例的厨房工作流程,演示了所有模式构造的使用。

    定义

    {
      "name": "kitchensink",
      "description": "kitchensink workflow",
      "version": 1,
      "tasks": [
        {
          "name": "task_1",
          "taskReferenceName": "task_1",
          "inputParameters": {
            "mod": "${workflow.input.mod}",
            "oddEven": "${workflow.input.oddEven}"
          },
          "type": "SIMPLE"
        },
        {
          "name": "event_task",
          "taskReferenceName": "event_0",
          "inputParameters": {
            "mod": "${workflow.input.mod}",
            "oddEven": "${workflow.input.oddEven}"
          },
          "type": "EVENT",
          "sink": "conductor"
        },
        {
          "name": "dyntask",
          "taskReferenceName": "task_2",
          "inputParameters": {
            "taskToExecute": "${workflow.input.task2Name}"
          },
          "type": "DYNAMIC",
          "dynamicTaskNameParam": "taskToExecute"
        },
        {
          "name": "oddEvenDecision",
          "taskReferenceName": "oddEvenDecision",
          "inputParameters": {
            "oddEven": "${task_2.output.oddEven}"
          },
          "type": "DECISION",
          "caseValueParam": "oddEven",
          "decisionCases": {
            "0": [
              {
                "name": "task_4",
                "taskReferenceName": "task_4",
                "inputParameters": {
                  "mod": "${task_2.output.mod}",
                  "oddEven": "${task_2.output.oddEven}"
                },
                "type": "SIMPLE"
              },
              {
                "name": "dynamic_fanout",
                "taskReferenceName": "fanout1",
                "inputParameters": {
                  "dynamicTasks": "${task_4.output.dynamicTasks}",
                  "input": "${task_4.output.inputs}"
                },
                "type": "FORK_JOIN_DYNAMIC",
                "dynamicForkTasksParam": "dynamicTasks",
                "dynamicForkTasksInputParamName": "input"
              },
              {
                "name": "dynamic_join",
                "taskReferenceName": "join1",
                "type": "JOIN"
              }
            ],
            "1": [
              {
                "name": "fork_join",
                "taskReferenceName": "forkx",
                "type": "FORK_JOIN",
                "forkTasks": [
                  [
                    {
                      "name": "task_10",
                      "taskReferenceName": "task_10",
                      "type": "SIMPLE"
                    },
                    {
                      "name": "sub_workflow_x",
                      "taskReferenceName": "wf3",
                      "inputParameters": {
                        "mod": "${task_1.output.mod}",
                        "oddEven": "${task_1.output.oddEven}"
                      },
                      "type": "SUB_WORKFLOW",
                      "subWorkflowParam": {
                        "name": "sub_flow_1",
                        "version": 1
                      }
                    }
                  ],
                  [
                    {
                      "name": "task_11",
                      "taskReferenceName": "task_11",
                      "type": "SIMPLE"
                    },
                    {
                      "name": "sub_workflow_x",
                      "taskReferenceName": "wf4",
                      "inputParameters": {
                        "mod": "${task_1.output.mod}",
                        "oddEven": "${task_1.output.oddEven}"
                      },
                      "type": "SUB_WORKFLOW",
                      "subWorkflowParam": {
                        "name": "sub_flow_1",
                        "version": 1
                      }
                    }
                  ]
                ]
              },
              {
                "name": "join",
                "taskReferenceName": "join2",
                "type": "JOIN",
                "joinOn": [
                  "wf3",
                  "wf4"
                ]
              }
            ]
          }
        },
        {
          "name": "search_elasticsearch",
          "taskReferenceName": "get_es_1",
          "inputParameters": {
            "http_request": {
              "uri": "http://localhost:9200/conductor/_search?size=10",
              "method": "GET"
            }
          },
          "type": "HTTP"
        },
        {
          "name": "task_30",
          "taskReferenceName": "task_30",
          "inputParameters": {
            "statuses": "${get_es_1.output..status}",
            "workflowIds": "${get_es_1.output..workflowId}"
          },
          "type": "SIMPLE"
        }
      ],
      "outputParameters": {
        "statues": "${get_es_1.output..status}",
        "workflowIds": "${get_es_1.output..workflowId}"
      },
      "schemaVersion": 2
    }

    Visual Flow

    运行Kitchensink工作流程

    1. 在这里记录启动服务器-DloadSample=true启动服务器时使用java系统属性。这将创建一个kitchensink工作流程,相关的任务定义,并启动一个厨房工作流程的实例。
    2. 一旦工作流开始,第一个任务就保持在SCHEDULED状态。这是因为没有工作人员正在轮询任务。
    3. 我们将直接使用REST端点轮询任务并更新状态。

    启动工作流执行

    通过发布以下内容开始执行kitchensink工作流:

    curl -X POST --header 'Content-Type: application/json' --header 'Accept: text/plain' 'http://localhost:8080/api/workflow/kitchensink' -d ' { "task2Name": "task_5" } '

    响应是标识工作流实例ID的文本字符串。

    poll第一个任务:

    curl http://localhost:8080/api/tasks/poll/task_1

    响应应该如下所示:

    {
        "taskType": "task_1",
        "status": "IN_PROGRESS",
        "inputData": {
            "mod": null,
            "oddEven": null
        },
        "referenceTaskName": "task_1",
        "retryCount": 0,
        "seq": 1,
        "pollCount": 1,
        "taskDefName": "task_1",
        "scheduledTime": 1486580932471,
        "startTime": 1486580933869,
        "endTime": 0,
        "updateTime": 1486580933902,
        "startDelayInSeconds": 0,
        "retried": false,
        "callbackFromWorker": true,
        "responseTimeoutSeconds": 3600,
        "workflowInstanceId": "b0d1a935-3d74-46fd-92b2-0ca1e388659f",
        "taskId": "b9eea7dd-3fbd-46b9-a9ff-b00279459476",
        "callbackAfterSeconds": 0,
        "polledTime": 1486580933902,
        "queueWaitTime": 1398
    }

    更新任务状态

    • 注意轮询响应的值taskIdworkflowInstanceId字段
    • 更新任务的状态COMPLETED如下:
    curl -H 'Content-Type:application/json' -H 'Accept:application/json' -X POST http://localhost:8080/api/tasks/ -d '
    {
        "taskId": "b9eea7dd-3fbd-46b9-a9ff-b00279459476",
        "workflowInstanceId": "b0d1a935-3d74-46fd-92b2-0ca1e388659f",
        "status": "COMPLETED",
        "output": {
            "mod": 5,
            "taskToExecute": "task_1",
            "oddEven": 0,
            "dynamicTasks": [
                {
                    "name": "task_1",
                    "taskReferenceName": "task_1_1",
                    "type": "SIMPLE"
                },
                {
                    "name": "sub_workflow_4",
                    "taskReferenceName": "wf_dyn",
                    "type": "SUB_WORKFLOW",
                    "subWorkflowParam": {
                        "name": "sub_flow_1"
                    }
                }
            ],
            "inputs": {
                "task_1_1": {},
                "wf_dyn": {}
            }
        }
    }'

    这将将task_1标记为已完成,并task_5作为下一个任务计划
    对后续计划的任务重复相同的过程,直到完成。

    Conductor提供Java中的客户端库(一个Python客户端是工作),以简化任务轮询和执行。

  • 相关阅读:
    linux三剑客之sed
    线程与循环的区别?
    Notify和NotifyAll的区别?
    no system images installed for this target这个问题如何解决?
    Intent里ACTION的CALL和DIAL的区别?
    onConfigurationChanged方法的使用
    String和StringBuffer的区别?
    Activity的状态保存
    C#将datatable数据转换成JSON数据的方法
    SQL语句:关于复制表结构和内容到另一张表中的SQL语句
  • 原文地址:https://www.cnblogs.com/mhc-fly/p/7011300.html
Copyright © 2011-2022 走看看