  • Installing Airflow


    Official documentation:
    http://airflow.incubator.apache.org/project.html

    1. Environment preparation

    1.1 Installation environment

    • CentOS 6.7 (Docker)
    • Python 2.7.13

    Create the Docker container:

    docker run --name airflow -h airflow -dti --net hadoopnet --ip=172.18.0.20 -p 10131:22 -v /dfs/centos/airflow/home:/home -v /dfs/centos/airflow/opt:/opt yangxw/centos:6.7

    1.2 Create a user

    [root@airflow ~]# groupadd airflow
    [root@airflow ~]# useradd airflow -g airflow

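    2. Install Airflow

    2.1 Install Python

    The official Python site only provides source packages, so Python would have to be compiled from source.
    See: compiling and installing Python 2.7.13.
    Compiling Python requires upgrading gcc, which in turn means compiling gcc itself. That is too complicated, so instead download Anaconda, a bundled Python distribution. Pick an Anaconda2 (Python 2.7) installer from the mirror archive below and download it with wget:
    https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/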

    2.2 Install pip

    Anaconda already includes pip, so it can be used directly.

    2.3 Install the database

    Airflow supports MySQL, PostgreSQL, Oracle, and other databases. PostgreSQL is used here; install it with yum install postgresql-server.

    2.4 Install Airflow

    Airflow is installed modularly: install only the components you actually use, for example:

    2.4.1 Install the core package

    [airflow@airflow ~]$ pip install airflow

    2.4.2 Install the database (postgres) and password-authentication extras

    [airflow@airflow ~]$ pip install "airflow[postgres,password]"

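    2.5 Configure Airflow

    2.5.1 Set the environment variable

    First set the $AIRFLOW_HOME environment variable, then reload .bashrc (or open a new shell) so it takes effect. The first time any airflow command runs, it creates the configuration file airflow.cfg under $AIRFLOW_HOME. Running airflow with no arguments is enough: it prints a usage error, but the files are still created.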

    [airflow@airflow ~]$ vi .bashrc
    export AIRFLOW_HOME=/home/airflow/airflow01
    [airflow@airflow ~]$ airflow
    [2017-05-08 02:00:04,677] {__init__.py:57} INFO - Using executor SequentialExecutor
    usage: airflow [-h]
                   {resetdb,render,variables,connections,pause,task_failed_deps,version,trigger_dag,initdb,test,unpause,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,upgradedb}
                   …
    airflow: error: too few arguments
    [airflow@airflow ~]$ ll airflow01/
    total 16
    -rw-rw-r-- 1 airflow airflow 11418 May  8 02:00 airflow.cfg
    -rw-rw-r-- 1 airflow airflow  1549 May  8 02:00 unittests.cfg
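    The lookup above can be sketched in Python to check which directory a shell will use. This is a minimal sketch assuming Airflow 1.8's behaviour of falling back to ~/airflow when AIRFLOW_HOME is unset and writing airflow.cfg directly under that directory; resolve_airflow_home and config_path are hypothetical helpers, not Airflow APIs.

```python
import os


def resolve_airflow_home(environ=None):
    """Return the directory Airflow would use as AIRFLOW_HOME.

    Assumption: like Airflow 1.8, fall back to ~/airflow when the
    AIRFLOW_HOME variable is not set in `environ`.
    """
    env = os.environ if environ is None else environ
    raw = env.get("AIRFLOW_HOME", "~/airflow")
    # Expand ~ and any $VARS (the latter from the real process environment).
    return os.path.expanduser(os.path.expandvars(raw))


def config_path(environ=None):
    """Path of the airflow.cfg created by the first `airflow` invocation."""
    return os.path.join(resolve_airflow_home(environ), "airflow.cfg")


if __name__ == "__main__":
    print("AIRFLOW_HOME =", resolve_airflow_home())
    print("airflow.cfg exists:", os.path.isfile(config_path()))
```

    If the printed path is not /home/airflow/airflow01, the export in .bashrc has not been loaded into the current shell yet.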

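    2.5.2 Edit the configuration file

    airflow.cfg is divided into the sections core, cli, api, operators, webserver, email, smtp, celery, scheduler, mesos, kerberos, github_enterprise, and admin.
    Change the following parameters and leave the rest at their default values: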

    [core]
    airflow_home = /home/airflow/airflow01
    # directory holding the DAG Python files
    dags_folder = /home/airflow/airflow01/dags
    # start with the local executor
    executor = LocalExecutor
    # main log directory
    base_log_folder = /home/airflow/airflow01/logs
    sql_alchemy_conn = postgresql+psycopg2://yangxiaowen:yangxiaowen@10.38.1.78:5432/yangxiaowen
    load_examples = True
    default_impersonation = xiaowen.yang
    [webserver]
    authenticate = True
    # The 1.8.1 airflow.cfg does not contain this parameter. It must be added, otherwise startup fails with
    # "airflow.exceptions.AirflowException: Failed to import authentication backend"
    auth_backend = airflow.contrib.auth.backends.password_auth
    filter_by_owner = true
    # IP of the web server machine
    web_server_host = XXX.XXX.XXX.XXX
    # web server machine IP:PORT
    base_url = http://XXX.XXX.XXX.XXX:8080
    [smtp]
    smtp_host = smtp.exmail.qq.com
    smtp_user = bd-no-reply@bqjr.cn
    smtp_password = BQJRbd@2016
    smtp_mail_from = bd-no-reply@bqjr.cn
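    Comments written after a value on the same line are risky in this file. Python's standard configparser only strips inline # comments when it is constructed with inline_comment_prefixes; otherwise the comment becomes part of the value, so executor would be read as "LocalExecutor #...". Airflow reads airflow.cfg with a configparser-based reader, and whether a given release enables inline comments is an assumption worth checking; the sketch below only demonstrates the standard-library behaviour on a hypothetical fragment (SAMPLE and read_value are illustrative names).

```python
import configparser

# Hypothetical fragment in the style of airflow.cfg, with an inline comment.
SAMPLE = """
[core]
executor = LocalExecutor #use local mode first
dags_folder = /home/airflow/airflow01/dags
"""


def read_value(text, section, option, inline_comments=False):
    """Parse `text` and return one option's raw string value."""
    kwargs = {"inline_comment_prefixes": ("#",)} if inline_comments else {}
    parser = configparser.ConfigParser(**kwargs)
    parser.read_string(text)
    return parser.get(section, option)


if __name__ == "__main__":
    # Default parser: the comment is kept as part of the value.
    print(repr(read_value(SAMPLE, "core", "executor")))
    # Parser with inline comments enabled: only the value remains.
    print(repr(read_value(SAMPLE, "core", "executor", inline_comments=True)))
```

    Putting each comment on its own line avoids depending on how the parser is configured.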

    3. Start Airflow

    3.1 Initialize the database

    [airflow@airflow ~]$ airflow initdb
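    initdb creates Airflow's metadata tables in the database named by sql_alchemy_conn, so a malformed connection URL is a common reason for it to fail. The sketch below uses only urllib.parse to split such a URL into dialect, driver, user, host, port, and database so each part can be checked; split_conn is a hypothetical helper, and the URL is the one from the configuration above.

```python
from urllib.parse import urlsplit


def split_conn(url):
    """Split a SQLAlchemy-style URL: dialect+driver://user:pass@host:port/db."""
    parts = urlsplit(url)
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,  # None when there is no +driver suffix
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,  # int, or None if the port is omitted
        "database": parts.path.lstrip("/"),
    }


if __name__ == "__main__":
    conn = "postgresql+psycopg2://yangxiaowen:yangxiaowen@10.38.1.78:5432/yangxiaowen"
    print(split_conn(conn))
```

    The password is deliberately left out of the result so the output can be pasted into logs.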

    3.2 Create a web UI user (for password authentication)

    $ python
    Python 2.7.9 (default, Feb 10 2015, 03:28:08)
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import airflow
    >>> from airflow import models, settings
    >>> from airflow.contrib.auth.backends.password_auth import PasswordUser
    >>> user = PasswordUser(models.User())
    >>> user.username = 'new_user_name'
    >>> user.email = 'new_user_email@example.com'
    >>> user.password = 'set_the_password'
    >>> session = settings.Session()
    >>> session.add(user)
    >>> session.commit()
    >>> session.close()
    >>> exit()

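    3.3 Start Airflow

    Both commands stay in the foreground, so run them in separate terminals:

    [airflow@airflow ~]$ airflow webserver -p 8080

    [airflow@airflow ~]$ airflow scheduler

    If no errors are reported, Airflow has started successfully.
    The Airflow web UI can then be opened in a browser at the configured base_url.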
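    To confirm from a script that the webserver is accepting connections, a plain TCP connect is enough (the scheduler opens no port, so it has to be checked through its own output). A minimal sketch with the standard socket module; port_is_open is a hypothetical helper, and host 127.0.0.1 with port 8080 are assumptions matching the -p 8080 above.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, unreachable host, and so on.
        return False


if __name__ == "__main__":
    # Assumption: webserver started on this host with `airflow webserver -p 8080`.
    print("webserver up:", port_is_open("127.0.0.1", 8080))
```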

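    4. Run a task

    Airflow tasks are defined in Python programs. Below is a simple one.
    First create the dags and logs directories under $AIRFLOW_HOME; the DAG file below must be saved in the dags directory (dags_folder).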

    vi testBashOperator.py
    #!/usr/bin/python
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'yangxw',
        'depends_on_past': False,
        'start_date': datetime(2017, 5, 9),
        'email': ['xiaowen.yang@bqjr.cn'],
        'email_on_failure': True,
        'email_on_retry': True,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    
    dag = DAG('testBashOperator', default_args=default_args)
    
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)
    
    t2.set_upstream(t1)
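    t2.set_upstream(t1) only records that sleep depends on print_date; with the default trigger rule, a task runs once all of its upstream tasks have succeeded. The resulting ordering can be illustrated without Airflow as a topological sort (Kahn's algorithm) over a mapping of task_id to upstream task_ids. This is a toy model for intuition, not Airflow's scheduler code, and run_order is a hypothetical helper.

```python
from collections import deque


def run_order(upstream):
    """Return task ids so that every task comes after its upstream tasks.

    `upstream` maps task_id -> list of upstream task_ids (Kahn's algorithm).
    Raises ValueError on a cycle, which a DAG must not contain.
    """
    tasks = set(upstream)
    for ups in upstream.values():
        tasks.update(ups)
    # Number of unfinished upstream tasks per task.
    remaining = {t: len(upstream.get(t, [])) for t in tasks}
    downstream = {t: [] for t in tasks}
    for task, ups in upstream.items():
        for up in ups:
            downstream[up].append(task)
    ready = deque(sorted(t for t, n in remaining.items() if n == 0))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in sorted(downstream[task]):
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)
    if len(order) != len(tasks):
        raise ValueError("cycle detected: not a DAG")
    return order


if __name__ == "__main__":
    # Mirrors t2.set_upstream(t1): 'sleep' waits for 'print_date'.
    print(run_order({"sleep": ["print_date"]}))  # ['print_date', 'sleep']
```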
    
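    Start the webserver in debug mode:

    airflow webserver --debug

    Run python testBashOperator.py to check that the file loads without errors (this only parses the DAG; it does not run any task). Then run airflow run testBashOperator print_date 2017-05-09 to execute the print_date task for 2017-05-09; the DAG information then appears in the web UI.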
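    The date given to airflow run is the task's execution_date. In Airflow 1.x a DAG created without schedule_interval, like testBashOperator, is scheduled daily, and the run labelled with a given execution_date is started only after that day has ended. The sketch below lists which daily execution dates would be due at a given moment; it is a simplified model (no cron expressions, no catchup or end_date handling), and due_execution_dates is a hypothetical helper.

```python
from datetime import datetime, timedelta


def due_execution_dates(start_date, now, interval=timedelta(days=1)):
    """Execution dates whose schedule period has fully ended by `now`.

    Simplified model: the run labelled D covers [D, D + interval)
    and is started once D + interval <= now.
    """
    dates = []
    current = start_date
    while current + interval <= now:
        dates.append(current)
        current += interval
    return dates


if __name__ == "__main__":
    start = datetime(2017, 5, 9)  # start_date from testBashOperator
    now = datetime(2017, 5, 11, 8, 0)
    for d in due_execution_dates(start, now):
        print(d.isoformat())
```

    With these inputs the runs for 2017-05-09 and 2017-05-10 are due, while 2017-05-11 is not, because that day has not finished yet.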

    5. Install Celery

    Celery is a distributed task queue. With CeleryExecutor, Airflow can add workers dynamically and run tasks on remote machines, so CeleryExecutor is recommended for production.
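    The division of labour behind CeleryExecutor (task commands are put on a broker, and any number of workers pull and run them) can be modelled with the standard library. This is a toy in-process model: queue.Queue stands in for the broker and threads stand in for Celery workers; run_workers is a hypothetical helper and nothing is actually executed.

```python
import queue
import threading


def run_workers(commands, n_workers):
    """Distribute `commands` over `n_workers` threads via one shared queue.

    Returns a dict mapping each command to the worker that picked it up.
    Scaling out only means starting more consumers on the same queue.
    """
    broker = queue.Queue()  # stands in for RabbitMQ/Redis
    results = {}
    lock = threading.Lock()

    def worker(name):
        while True:
            cmd = broker.get()
            if cmd is None:  # sentinel: no more work for this worker
                broker.task_done()
                return
            with lock:
                results[cmd] = name  # a real worker would run the command here
            broker.task_done()

    threads = [threading.Thread(target=worker, args=("worker-%d" % i,))
               for i in range(n_workers)]
    for t in threads:
        t.start()
    for cmd in commands:
        broker.put(cmd)
    for _ in threads:  # one sentinel per worker, queued after all commands
        broker.put(None)
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    cmds = ["airflow run testBashOperator print_date 2017-05-09",
            "airflow run testBashOperator sleep 2017-05-09"]
    for cmd, who in run_workers(cmds, n_workers=2).items():
        print(who, "<-", cmd)
```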

    5.1 Install the Celery extra

    pip install airflow[celery]

    5.2 Install a Celery broker

    Celery needs a broker and a result backend (they can be the same service) to store messages. Celery supports several brokers:

    5.2.1 Using RabbitMQ as the broker

    1. Install Airflow's RabbitMQ extra
      Celery can use RabbitMQ, Redis, and others as the broker, and even some experimental transports (such as databases supported by SQLAlchemy); RabbitMQ is the default.
      pip install airflow[rabbitmq]
    2. Install the RabbitMQ server
      yum install rabbitmq-server
      (it pulls in more than 160 dependency packages!)
      Then start it: service rabbitmq-server start
    3. Configure RabbitMQ (see http://blog.csdn.net/qazplm12_3/article/details/53065654): create a user and a vhost, tag the user, and grant it full permissions on the vhost
      rabbitmqctl add_user ct 152108
      rabbitmqctl add_vhost ct_airflow
      rabbitmqctl set_user_tags ct airflow
      rabbitmqctl set_permissions -p ct_airflow ct ".*" ".*" ".*"
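    The user, password, and vhost created above combine into the broker URL in the form amqp://user:password@host:port/vhost. The sketch below builds it and percent-encodes each part, so characters such as @ or / in a password or vhost cannot break the URL; build_amqp_url is a hypothetical helper, and 5672 is RabbitMQ's default AMQP port.

```python
from urllib.parse import quote


def build_amqp_url(user, password, host, vhost, port=5672):
    """Build an amqp:// broker URL with each component percent-encoded."""
    return "amqp://%s:%s@%s:%d/%s" % (
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),  # a vhost of "/" is encoded as %2F
    )


if __name__ == "__main__":
    print(build_amqp_url("ct", "152108", "localhost", "ct_airflow"))
    # amqp://ct:152108@localhost:5672/ct_airflow
```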

    5.2.2 Using Redis as the broker

    1. Install Celery's Redis extra
      pip install -U "celery[redis]"
    2. Install Redis
      yum install redis
    3. Start Redis
      service redis start
    4. Edit the Airflow configuration file ([celery] section)
      broker_url = redis://localhost:6379/0
      celery_result_backend = redis://localhost:6379/0

    5.3 Enable Celery in the Airflow configuration

    Edit airflow.cfg (this example uses the RabbitMQ broker from 5.2.1):
    [core]
    executor = CeleryExecutor
    [celery]
    broker_url = amqp://ct:152108@localhost:5672/ct_airflow
    celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow
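    Before starting workers, a quick consistency check on airflow.cfg can catch a half-finished switch, such as executor still set to LocalExecutor or an empty broker_url. A sketch with configparser; check_celery_config is a hypothetical helper, and the accepted schemes (amqp, redis) are only the two brokers covered in this article, so other valid backends would be flagged too.

```python
import configparser
from urllib.parse import urlsplit

KNOWN_BROKERS = {"amqp", "redis"}  # the two brokers covered in this article


def check_celery_config(text):
    """Return a list of problems found in airflow.cfg text (empty = OK)."""
    # interpolation=None: read values literally, '%' has no special meaning.
    cfg = configparser.ConfigParser(interpolation=None)
    cfg.read_string(text)
    problems = []
    executor = cfg.get("core", "executor", fallback="")
    if executor != "CeleryExecutor":
        problems.append("core.executor is %r, not CeleryExecutor" % executor)
    for key in ("broker_url", "celery_result_backend"):
        scheme = urlsplit(cfg.get("celery", key, fallback="")).scheme
        if scheme not in KNOWN_BROKERS:
            problems.append("celery.%s has unexpected scheme %r" % (key, scheme))
    return problems


if __name__ == "__main__":
    sample = """
[core]
executor = CeleryExecutor
[celery]
broker_url = amqp://ct:152108@localhost:5672/ct_airflow
celery_result_backend = amqp://ct:152108@localhost:5672/ct_airflow
"""
    for problem in check_celery_config(sample) or ["looks consistent"]:
        print(problem)
```

    To check a real installation, pass the contents of $AIRFLOW_HOME/airflow.cfg instead of the sample.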

    5.4 Test Celery

    [airflow@airflow ~]$ airflow webserver -p 8100
    [airflow@airflow ~]$ airflow scheduler
    [airflow@airflow ~]$ airflow worker  # starts a Celery worker

    The output shows CeleryExecutor starting up. Then run airflow run testBashOperator print_date 2017-05-09 and check how CeleryExecutor handles it.

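    5.5 Deploy multiple workers

    On each machine that should run jobs, install the airflow, airflow[celery], and celery[redis] packages, then start airflow worker. Each worker needs the same airflow.cfg (database and broker settings) and the same DAG files as the scheduler. Jobs can then run on multiple nodes.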
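    Because each worker loads the DAG file named in the airflow run command (note the -sd DAGS_FOLDER/... argument in the log in section 6) from its own disk, copies that drift apart between machines cause confusing failures. One way to spot drift is to compare a fingerprint of the dags folder on every node. The sketch below hashes relative paths and contents of all .py files with hashlib; dags_fingerprint is a hypothetical helper, not an Airflow command, and the folder path is an assumption taken from dags_folder above.

```python
import hashlib
import os


def dags_fingerprint(dags_folder):
    """SHA-256 over every .py file under dags_folder (relative path + bytes).

    Run it on each node; identical digests mean identical DAG files.
    """
    digest = hashlib.sha256()
    for root, dirs, files in os.walk(dags_folder):
        dirs.sort()  # visit subdirectories in a stable order
        for name in sorted(files):
            if not name.endswith(".py"):
                continue
            path = os.path.join(root, name)
            rel = os.path.relpath(path, dags_folder).replace(os.sep, "/")
            digest.update(rel.encode("utf-8") + b"\0")  # separator between path and content
            with open(path, "rb") as f:
                digest.update(f.read())
    return digest.hexdigest()


if __name__ == "__main__":
    folder = "/home/airflow/airflow01/dags"  # assumption: dags_folder from airflow.cfg
    if os.path.isdir(folder):
        print(dags_fingerprint(folder))
```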

    6. Problems

    The following error occurred inside Docker and went away after switching to a physical machine:

    [2017-05-10 09:14:59,777: ERROR/Worker-1] Command 'airflow run testFile echoDate 2017-05-10T00:00:00 --local -sd DAGS_FOLDER/testFile.py' returned non-zero exit status 1
    [2017-05-10 09:14:59,783: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[c5d5ea39-0141-46bb-b33a-06a924c07508] raised unexpected: AirflowException('Celery command failed',)
    Traceback (most recent call last):
      File "/opt/anaconda2/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
        R = retval = fun(*args, **kwargs)
      File "/opt/anaconda2/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
        return self.run(*args, **kwargs)
      File "/opt/anaconda2/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 59, in execute_command
        raise AirflowException('Celery command failed')
    AirflowException: Celery command failed
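    The worker log only reports that the airflow run command exited with status 1, and the traceback shows execute_command in celery_executor.py turning that into AirflowException('Celery command failed'); the real cause is printed by the command itself. A practical next step is to run the same command by hand on the worker and read its stderr. The sketch below does that with subprocess.run; rerun is a hypothetical helper, and no shell is involved, so pipes and redirects are not supported.

```python
import shlex
import subprocess
import sys


def rerun(command, timeout=600):
    """Run a command and capture (exit_code, stdout, stderr) as text.

    `command` is a list of arguments, or a string split with shlex.
    """
    args = shlex.split(command) if isinstance(command, str) else list(command)
    proc = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,  # decode output to str
        timeout=timeout,
    )
    return proc.returncode, proc.stdout, proc.stderr


if __name__ == "__main__" and len(sys.argv) > 1:
    # e.g. python rerun.py airflow run testFile echoDate 2017-05-10T00:00:00 --local -sd DAGS_FOLDER/testFile.py
    code, out, err = rerun(sys.argv[1:])
    print("exit status:", code)
    print(out)
    print(err, file=sys.stderr)
```

    Run it as the same user the worker runs as, so permission and environment problems (for example an unset AIRFLOW_HOME) reproduce.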

    References:

    http://airflow.incubator.apache.org 
    https://my.oschina.net/u/2297683/blog/751880 
    http://blog.csdn.net/qazplm12_3/article/details/53065654 
    http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html 
    http://www.rabbitmq.com/install-rpm.html 

  • Original post: https://www.cnblogs.com/Mrwan/p/8274443.html