zoukankan      html  css  js  c++  java
  • Ubuntu16.04安装apache-airflow

    服务器使用的是centos系统,需要安装好pip和setuptools,同时注意更新安装的版本

    接下来参考安装好Airflow

    Airflow 1.8 工作流平台搭建 http://blog.csdn.net/kk185800961/article/details/78431484
    airflow最简安装方法 centos 6.5 http://blog.csdn.net/Excaliburace/article/details/53818530
    

     

    以mysql作为数据库,airflow默认使用sqlite作为数据库

    1.建表

    # 创建相关数据库及账号  
    mysql> create database airflow default charset utf8 collate utf8_general_ci;  
    mysql> create user airflow@'localhost' identified by 'airflow';  
    mysql> grant all on airflow.* to airflow@'localhost';  
    mysql> flush privileges;  
    

    2.安装airflow,需要环境隔离的时候请使用virtualenv ./env创建隔离环境

    sudo pip install apache-airflow
    

    3.使用pip来安装,安装的路径在python路径下site-packages文件夹,在使用上述命令一遍就能看到

    ~/anaconda2/lib/python2.7/site-packages/airflow
    

    4.在/etc/proofile中添加,之后source一下

    #Airflow
    export AIRFLOW_HOME=/home/lintong/software/airflow
    

    5.创建数据库,这一步会创建上面路径的airflow文件夹,以及文件夹中的一些文件

    airflow initdb
    

     查看是否安装成功

    airflow version
    

    5.配置元数据库地址

    /home/lintong/software/airflow
    vim airflow.cfg
    

    修改下图中的配置

    sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
    

    6.安装python的mysql驱动

    pip install mysql-python
    

    再次初始化数据库

    airflow initdb
    

    7.启动web界面

    airflow webserver -p 8080
    http://localhost:8080/admin/
    

    8.在airflow路径下新建一个dags文件夹,并创建一个DAG python脚本,参考:[AirFlow]AirFlow使用指南三 第一个DAG示例

    # -*- coding: utf-8 -*-
    
    import airflow
    import os
    import time
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    from datetime import timedelta
    
    # -------------------------------------------------------------------------------
    # these args will get passed on to each operator
    # you can override them on a per-task basis during operator initialization
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': airflow.utils.dates.days_ago(0),
        'email': ['xxxxxxx'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'dag': dag,
        # 'adhoc':False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'trigger_rule': u'all_success'
    }
    
    # -------------------------------------------------------------------------------
    # dag
    
    dag = DAG(
        'example_print_dag',
        default_args=default_args,
        description='my first DAG',
        schedule_interval=timedelta(days=1))
    
    
    def each_content(content, path):
        return ("echo "" + content + " " + time.asctime(time.localtime(time.time())) + "" >> " + path)
    
    # -------------------------------------------------------------------------------
    # first operator
    
    #print1_operator = PythonOperator(
    #    task_id='print1_task',
    #    python_callable=each_content("1", "/home/lintong/桌面/test.txt"),
    #    dag=dag)
    
    print1_operator = BashOperator(
        task_id='print1_task',
        bash_command='echo 1 >> /home/lintong/test.txt',
        dag=dag)
    
    # -------------------------------------------------------------------------------
    # second operator
    
    print2_operator = BashOperator(
        task_id='print2_task',
        bash_command='echo 2 >> /home/lintong/test.txt',
        dag=dag)
    
    # -------------------------------------------------------------------------------
    # third operator
    
    print3_operator = BashOperator(
        task_id='print3_task',
        bash_command='echo 3 >> /home/lintong/test.txt',
        dag=dag)
    
    # -------------------------------------------------------------------------------
    # dependencies
    #each_content("1", "/home/lintong/桌面/test.txt")
    print2_operator.set_upstream(print1_operator)
    print3_operator.set_upstream(print1_operator)
    
    #if __name__ == "__main__":
    #    dag.cli()
    

    9.在web界面中查看是否出现这个DAG

    10.出现的时候,DAG的状态是off,需要将起状态设置为on,并点击后面的 绿色三角形 启动按钮

    11.启动调度器

    airflow scheduler
    

    12.查看文件test.txt,其中会顺序出现1 2 3或者1 3 2

    安装完成后使用下面shell脚本来启动Airflow,端口为8080

    #!/bin/bash
    
    #set -x
    #set -e
    set -u
    
    # 使用./start_airflow.sh "stop_all"或者"start_all"
    if [ $1 == "stop_all" ]; then
        # 获取所有进程,取出Airflow,去除grep,截取PID,干掉
        ps -ef | grep -Ei 'airflow' | grep -v 'grep' | awk '{print $2}' | xargs kill
    fi
    
    if [ $1 == "start_all" ]; then
        cd /home/lintong/software/airflow/logs
        nohup airflow webserver >>webserver.log 2>&1 &
        nohup airflow worker >>worker.log 2>&1 &
        nohup airflow scheduler >>scheduler.log 2>&1 &
        echo "后台启动Airflow"
    fi
    

     添加用户的python脚本

    from airflow import models,   settings
    from airflow.contrib.auth.backends.password_auth import PasswordUser
    user = PasswordUser(models.User())
    user.username = 'lintong'
    user.email = 'xxxxx@gmail.com'
    user.password = 'XXXXX'
    session = settings.Session()
    session.add(user)
    session.commit()
    session.close()
    exit()
    

    如果要安装1.10版本的,请使用python3.6

    如果pip3找不到了

    sudo python3 -m pip install --upgrade --force-reinstall pip
    

    如果虚拟环境中不存在pip3

    sudo apt-get install python3-venv
    

    安装Python3.6

    sudo add-apt-repository ppa:deadsnakes/ppa
    sudo apt-get update
    sudo apt-get install python3.6
    

    安装pip3.6

    wget https://bootstrap.pypa.io/get-pip.py
    sudo python3.6 get-pip.py
    

    安装python-dev,不然会报error: command 'x86_64-linux-gnu-gcc' failed with exit status 1

    sudo apt-get install python3.6-dev
    

    添加AIRFLOW_HOME

    #Airflow
    export AIRFLOW_HOME=/home/lintong/software/airflow
    

    安装airflow 1.10.10

    sudo pip3.6 install --force-reinstall apache-airflow -i https://pypi.tuna.tsinghua.edu.cn/simple
    

    airflow version,然后airflow_home目录下会自动出现airflow.cfg文件,修改airflow.cfg

    sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
    

    安装mysqlclient,因为python3没有mysql-python

    pip3.6 install mysqlclient
    

    初始化db

    airflow initdb
    
  • 相关阅读:
    电脑设置开机
    python 环境搭建 python-3.4.4
    遍历hashmap 的四种方法
    Java8 使用 stream().map()提取List对象的某一列值及排重
    解决 SpringMVC 非spring管理的工具类使用@Autowired注解注入DAO为null的问题
    CXF之"@XmlType.name 和 @XmlType.namespace 为类分配不同的名称"错误
    java.lang.NoSuchMethodError: javax.wsdl.xml.WSDLReader.readWSDL
    java.lang.IllegalArgumentException: URLDecoder: Illegal hex characters in escape (%) pattern
    java.lang.IllegalArgumentException: Request header is too large
    ie8 报错:意外地调用了方法或属性访问
  • 原文地址:https://www.cnblogs.com/tonglin0325/p/7911022.html
Copyright © 2011-2022 走看看