zoukankan      html  css  js  c++  java
  • conductor 系统任务

    动态任务

    参数:

    dynamicTaskNameParam:来自任务输入的参数的名称,其值用于调度任务。 例如 如果参数的值为ABC,则调度的下一个任务类型为“ABC”。

    Example

    {
      "name": "user_task",
      "taskReferenceName": "t1",
      "inputParameters": {
        "files": "${workflow.input.files}",
        "taskToExecute": "${workflow.input.user_supplied_task}"
      },
      "type": "DYNAMIC",
      "dynamicTaskNameParam": "taskToExecute"
    }
    
    如果以user_supplied_task的值(假设值为user_task_2)来启动工作流,那么Conductor将在调度此动态任务时调度user_task_2。

    决策任务:

    决策任务与编程语言中的case ... switch语句相似。 该任务需要3个参数:
    caseValueParam: 其值将用作开关的任务输入中的参数名称。
    decisionCases: 映射其中key是caseValueParam的可能值,值是要执行的任务列表。
    defaultCase:在决策案例中找不到匹配值时执行的任务列表(默认条件)

    Example

    {
      "name": "decide_task",
      "taskReferenceName": "decide1",
      "inputParameters": {
        "case_value_param": "${workflow.input.movieType}"
      },
      "type": "DECISION",
      "caseValueParam": "case_value_param",
      "decisionCases": {
        "Show": [
          {
            "name": "setup_episodes",
            "taskReferenceName": "se1",
            "inputParameters": {
              "movieId": "${workflow.input.movieId}"
            },
            "type": "SIMPLE"
          },
          {
            "name": "generate_episode_artwork",
            "taskReferenceName": "ga",
            "inputParameters": {
              "movieId": "${workflow.input.movieId}"
            },
            "type": "SIMPLE"
          }
        ],
        "Movie": [
          {
            "name": "setup_movie",
            "taskReferenceName": "sm",
            "inputParameters": {
              "movieId": "${workflow.input.movieId}"
            },
            "type": "SIMPLE"
          },
          {
            "name": "generate_movie_artwork",
            "taskReferenceName": "gma",
            "inputParameters": {
              "movieId": "${workflow.input.movieId}"
            },
            "type": "SIMPLE"
          }
        ]
      }
    }

    Fork任务:

    fork用于调度并行的任务。
    参数:
    forkTasks:任务列表的列表。 每个子列表被安排并行执行。 然而,子列表中的任务是以串行方式安排的。

    Example

    {
      "forkTasks": [
        [
          {
            "name": "task11",
            "taskReferenceName": "t11"
          },
          {
            "name": "task12",
            "taskReferenceName": "t12"
          }
        ],
        [
          {
            "name": "task21",
            "taskReferenceName": "t21"
          },
          {
            "name": "task22",
            "taskReferenceName": "t22"
          }
        ]
      ]
    }
    
    当执行时,task11和task21被调度为同时执行。


    动态fork任务
    动态fork与FORK_JOIN任务相同。 除了在运行时使用任务的输入提供要分配的任务的列表。 当fork的任务数量不固定并且根据输入而变化时很有用。
    dynamicForkTasksParam:包含并行执行的工作流任务配置列表的参数名称
    dynamicForkTasksInputParamName: 参数的名字,此参数的值是一个映射,映射的key是fork任务的别名,value是fork任务的input

    Example

    {
      "dynamicTasks": "${taskA.output.dynamicTasksJSON}",
      "dynamicTasksInput": "${taskA.output.dynamicTasksInputJSON}",
      "type": "FORK_JOIN_DYNAMIC",
      "dynamicForkTasksParam": "dynamicTasks",
      "dynamicForkTasksInputParamName": "dynamicTasksInput"
    }
    

    假设taskA的输出为:
    {
      "dynamicTasksInputJSON": {
        "forkedTask1": {
          "width": 100,
          "height": 100,
          "params": {
            "recipe": "jpg"
          }
        },
        "forkedTask2": {
          "width": 200,
          "height": 200,
          "params": {
            "recipe": "jpg"
          }
        }
      },
      "dynamicTasksJSON": [
        {
          "name": "encode_task",
          "taskReferenceName": "forkedTask1",
          "type": "SIMPLE"
        },
        {
          "name": "encode_task",
          "taskReferenceName": "forkedTask2",
          "type": "SIMPLE"
        }
      ]
    }
    执行时,动态fork任务将调度两个并行任务,类型为“encode_task”,引用名称为“forkedTask1”和“forkedTask2”,由_ dynamicTasksInputJSON_指定

    join任务:

    join任务用于等待fork任务产生的一个或多个任务的完成。
    参数:
    joinOn:任务参考名称列表,其中JOIN将等待完成。

    Example

    {
        "joinOn": ["taskRef1", "taskRef3"]
    }
    

    Join Task Output

    fork任务的输出将是一个JSON对象,其中key是任务别名,值是fork任务的输出。

    子工作流任务:

    Sub Workflow任务允许在另一个工作流中嵌套工作流。

    参数:

    subWorkflowParam:任务参考名称列表,其中JOIN将等待完成。

    Example

    {
      "name": "sub_workflow_task",
      "taskReferenceName": "sub1",
      "inputParameters": {
        "requestId": "${workflow.input.requestId}",
        "file": "${encode.output.location}"
      },
      "type": "SUB_WORKFLOW",
      "subWorkflowParam": {
        "name": "deployment_workflow",
        "version": 1
      }
    }
    
    执行时,将使用两个输入参数requestId和file执行deployment_workflow。 完成生成的工作流程后,任务将被标记为已完成。 如果子工作流程终止或失败,则将任务标记为失败,并重新配置。


    等待任务:

    等待任务被实现为保持IN_PROGRESS状态的门,除非由外部触发器标记为COMPLETED或FAILED。 要使用等待任务,请将任务类型设置为WAIT

    参数:无
    外部触发等待任务
    任务资源端点可用于将任务的状态更新为终止状态。
    Contrib模块提供SQS集成,其中外部系统可以将消息放置在服务器侦听的预配置队列中。 消息到达时,它们被标记为COMPLETED或FAILED。

    SQS队列
    可以使用以下API检索服务器用于更新任务状态的SQS队列:
    GET /queue

    当更新任务的状态时,消息需要符合以下规范:

         消息必须是有效的JSON字符串。
         消息JSON应包含一个名为externalId的密钥,其值为包含以下密钥的JSONized字符串:
             workflowId:工作流的ID
             taskRefName:应该更新的任务引用名称。
         每个队列代表一个特定的任务状态,相应地标记任务。 例如 消息进入COMPLETED队列将任务状态标记为COMPLETED。
         使用消息更新任务的输出。


    示例SQS有效负载:

    {
      "some_key": "valuex",
      "externalId": "{"taskRefName":"TASK_REFERENCE_NAME","workflowId":"WORKFLOW_ID"}"
    }


    HTTP任务:

    HTTP任务用于通过HTTP调用另一个微服务器。
    该任务期望一个名为http_request的输入参数作为任务输入的一部分,具有以下详细信息:
    uri:服务的URI。 使用vipAddress时可以是部分或包含服务器地址。
    method: HTTP method. One of the GET, PUT, POST, DELETE, OPTIONS, HEAD
    accept: Accept header as required by server.
    contentType:Content Type - 支持的类型是 text/plain, text/html and, application/json
    headers:要与请求一起发送的其他http标头的map。
    body:Request body
    vipAddress:使用基于发现的服务URL时。

    HTTP 任务输出:
    response: 包含response的JSON主体(如果存在)
    headers: Response Headers
    statusCode: Integer status code

    Example

    Task Input payload using vipAddress

    {
      "http_request": {
        "vipAddress": "examplevip-prod",
        "uri": "/",
        "method": "GET",
        "accept": "text/plain"
      }
    }
    

    Task Input using an absolute URL

    {
      "http_request": {
        "uri": "http://example.com/",
        "method": "GET",
        "accept": "text/plain"
      }
    }
    
    如果请求无法完成或远程服务器返回不成功的状态代码,任务将被标记为FAILED。


    Event任务:

    事件任务提供将事件(消息)发布到conductor或外部事件系统(如SQS)的能力。 事件任务对于为工作流和任务创建基于事件的依赖非常有用。
    参数:
    sink:生产的事件的合格名称。 例如 conductor或sqs:sqs_queue_name

    Example

    {
        "sink": 'sqs:example_sqs_queue_name'
    }
    
    当使用conductor作为接收器生成事件时,事件名称遵循以下结构:conductor:<workflow_name>:<task_reference_name>
    对于SQS,使用队列的名称,而不是URI。 conductor根据名称查找URI。

    Supported Sinks

    • Conductor
    • SQS

    事件任务输入:

    给予事件任务的输入作为有效载荷提供给发布的消息。 例如 如果将消息放入SQS队列(sink is sqs)中,则消息有效内容将作为任务的输入。

    时间任务输出:

    event_produced生成的事件的名称。


     
     


  • 相关阅读:
    java.lang.UnsupportedClassVersionError: action/Login : Unsupported major.minor version 52.0 (unable to load class action.Login)异常
    main方法和args参数
    建立maven工程pom.xml报错:web.xml is missing and <failOnMissingWebXml> is set to true
    遍历js中数组或者对象
    setAttribute设置无效
    javascript中用setAttribute给元素添加colspan属性无效
    ssm回顾笔记(一)
    struts2学习笔记(一)
    esay-ui学习笔记(一)
    农银电商项目学习笔记(一)
  • 原文地址:https://www.cnblogs.com/mhc-fly/p/7010549.html
Copyright © 2011-2022 走看看