zoukankan      html  css  js  c++  java
  • 使用AirFlow调度MaxCompute

    简介: airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

    背景

    airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

    一、环境准备

    • Python 2.7.5  PyODPS支持Python2.6以上版本
    • Airflow apache-airflow-1.10.7

    1.安装MaxCompute需要的包

    pip install setuptools>=3.0

    pip install requests>=2.4.0

    pip install greenlet>=0.4.10  # 可选,安装后能加速Tunnel上传。

    pip install cython>=0.19.0  # 可选,不建议Windows用户安装。

    pip install pyodps

    注意:如果requests包冲突,先卸载再安装对应的版本

    2.执行如下命令检查安装是否成功

    python -c "from odps import ODPS"

    二、开发步骤

    1.在Airflow家目录编写python调度脚本Airiflow_MC.py

     

    # -*- coding: UTF-8 -*-

    import sys

    import os

    from odps import ODPS

    from odps import options

    from airflow import DAG

    from airflow.operators.python_operator import PythonOperator

    from datetime import datetime, timedelta

    from configparser import ConfigParser

    import time

    reload(sys)

    sys.setdefaultencoding('utf8')

    #修改系统默认编码。

    # MaxCompute参数设置

    options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}

    cfg = ConfigParser()

    cfg.read("odps.ini")

    print(cfg.items())

    odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))

    default_args = {

       'owner': 'airflow',

       'depends_on_past': False,

       'retry_delay': timedelta(minutes=5),

       'start_date':datetime(2020,1,15)

       # 'email': ['airflow@example.com'],

       # 'email_on_failure': False,

       # 'email_on_retry': False,

       # 'retries': 1,

       # 'queue': 'bash_queue',

       # 'pool': 'backfill',

       # 'priority_weight': 10,

       # 'end_date': datetime(2016, 1, 1),

    }

    dag = DAG(

       'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))

    def read_sql(sqlfile):

       with io.open(sqlfile, encoding='utf-8', mode='r') as f:

           sql=f.read()

       f.closed

       return sql

    def get_time():

       print '当前时间是{}'.format(time.time())

       return time.time()

    def mc_job ():

     

       project = odps.get_project()  # 取到默认项目。

       instance=odps.run_sql("select * from long_chinese;")

       print(instance.get_logview_address())

       instance.wait_for_success()

       with instance.open_reader() as reader:

           count = reader.count

       print("查询表数据条数:{}".format(count))

       for record in reader:

           print record

       return count

    t1 = PythonOperator (

       task_id = 'get_time' ,

       provide_context = False ,

       python_callable = get_time,

       dag = dag )

     

    t2 = PythonOperator (

       task_id = 'mc_job' ,

       provide_context = False ,

       python_callable = mc_job ,

       dag = dag )

    t2.set_upstream(t1)

    2.提交

    python Airiflow_MC.py

    3.进行测试

    # print the list of active DAGs

    airflow list_dags

    # prints the list of tasks the "tutorial" dag_id

    airflow list_tasks Airiflow_MC

    # prints the hierarchy of tasks in the tutorial DAG

    airflow list_tasks Airiflow_MC --tree

    #测试task

    airflow test Airiflow_MC get_time 2010-01-16

    airflow test Airiflow_MC mc_job 2010-01-16

    4.运行调度任务

    登录到web界面点击按钮运行

    03.png

    5.查看任务运行结果

    1.点击view log

    04.png

    2.查看结果

    原文链接

    本文为阿里云原创内容,未经允许不得转载。

  • 相关阅读:
    网页返回码大全
    求数组中子数组的最大和
    什么是面向对象?面向对象与面向过程的区别?
    Java内部类
    Java拆箱装箱
    linux中su和sudo区别
    Linux 中账户管理
    解决warn appiumdoctor bin directory for $java_home is not set
    Moco之include
    Mock server 之 Moco的使用
  • 原文地址:https://www.cnblogs.com/yunqishequ/p/15122353.html
Copyright © 2011-2022 走看看