  • apache airflow docker: a quick trial run

    Airflow is a platform for authoring, scheduling and monitoring workflows. It was open-sourced by Airbnb
    and is now incubating at the Apache Software Foundation. Airflow models a workflow as a DAG of tasks,
    and the scheduler executes those tasks on a group of workers while respecting the declared dependencies.
    Airflow also ships a rich command-line tool set and an easy-to-use web UI for inspecting and operating
    on workflows, plus built-in monitoring and alerting.

    The test environment runs in Docker.

    Basic installation

    • Docker install
    Use the prebuilt puckel/docker-airflow image
    • Or install with pip
    pip install apache-airflow

    Quick test & run

    • docker-compose

    LocalExecutor run:

    version: '2.1'
    services:
        postgres:
            image: postgres:9.6
            environment:
                - POSTGRES_USER=airflow
                - POSTGRES_PASSWORD=airflow
                - POSTGRES_DB=airflow
            ports:
                - "5432:5432"
    
        webserver:
            image: puckel/docker-airflow:1.10.0-2
            depends_on:
                - postgres
            environment:
                - LOAD_EX=n
                - EXECUTOR=Local
            volumes:
                - ./dags:/usr/local/airflow/dags
                # Uncomment to include custom plugins
                # - ./plugins:/usr/local/airflow/plugins
            ports:
                - "8080:8080"
            command: webserver
            healthcheck:
                test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
                interval: 30s
                timeout: 30s
                retries: 3
    
    CeleryExecutor run:
    version: '2.1'
    services:
        redis:
            image: 'redis:3.2.7'
            # command: redis-server --requirepass redispass
    
        postgres:
            image: postgres:9.6
            environment:
                - POSTGRES_USER=airflow
                - POSTGRES_PASSWORD=airflow
                - POSTGRES_DB=airflow
            # Uncomment these lines to persist data on the local filesystem.
            # - PGDATA=/var/lib/postgresql/data/pgdata
            # volumes:
            # - ./pgdata:/var/lib/postgresql/data/pgdata
    
        webserver:
            image: puckel/docker-airflow:1.10.0-2
            restart: always
            depends_on:
                - postgres
                - redis
            environment:
                - LOAD_EX=n
                - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
                - EXECUTOR=Celery
                # - POSTGRES_USER=airflow
                # - POSTGRES_PASSWORD=airflow
                # - POSTGRES_DB=airflow
                # - REDIS_PASSWORD=redispass
            volumes:
                - ./dags:/usr/local/airflow/dags
                # Uncomment to include custom plugins
                # - ./plugins:/usr/local/airflow/plugins
            ports:
                - "8080:8080"
            command: webserver
            healthcheck:
                test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
                interval: 30s
                timeout: 30s
                retries: 3
    
        flower:
            image: puckel/docker-airflow:1.10.0-2
            restart: always
            depends_on:
                - redis
            environment:
                - EXECUTOR=Celery
                # - REDIS_PASSWORD=redispass
            ports:
                - "5555:5555"
            command: flower
    
        scheduler:
            image: puckel/docker-airflow:1.10.0-2
            restart: always
            depends_on:
                - webserver
            volumes:
                - ./dags:/usr/local/airflow/dags
                # Uncomment to include custom plugins
                # - ./plugins:/usr/local/airflow/plugins
            environment:
                - LOAD_EX=n
                - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
                - EXECUTOR=Celery
                # - POSTGRES_USER=airflow
                # - POSTGRES_PASSWORD=airflow
                # - POSTGRES_DB=airflow
                # - REDIS_PASSWORD=redispass
            command: scheduler
    
        worker:
            image: puckel/docker-airflow:1.10.0-2
            restart: always
            depends_on:
                - scheduler
            volumes:
                - ./dags:/usr/local/airflow/dags
                # Uncomment to include custom plugins
                # - ./plugins:/usr/local/airflow/plugins
            environment:
                - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
                - EXECUTOR=Celery
                # - POSTGRES_USER=airflow
                # - POSTGRES_PASSWORD=airflow
                # - POSTGRES_DB=airflow
                # - REDIS_PASSWORD=redispass
            command: worker
    • A simple flow
    """
    Code that goes along with the Airflow located at:
    http://airflow.readthedocs.org/en/latest/tutorial.html
    """
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    
    default_args = {
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2015, 6, 1),
        "email": ["airflow@airflow.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    
    dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(days=1))
    
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)
    
    t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)
    
    templated_command = """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
            echo "{{ params.my_param }}"
        {% endfor %}
    """
    
    t3 = BashOperator(
        task_id="templated",
        bash_command=templated_command,
        params={"my_param": "Parameter I passed in"},
        dag=dag,
    )
    
    t2.set_upstream(t1)
    t3.set_upstream(t1)
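
    The templated_command above relies on Airflow's Jinja templating: ds is the execution date rendered
    as YYYY-MM-DD, and macros.ds_add(ds, days) shifts that date string by the given number of days. A
    minimal stdlib sketch of what ds_add computes (a re-implementation for illustration, not Airflow's
    own code):

```python
from datetime import datetime, timedelta

def ds_add(ds, days):
    """Shift a 'YYYY-MM-DD' date string by `days` days, mimicking Airflow's macros.ds_add."""
    return (datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)).strftime("%Y-%m-%d")

print(ds_add("2015-06-01", 7))   # the template above would render "2015-06-08"
print(ds_add("2015-06-01", -2))  # negative offsets step backwards: "2015-05-30"
```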

    Notes

    Task runs start from 2015-06-01 (the start_date above), so the scheduler will backfill a large
    number of runs; adjust start_date if that is more than you want.
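
    To see why the run count is so large: with a daily schedule_interval, the scheduler backfills one
    DAG run per day between start_date and now. A quick stdlib calculation (the "now" date here is
    illustrative):

```python
from datetime import datetime, timedelta

start_date = datetime(2015, 6, 1)   # the start_date from default_args above
today = datetime(2018, 9, 1)        # an illustrative "now"

# One DAG run per schedule_interval (1 day) between start_date and now.
backfill_runs = (today - start_date) // timedelta(days=1)
print(backfill_runs)  # 1188 runs would be backfilled
```

    Moving start_date closer to the present avoids this, as does passing catchup=False to the DAG
    constructor (supported in Airflow 1.8 and later).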

    Run

    • Result: the Airflow web UI at http://localhost:8080 (screenshots not preserved)



    References

    https://www.jianshu.com/p/76794553effc
    https://hub.docker.com/r/puckel/docker-airflow/
    https://github.com/rongfengliang/airflow-docker-compose-demo

  • Original post: https://www.cnblogs.com/rongfengliang/p/9608674.html