  • Airflow: trigger a DAG run with the REST API

    REST API

    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Overview

    To make management easier, Airflow provides a REST API.

    To facilitate management, Apache Airflow supports a range of REST API endpoints across its objects. This section provides an overview of the API design, methods, and supported use cases.

    Most of the endpoints accept JSON as input and return JSON responses. This means that you must usually add the following headers to your request:

    Content-type: application/json
    Accept: application/json
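
    A minimal sketch of attaching these two headers to a request, using only the Python standard library. The URL and the DAG ID example_dag are placeholders for a local test instance, and the request is only built here, not sent.

```python
import json
from urllib.request import Request

# Hypothetical endpoint on a local test instance; adjust host and DAG ID.
URL = "http://localhost:8080/api/v1/dags/example_dag/dagRuns"


def build_json_request(url, payload):
    """Build (but do not send) a POST request carrying a JSON body."""
    body = json.dumps(payload).encode("utf-8")
    headers = {
        "Content-type": "application/json",  # format of the body we send
        "Accept": "application/json",        # format we want back
    }
    return Request(url, data=body, headers=headers, method="POST")


req = build_json_request(URL, {"conf": {}})
print(req.get_method(), req.full_url)
print(req.header_items())
```

    Passing the resulting object to urllib.request.urlopen would send it; credentials still have to be added as described in the authentication section.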

    Open Authentication

    https://airflow.apache.org/docs/apache-airflow/stable/security/api.html#basic-authentication

    The API is disabled by default; it has to be switched to an authenticated mode.

    Basic authentication

    Basic username password authentication is currently supported for the API. This works for users created through LDAP login or within Airflow Metadata DB using password.

    To enable basic authentication, set the following in the configuration:

    [api]
    auth_backend = airflow.api.auth.backend.basic_auth
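
    With this backend enabled, a client sends its credentials in a standard HTTP Basic Authorization header. The sketch below shows how that header is formed; the admin/admin credentials are an assumption that matches a local test setup, not a recommended configuration.

```python
import base64


def basic_auth_header(username, password):
    """Return the Authorization header for HTTP Basic authentication."""
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {token}"}


headers = basic_auth_header("admin", "admin")
print(headers)
```

    HTTP client libraries usually build this header themselves, for example through the auth=(user, password) argument of requests.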
    

    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#section/Authentication

    By default, all API requests are denied.

    To be able to meet the requirements of many organizations, Airflow supports many authentication methods, and it is even possible to add your own method.

    If you want to check which auth backend is currently set, you can use the airflow config get-value api auth_backend command, as in the example below.

    $ airflow config get-value api auth_backend
    airflow.api.auth.backend.basic_auth

    The default is to deny all requests.

    For details on configuring the authentication, see API Authorization.

    Trigger a new DAG run

    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/post_dag_run

    Trigger a new DAG run

    path Parameters
    dag_id
    required
    string

    The DAG ID.

    Request Body schema: application/json
    dag_run_id
    string Nullable

    Run ID.

    The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with a BAD_REQUEST error.

    If not provided, a value will be generated based on execution_date.

    If the specified dag_run_id is in use, the creation request fails with an ALREADY_EXISTS error.

    Together with DAG_ID, this forms a unique key.

    execution_date
    string <date-time>

    The execution date. This is the time when the DAG run should be started according to the DAG definition. The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with a BAD_REQUEST error. Together with DAG_ID, this forms a unique key.

    state
    string (DagState)
    Enum: "success" "running" "failed"

    DAG State.

    conf
    object

    JSON object describing additional configuration parameters.

    The value of this field can be set only when creating the object. If you try to modify the field of an existing object, the request fails with a BAD_REQUEST error.
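
    As an illustration of the schema above, the following sketch assembles a request body from the documented fields. The helper name build_dag_run_body and the sample values are hypothetical; state is left out on the assumption that the server sets it.

```python
from datetime import datetime, timezone


def build_dag_run_body(execution_date=None, dag_run_id=None, conf=None):
    """Assemble the JSON body for POST /dags/{dag_id}/dagRuns."""
    if execution_date is None:
        execution_date = datetime.now(timezone.utc)
    if execution_date.tzinfo is None:
        raise ValueError("execution_date must be timezone-aware")
    body = {
        # date-time string, normalized to UTC
        "execution_date": execution_date.astimezone(timezone.utc).isoformat(),
        "conf": conf or {},
    }
    # dag_run_id is optional; the server generates one when it is omitted
    if dag_run_id is not None:
        body["dag_run_id"] = dag_run_id
    return body


body = build_dag_run_body(
    execution_date=datetime(2021, 7, 9, 12, 0, tzinfo=timezone.utc),
    conf={"task_payload": "demo"},
)
print(body)
```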

    Why not use the Python client?

    https://github.com/apache/airflow-client-python

    The project is still under development, and some of its APIs are broken.

    Apache Airflow Python Client

    NOTE: The Apache Airflow Client is still under active development and some methods or APIs might be broken. Please raise an issue in github if you encounter any such issues.

    The trigger endpoint is indeed broken, and most of the test cases are empty.

    https://github.com/apache/airflow-client-python/issues/21

    Requirement on DAG

    https://stackoverflow.com/questions/56480312/how-to-trigger-a-dag-to-run-immediately

    For a DAG that should run only when triggered externally, its schedule (schedule_interval) must be set to None.

    If you want to trigger this DAG manually then you need to set schedule_interval=None and use airflow trigger_dag dag_id (Documentation: airflow trigger dag)

    For example:

    https://www.waitingforcode.com/apache-airflow/externally-triggered-dags-apache-airflow/read

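    import airflow.utils.dates
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    dag = DAG(
        dag_id='hello_world_a',
        default_args={
            "owner": "airflow",
            'start_date': airflow.utils.dates.days_ago(1),
        },
        # No schedule: the DAG runs only when triggered externally
        schedule_interval=None
    )


    def print_hello(**kwargs):
        # 'task_payload' comes from the conf object sent with the trigger request
        task_params = kwargs['dag_run'].conf['task_payload']
        print('Hello world a with {}'.format(task_params))

    # Airflow 2 passes the context automatically, so provide_context is not needed
    PythonOperator(
        task_id='hello_world_printer',
        python_callable=print_hello,
        dag=dag)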

    Then deploy this DAG file

    https://stackoverflow.com/questions/49033163/airflow-publish-a-dynamically-created-dag

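    This question discusses creating DAGs dynamically. The normal way to publish a DAG is to copy the DAG file into the $AIRFLOW_HOME/dags directory; Airflow scans that directory and loads the DAG automatically.

    I tried this: after the file is placed in the directory, the DAG appears within a few seconds, so there seems to be no need to look into new ways of creating DAGs dynamically.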
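
    The copy step can be sketched as below. This assumes the default layout, where DAGs live in the dags subdirectory of AIRFLOW_HOME (itself defaulting to ~/airflow); a customized dags_folder setting would change the target. The helper name deploy_dag_file is hypothetical, and the demonstration writes to a temporary directory rather than a real Airflow home.

```python
import os
import shutil
import tempfile
from pathlib import Path


def deploy_dag_file(dag_file, airflow_home=None):
    """Copy a DAG file into <airflow_home>/dags and return the new path."""
    default_home = Path.home() / "airflow"
    home = Path(airflow_home or os.environ.get("AIRFLOW_HOME", default_home))
    dags_dir = home / "dags"
    dags_dir.mkdir(parents=True, exist_ok=True)
    # copy2 returns the path of the newly created file
    return Path(shutil.copy2(dag_file, dags_dir))


# Demonstration against a throwaway directory instead of a real Airflow home.
with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "hello_world_a.py"
    source.write_text("# DAG definition goes here\n")
    deployed = deploy_dag_file(source, airflow_home=Path(tmp) / "airflow_home")
    deployed_exists = deployed.exists()
    print(deployed.name, deployed_exists)
```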

     

    I want to be able to publish and trigger a DAG object from my code which is not in control of scheduler (viz. $AIRFLOW_HOME/dags folder)

    My last resort would be to programmatically create a py file containing the DAG definition that I want to publish and save this file to the $AIRFLOW_HOME/dags folder. I'm sure it should be easier than that.

    https://airflow.apache.org/docs/apache-airflow/stable/dag-serialization.html

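    Airflow 2.0 improved the architecture: the scheduler saves the result of parsing the DAG files (the serialized form) to the database, and both the scheduler and the webserver then use that serialized result, so the parsing work is reused.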

    In order to make Airflow Webserver stateless, Airflow >=1.10.7 supports DAG Serialization and DB Persistence. From Airflow 2.0.0, the Scheduler also uses Serialized DAGs for consistency and makes scheduling decisions.

    As shown in the image below, when using this feature, the DagFileProcessorProcess in the Scheduler parses the DAG files, serializes them in JSON format and saves them in the Metadata DB as SerializedDagModel model.

    [Figure: dag_serialization.png]

    Code Sample

    https://github.com/fanqingsong/machine_learning_workflow_on_airflow/blob/master/rest_api_call/trigger_dag.py

    Trigger a DAG run, then poll its state until the state is success.

    import sched
    import time
    from datetime import datetime, timezone
    from pprint import pprint

    import requests

    BASE_URL = "http://localhost:8080/api/v1"
    AUTH = ("admin", "admin")  # basic-auth user of the local test instance
    dag_id = "kmeans_with_workflow"


    def get_execution_time():
        # Current UTC time as an ISO 8601 date-time string
        now = datetime.now(timezone.utc)
        dt_string = now.strftime("%Y-%m-%dT%H:%M:%SZ")
        print("date and time =", dt_string)
        return dt_string


    def trigger_dag():
        data = {
            # "dag_run_id": dag_run_id,  # generated by Airflow when omitted
            "execution_date": get_execution_time(),
            "conf": {},
        }
        # json= serializes the body and sets the Content-Type header
        response = requests.post(
            f"{BASE_URL}/dags/{dag_id}/dagRuns",
            json=data,
            auth=AUTH,
        )
        response.raise_for_status()
        result = response.json()
        pprint(result)
        return result


    def get_dag_run(dag_run_id):
        response = requests.get(
            f"{BASE_URL}/dags/{dag_id}/dagRuns/{dag_run_id}",
            auth=AUTH,
        )
        response.raise_for_status()
        result = response.json()
        pprint(result)
        return result


    result = trigger_dag()
    dag_run_id = result["dag_run_id"]

    s = sched.scheduler(time.time, time.sleep)


    def watch_dag_until_complete():
        state = get_dag_run(dag_run_id)["state"]
        if state == "success":
            print("dag completed!")
        elif state == "failed":
            # A failed run never reaches success, so stop polling here too
            print("dag failed!")
        else:
            # Still running: check again in one second
            s.enter(1, 1, watch_dag_until_complete)


    s.enter(1, 1, watch_dag_until_complete)
    s.run()
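
    The same polling idea can also be written as a plain loop with a timeout, so that a run that never finishes does not block the caller forever. This is a sketch under assumptions: wait_for_dag_run and fetch_state are hypothetical names, the terminal states are taken from the DagState enum listed earlier, and the demonstration uses a stand-in function instead of a real API call.

```python
import time

TERMINAL_STATES = {"success", "failed"}


def wait_for_dag_run(fetch_state, interval=1.0, timeout=60.0, sleep=time.sleep):
    """Poll fetch_state() until it returns a terminal state or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"DAG run still in state {state!r} after {timeout}s")
        sleep(interval)


# Stand-in for a real API call: reports "running" twice, then "success".
fake_states = iter(["running", "running", "success"])
final_state = wait_for_dag_run(lambda: next(fake_states), interval=0, timeout=5)
print(final_state)
```

    In real use, fetch_state would wrap a GET on the dagRuns endpoint and return the state field of the response.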

    Time zones

    https://airflow.apache.org/docs/apache-airflow/stable/timezone.html

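    In the example above, the execution time is given in UTC (the time at the prime meridian). After looking through the documentation, I found that Airflow is designed this way internally, which removes any dependency on time zones.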

    Support for time zones is enabled by default. Airflow stores datetime information in UTC internally and in the database. It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not convert them to the end user’s time zone in the user interface. There it will always be displayed in UTC. Also templates used in Operators are not converted. Time zone information is exposed and it is up to the writer of DAG what to do with it.
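
    When the caller works in local time, the value can be converted to UTC before it is sent as execution_date. A small sketch, in which the helper name to_utc_string and the UTC+8 sample zone are chosen only for illustration:

```python
from datetime import datetime, timedelta, timezone


def to_utc_string(local_dt):
    """Convert a timezone-aware datetime to a UTC ISO 8601 string."""
    if local_dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return local_dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# 20:30 in a UTC+8 zone is 12:30 UTC on the same day.
utc_plus_8 = timezone(timedelta(hours=8))
utc_string = to_utc_string(datetime(2021, 7, 9, 20, 30, tzinfo=utc_plus_8))
print(utc_string)  # 2021-07-09T12:30:00Z
```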

    schedule_interval

    https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#running-dags

    Besides being triggered manually or through the API, a DAG can also be triggered on a schedule, using the same syntax as crontab.

    DAGs will run in one of two ways:

    • When they are triggered either manually or via the API

    • On a defined schedule, which is defined as part of the DAG

    DAGs do not require a schedule, but it's very common to define one. You define it via the schedule_interval argument, like this:

    with DAG("my_daily_dag", schedule_interval="@daily"):
        ...
    

    The schedule_interval argument takes any value that is a valid Crontab schedule value, so you could also do:

    with DAG("my_daily_dag", schedule_interval="0 * * * *"):
        ...
    

    Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a DAG Run. DAG Runs can run in parallel for the same DAG, and each has a defined execution_date, which identifies the logical date and time it is running for - not the actual time when it was started.

    https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html

    schedule_interval accepts three kinds of input:

    #1 a crontab expression

    #2 a datetime.timedelta, the interval between runs

    #3 a preset value

    or None

    A DAG Run is an object representing an instantiation of the DAG in time.

    Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG argument, which can be passed a cron expression as a str, a datetime.timedelta object, or one of the following cron "presets".

    Cron Presets

    preset      meaning                                                          cron
    ----------  ---------------------------------------------------------------  -----------
    None        Don't schedule, use for exclusively "externally triggered" DAGs
    @once       Schedule once and only once
    @hourly     Run once an hour at the beginning of the hour                    0 * * * *
    @daily      Run once a day at midnight                                       0 0 * * *
    @weekly     Run once a week at midnight on Sunday morning                    0 0 * * 0
    @monthly    Run once a month at midnight of the first day of the month       0 0 1 * *
    @quarterly  Run once a quarter at midnight on the first day                  0 0 1 */3 *
    @yearly     Run once a year at midnight of January 1                         0 0 1 1 *

    Your DAG will be instantiated for each schedule along with a corresponding DAG Run entry in the database backend.
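
    To make the accepted input types concrete, the sketch below classifies a schedule_interval value as None, a timedelta, a preset, or a raw cron string. The mapping is copied from the preset table above; describe_schedule is a hypothetical helper written for illustration and is not part of Airflow.

```python
from datetime import timedelta

# Preset-to-cron mapping taken from the table above.
# None and "@once" have no cron equivalent.
CRON_PRESETS = {
    "@hourly": "0 * * * *",
    "@daily": "0 0 * * *",
    "@weekly": "0 0 * * 0",
    "@monthly": "0 0 1 * *",
    "@quarterly": "0 0 1 */3 *",
    "@yearly": "0 0 1 1 *",
}


def describe_schedule(schedule_interval):
    """Describe which kind of schedule a schedule_interval value represents."""
    if schedule_interval is None:
        return "externally triggered only"
    if isinstance(schedule_interval, timedelta):
        return f"every {schedule_interval}"
    if schedule_interval == "@once":
        return "run once"
    # Presets resolve to their cron form; anything else is treated as cron.
    cron = CRON_PRESETS.get(schedule_interval, schedule_interval)
    return f"cron: {cron}"


for value in (None, "@once", "@daily", "0 * * * *", timedelta(days=1)):
    print(repr(value), "->", describe_schedule(value))
```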

    timedelta

    https://github.com/apache/airflow/issues/14969

    import datetime as dt
    
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    
    dag_params = {
        'dag_id': 'schedule_interval_bug_example_dag',
        'default_args':{
            'owner': 'Administrator',
            'depends_on_past': False,
            'retries': 0,
            'email': ['example@example.com']
        },
        'schedule_interval': dt.timedelta(days=1),
        'start_date': dt.datetime(year=2021, month=1, day=1, hour=11, minute=10),
        'catchup': False
    }
    
    with DAG(**dag_params) as dag:
        DummyOperator(task_id='start') >> DummyOperator(task_id='end')

    OpenAPI?

    This API follows the OpenAPI specification.

    So what is that?

    https://spec.openapis.org/oas/v3.1.0

    What is the OpenAPI Specification?

    The OpenAPI Specification (OAS) defines a standard, programming language-agnostic interface description for HTTP APIs, which allows both humans and computers to discover and understand the capabilities of a service without requiring access to source code, additional documentation, or inspection of network traffic. When properly defined via OpenAPI, a consumer can understand and interact with the remote service with a minimal amount of implementation logic. Similar to what interface descriptions have done for lower-level programming, the OpenAPI Specification removes guesswork in calling a service.

    https://oai.github.io/Documentation/start-here.html

    Advantages of Using OpenAPI

    Having your API formally described in a machine-readable format allows automated tools to process it, instantly opening the door to:

    • Description Validation and Linting: Check that your description file is syntactically correct and adheres to a specific version of the Specification and the rest of your team’s formatting guidelines.
    • Data Validation: Check that the data flowing through your API (in both directions) is correct, during development and once deployed.
    • Documentation Generation: Create traditional human-readable documentation based on the machine-readable description, which always stays up-to-date.
    • Code Generation: Create both server and client code in any programming language, freeing developers from having to perform data validation or write SDK glue code, for example.
    • Graphical Editors: Allow easy creation of description files using a GUI instead of typing them by hand.
    • Mock Servers: Create fake servers providing example responses which you and your customers can start testing with before you write a single line of code.
    • Security Analysis: Discover possible vulnerabilities at the API design stage instead of much, much later.

    swagger vs openapi

    https://swagger.io/blog/api-strategy/difference-between-swagger-and-openapi/

    Let's start with clarifying Swagger vs OpenAPI

    The easiest way to understand the difference is:

    • OpenAPI = Specification
    • Swagger = Tools for implementing the specification

    OpenAPI is the official name of the specification. The development of the specification is fostered by the OpenAPI Initiative, which involves more than 30 organizations from different areas of the tech world, including Microsoft, Google, IBM, and CapitalOne. Smartbear Software, which is the company that leads the development of the Swagger tools, is also a member of the OpenAPI Initiative, helping lead the evolution of the specification.

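    Source: http://www.cnblogs.com/lightsong/ The copyright of this article is shared by the author and cnblogs. Reposting is welcome, but unless the author agrees otherwise, this notice must be retained and a link to the original must be given in a prominent position on the article page.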
  • Original article: https://www.cnblogs.com/lightsong/p/14991297.html