zoukankan      html  css  js  c++  java
  • airflow + CeleryExecutor 环境搭建

    airflow整合环境搭建

    1. 整体结构

    mysql -> 后端数据库

    redis -> 用于broker

    CeleryExecutor -> 执行器

    2. 环境安装

    2.1,安装python anaconda环境
    添加py用户
    # useradd py
    设置密码
    # passwd py
    创建anaconda安装路径
    # mkdir /anaconda
    赋权
    # chown -R py:py /anaconda
    
    上传anaconda安装包并用py用户运行安装程序
    $ chmod +x Anaconda3-5.1.0-Linux-x86_64.sh
    $ ./Anaconda3-5.1.0-Linux-x86_64.sh
    Welcome to Anaconda3 5.1.0
    In order to continue the installation process, please review the license
    ......
    - Press ENTER to confirm the location
    - Press CTRL-C to abort the installation
    - Or specify a different location below
    [/home/py/anaconda3] >>> /anaconda/anaconda3      输入自定义安装路径,如果用默认的话回车跳过
    然后将anaconda加入环境变量,并使其生效
    
    $ vi .bash_profile 
    在最后一行加上如下配置:
    export PATH=/anaconda/anaconda3/bin:$PATH
    然后使其生效:
    $source .bash_profile
    
    检测一下安装结果,出现以下结果说明安装成功:
    $ python -V
    Python 3.6.4 :: Anaconda, Inc.
    
    配置pipy源:
    $ mkdir ~/.pip
    $ touch ~/.pip/pip.conf
    $ echo '[global]' >> ~/.pip/pip.conf
    $ echo 'trusted-host=mirrors.aliyun.com' >> ~/.pip/pip.conf
    $ echo 'index-url=http://mirrors.aliyun.com/pypi/simple/' >> ~/.pip/pip.conf
    
    2.2,安装mysql相关依赖
    去mysql官网下载mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar安装包并上传至服务器:
    # tar xvf mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar
    检查服务器是否有旧版本依赖:
    # rpm -qa|grep mysql-libs-5.1.73|wc -l
    如果结果大于0则执行如下命令卸载旧依赖:
    # rpm -e --nodeps mysql-libs-5.1.73-5.el6_6.x86_64
    如果等于0则不需要此操作.
    然后依次安装以下依赖:
    # rpm -ivh mysql-community-common-5.7.22-1.el6.x86_64.rpm
    # rpm -ivh mysql-community-libs-5.7.22-1.el6.x86_64.rpm
    # rpm -ivh mysql-community-devel-5.7.22-1.el6.x86_64.rpm
    
    2.3,安装airflow相关模块
    $ pip install apache-airflow[celery]
    $ pip install apache-airflow[redis]
    $ pip install apache-airflow[mysql]
    检测一下安装结果:
    $ airflow -h
    如果显示正常则表示安装成功,并且用户根目录会出现airflow文件夹
    
    2.4,安装mysql
    上传mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar至需要安装mysql的服务器上:
    # tar xvf mysql-5.7.22-1.el6.x86_64.rpm-bundle.tar
    检查服务器是否有旧版本依赖:
    # rpm -qa|grep mysql-libs-5.1.73|wc -l
    如果结果大于0则执行如下命令卸载旧依赖:
    # rpm -e --nodeps mysql-libs-5.1.73-5.el6_6.x86_64
    如果等于0则不需要此操作.
    然后依次安装以下安装:
    # rpm -ivh mysql-community-common-5.7.22-1.el6.x86_64.rpm 
    # rpm -ivh mysql-community-libs-5.7.22-1.el6.x86_64.rpm 
    # rpm -ivh mysql-community-devel-5.7.22-1.el6.x86_64.rpm 
    # rpm -ivh mysql-community-client-5.7.22-1.el6.x86_64.rpm
    # rpm -ivh mysql-community-server-5.7.22-1.el6.x86_64.rpm 
    
    # vi /etc/my.cnf
    尾部添加:
    #关闭TIMESTAMP列自动生成值
    explicit_defaults_for_timestamp=1
    #跳过权限认证
    skip-grant-tables
    
    # service mysqld start
    # mysql -u root
    用于测试话剧所以密码设置的比较简单,仅供测试:
    mysql> use mysql
    mysql> update user set password_expired='N' where user='root';    
    mysql> update user set authentication_string=password('123456') where user='root'; 
    
    编辑/etc/my.cnf去掉skip-grant-tables 并重启mysql:
    # service mysqld restart
    # mysql -u root -p
    使用密码123456登陆
    #降低密码复杂度要求仅仅用于测试
    mysql> set global validate_password_policy=0; 
    mysql> set global validate_password_length=4;
    mysql> SET PASSWORD = PASSWORD('123456');
    mysql> flush privileges;
    针对airflow使用创建数据库,添加用户并授权
    mysql> CREATE DATABASE airflow;
    mysql> CREATE USER 'af'@'localhost' IDENTIFIED BY '123456';
    mysql> GRANT all privileges on airflow.* TO 'af'@'localhost' IDENTIFIED BY '123456';
    mysql> GRANT all privileges on airflow.* TO 'af'@'%' IDENTIFIED BY '123456';
    mysql> flush privileges;
    账户测试:
    # mysql -u af -p
    使用123456登陆
    
    2.5,安装redis(使用redis作为broker)[方案一]
    • 安装redis:
    redis官网下载redis-4.0.9.tar.gz安装包,并上传至需要安装redis的服务器:
    $ tar zxvf redis-4.0.9.tar.gz
    $ cd redis-4.0.9
    $ make
    $ cp redis.conf src/
    $ cd src
    编辑配置文件redis.conf将bind属性改为bind 0.0.0.0
    启动redis
    $ nohup ./redis-server redis.conf > output.log 2>&1 &
    
    • airflow对应配置:
    如果执行过airflow -h命令后,则用户目录下面会出现一个airflow文件夹, airflow文件夹下面有个airflow.cfg
    的文件,这个就是airflow的配置文件;
    编辑airflow.cfg文件,修改一下内容,具体情况根据实际情况填写[ip和端口]:
    [core] 
    #sql_alchemy_conn = mysql://[username]:[password]@[host]:[port]/airflow
    sql_alchemy_conn = mysql://af:123456@localhost/airflow
    executor = CeleryExecutor 
    [celery] 
    broker_url = redis://localhost:6379/0
    celery_result_backend =  redis://localhost:6379/0
    
    配置完成之后即可进行数据库初始化:
    $ airflow initdb
    
    2.6,安装rabbitmq(使用rabbitmq作为broker)[方案二]
    • yum安装:
    安装centos扩展源
    # yum -y install epel-release
    安装erlang运行环境以及rabbitmq
    # yum install erlang
    # yum install rabbitmq-server
    
    • rpm手动安装:

    一般yum源安装的erlang版本太低,可以从erlang官网下载打包好的rpm包手动安装,避免源码编译安装:

    # wget https://packages.erlang-solutions.com/erlang/esl-erlang/FLAVOUR_1_general/esl-erlang_21.0-1~centos~6_amd64.rpm
    
    # rpm -ivh esl-erlang_21.0-1~centos~6_amd64.rpm
    
    # wget http://www.rabbitmq.com/releases/erlang/esl-erlang-compat-18.1-1.noarch.rpm
    
    # rpm -ivh esl-erlang-compat-18.1-1.noarch.rpm
    
    • rabbitmq配置:

    添加用户并开启远程访问:

    相关格式如下:

    #下面创建用户密码的过程以该url为示例
    broker_url = 'pyamqp://myuser:mypassword@localhost:5672/myvhost'
    

    以下过程为创建用户myuser设置密码为mypassword 添加一个virtual host并允许用户访问该virtual host

    $ sudo rabbitmqctl add_user myuser mypassword
    
    $ sudo rabbitmqctl add_vhost myvhost
    
    $ sudo rabbitmqctl set_user_tags myuser mytag
    
    $ sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
    

    示例:

    $ rabbitmqctl add_user cord 123456
    $ rabbitmqctl set_user_tags cord administrator  #这里指定为管理员用户
    $ rabbitmqctl set_permissions -p / cord ".*" ".*" ".*"
    
    • airflow对应配置:

    先安装rabbitmq模块:

    $ pip install apache-airflow[rabbitmq]
    

    然后修改配置文件:

    编辑airflow.cfg文件,修改一下内容,具体情况根据实际情况填写[ip和端口]:
    [core] 
    #sql_alchemy_conn = mysql://[username]:[password]@[host]:[port]/airflow
    sql_alchemy_conn = mysql://af:123456@localhost/airflow
    executor = CeleryExecutor 
    [celery] 
    注意:这里使用pyamqp协议,而不是amqp协议,amqp使用的是librabbitmq2.0.0,这个和celery4.x集成有各种问题
    broker_url = pyamqp://cord:123456@localhost:5672//
    #celery_result_backend = rpc:// 使用rabbitmq作为结果存储
    #这里是使用mysql作为结果存储
    celery_result_backend = db+mysql://af:123456@localhost/airflow
    
    配置完成之后即可进行数据库初始化:
    $ airflow initdb
    
    2.7,启动airflow
    关于启动,这个要分应用节点和作业节点:
    1) 应用节点:
    $ airflow webserver -D
    $ airflow scheduler -D
    $ airflow worker -D (应用节点可不运行woker)
    2) 作业节点:(作业节点只需要运行worker就行)
    $ airflow worker -D
    

    3.增加定时任务

    1. 在airflow文件夹下面新建dags文件夹用于存储定时任务文件
    2. 创建如下的定时任务文件helloworld.py
    from datetime import timedelta, datetime
    
    import airflow
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator
    
    default_args = {
        'owner': 'jifeng.si',
        'depends_on_past': False,
        # 'depends_on_past': True,
        #'start_date': airflow.utils.dates.days_ago(2),
        'start_date': datetime(2018, 5, 2),
        'email': ['1219957063@qq.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        'example_hello_world_dag',
        default_args=default_args,
        description='my first DAG',
        schedule_interval='*/25 * * * *',
        start_date=datetime(2018, 5, 28)
    )
    
    dummy_operator = DummyOperator(task_id='dummy_task', dag=dag)
    
    hello_operator = BashOperator(
        task_id='sleep_task',
        depends_on_past=False,
        bash_command='echo `date` >> /home/py/test.txt',
        dag=dag
    )
    
    dummy_operator >> hello_operator
    
    1. 测试一下代码的正确性:
    $ python helloworld.py
    

    如果没出现异常则说明代码无错误, 并且airflow环境正常.

    将helloworld.py 放在 /home/py/airflow/dags下.

    测试一下任务看任务是否能正常运行:

    $ touch ~/test.txt 创建用于测试的文件
    $ airflow run -A example_hello_world_dag sleep_task 20180528
    

    如果运行正常,则可以启用该定时任务,启用任务有两种方式:

    1. 通过命令启动:
    $ airflow unpause example_hello_world_dag
    
    1. 通过界面启动:
      在airflow的web管理界面,将左边的off按钮改为on

    然后观察用户路径下的test.txt文件,如果运行正常的话会不断增加时间信息:

    $ cat test.txt
    ....
    Thu May 31 15:55:10 CST 2018
    Thu May 31 15:56:10 CST 2018
    Thu May 31 15:57:09 CST 2018
    Thu May 31 16:04:10 CST 2018
    ....
    

    4.注意事项

    • airflow的cron定时器只能精确到分钟,而不能精确到秒
    • airflow使用的是utc时区,正式使用的时候需要进行时区转换

    附录:

    附上一个环境初始化shell脚本:

    #!/bin/sh
    
    #拷贝mysql依赖
    #scp命令必须手工输入密码确认过一次之后才可保证sshpass能正常运行
    sshpass -p '123456' scp -q root@127.0.0.1:/root/mysql-community-devel-5.7.22-1.el6.x86_64.rpm  /root/  &&
    sshpass -p '123456' scp -q root@127.0.0.1:/root/mysql-community-libs-5.7.22-1.el6.x86_64.rpm  /root/  &&
    sshpass -p '123456' scp -q root@127.0.0.1:/root/mysql-community-common-5.7.22-1.el6.x86_64.rpm  /root/ &&
    
    #添加py用户
    password="py@123"
    username="py"
    pass=$(perl -e 'print crypt($ARGV[0], "password")' $password)
    useradd -m -p $pass $username  &&
    
    #添加anaconda安装路径
    mkdir -p /anaconda  &&
    chown -R py:py /anaconda &&
    
    #设置pipy源信息
    su - py -c  "mkdir ~/.pip && touch ~/.pip/pip.conf"
    su - py -c  "echo '[global]' >> ~/.pip/pip.conf"
    su - py -c  "echo 'trusted-host=pypi.douban.com/simple' >> ~/.pip/pip.conf"
    su - py -c  "echo 'index-url=http://pypi.douban.com/simple' >> ~/.pip/pip.conf"
    
    #安装mysql依赖
    old=$(rpm -qa|grep mysql-libs-5.1.73|wc -l)
    if [ $old -gt 0 ]; then
        rpm -e --nodeps mysql-libs-5.1.73-5.el6_6.x86_64
    fi
    rpm -ivh mysql-community-common-5.7.22-1.el6.x86_64.rpm  &&
    rpm -ivh mysql-community-libs-5.7.22-1.el6.x86_64.rpm &&
    rpm -ivh mysql-community-devel-5.7.22-1.el6.x86_64.rpm &&
    

    参考链接:

    http://doc.okbase.net/permike/archive/245749.html

    http://bubuko.com/infodetail-2284634.html

    http://celery.readthedocs.io/en/latest/userguide/configuration.html#conf-rpc-result-backend

  • 相关阅读:
    element表单中一个elformitem下多个formitem项校验(循环校验)
    vscode配置
    git push时提示错误 sign_and_send_pubkey: no mutual signature supported
    syncthing文件同步网盘配置
    MQTT服务搭建和简单使用
    python脚本避免被多次执行
    Hadoop集群对datanode宕机后的处理机制源码阅读
    工作冲突和低谷
    职场的帮助
    测试总体思想
  • 原文地址:https://www.cnblogs.com/cord/p/9226608.html
Copyright © 2011-2022 走看看