  • 4. Airflow testing

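    Production tasks currently fall into two groups: Sqoop tasks and Hive computation tasks. Both kinds are tested here in two ways: from a shell script and by executing the command directly.
    The table used in this test is airflow.code_library.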

    1. Testing Sqoop tasks

    1.1 Testing full extraction

    1.1.1 Running the command directly

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'yangxw',
        'depends_on_past': False,
        'start_date': datetime(2017, 5, 23),
    }
    dag = DAG('sqoop4', default_args=default_args,schedule_interval=None)
    # Raw string: the trailing backslashes (bash line continuations) and
    # \$CONDITIONS (keeps bash from expanding Sqoop's placeholder inside
    # double quotes) are passed to bash unchanged.
    bash_cmd = r'''
    sqoop import \
    --connect jdbc:oracle:thin:@//XX.XX.XX.XX/aaaa \
    --username bbbb --password 'cccc' \
    --query " select CODENO, ITEMNO, ITEMNAME, BANKNO, SORTNO, ISINUSE, ITEMDESCRIBE, ITEMATTRIBUTE, RELATIVECODE, ATTRIBUTE1, ATTRIBUTE2, ATTRIBUTE3, ATTRIBUTE4, ATTRIBUTE5, ATTRIBUTE6, ATTRIBUTE7, ATTRIBUTE8, INPUTUSER, INPUTORG, INPUTTIME, UPDATEUSER, UPDATETIME, REMARK, HELPTEXT , to_char(SysDate,'YYYY-MM-DD HH24:mi:ss') as etl_in_dt from XDGL.CODE_LIBRARY where \$CONDITIONS " \
    --hcatalog-database airflow \
    --hcatalog-table CODE_LIBRARY \
    --hcatalog-storage-stanza 'stored as ORC' \
    --hive-overwrite \
    --hive-delims-replacement " " -m 1
    '''
    t1 = BashOperator(
        task_id='sqoopshell',
        bash_command=bash_cmd,
        dag=dag)
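The command above is a long, hand-edited string. As a hedged sketch, the same command can be assembled from a column list in Python 3 with a hypothetical helper (not part of Airflow or Sqoop) that quotes every argument with `shlex.quote`; on the Python 2.7 used in this article, `pipes.quote` is the equivalent. The connection values are the placeholders from the example and the column list is shortened.

```python
import shlex


# Hypothetical helper (not part of Airflow or Sqoop): builds the
# full-extraction command from a column list instead of a hand-typed string.
def build_full_import(connect, username, password, columns,
                      src_table, hive_db, hive_table):
    query = ("select " + ", ".join(columns)
             + ", to_char(SysDate,'YYYY-MM-DD HH24:mi:ss') as etl_in_dt"
             + " from " + src_table + " where $CONDITIONS")
    args = [
        "sqoop", "import",
        "--connect", connect,
        "--username", username,
        "--password", password,
        "--query", query,
        "--hcatalog-database", hive_db,
        "--hcatalog-table", hive_table,
        "--hcatalog-storage-stanza", "stored as ORC",
        "--hive-overwrite",
        "--hive-delims-replacement", " ",
        "-m", "1",
    ]
    # shlex.quote single-quotes any argument containing shell metacharacters,
    # so $CONDITIONS reaches Sqoop literally instead of being expanded by bash.
    return " ".join(shlex.quote(a) for a in args)


cmd = build_full_import(
    "jdbc:oracle:thin:@//XX.XX.XX.XX/aaaa", "bbbb", "cccc",
    ["CODENO", "ITEMNO", "ITEMNAME"],  # shortened column list for the example
    "XDGL.CODE_LIBRARY", "airflow", "CODE_LIBRARY")
print(cmd)
```

The returned string could be passed as `bash_command`: it contains no Jinja syntax and does not end in `.sh`, so Airflow's templating should leave it as is.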

    The test succeeded and the data was loaded into the table.

    1.1.2 Running Sqoop or Hive tasks from a shell script

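    The steps above run successfully, but truncating the table first would require an extra task just for the truncate command (the Sqoop user guide lists --hive-overwrite as unsupported for HCatalog jobs), which splits one ETL job into two tasks. Putting the truncate and the import in one shell script keeps them together.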
    1) Create the DAG

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'yangxw',
        'depends_on_past': False,
        'start_date': datetime(2017, 5, 23)
    }
    
    dag = DAG('sqoop7', default_args=default_args,schedule_interval=None)
    
    bash_cmd = 'sh /home/airflow/sqoop3.sh'
    t1 = BashOperator(
        task_id='sqoop7',
        bash_command=bash_cmd,
        dag=dag)
    

    2) Create the shell script (/home/airflow/sqoop3.sh)

    # Stop at the first failing command, so a failed truncate does not
    # lead to the import appending a duplicate copy of the data.
    set -e
    hive -e "truncate table airflow.CODE_LIBRARY"
    sqoop import \
    --connect jdbc:oracle:thin:@//AAAA/BBB \
    --username CCC --password 'DDD' \
    --query " select CODENO, ITEMNO, ITEMNAME, BANKNO, SORTNO, ISINUSE, ITEMDESCRIBE, ITEMATTRIBUTE, RELATIVECODE, ATTRIBUTE1, ATTRIBUTE2, ATTRIBUTE3, ATTRIBUTE4, ATTRIBUTE5, ATTRIBUTE6, ATTRIBUTE7, ATTRIBUTE8, INPUTUSER, INPUTORG, INPUTTIME, UPDATEUSER, UPDATETIME, REMARK, HELPTEXT , to_char(SysDate,'YYYY-MM-DD HH24:mi:ss') as etl_in_dt from XDGL.CODE_LIBRARY where \$CONDITIONS " \
    --hcatalog-database airflow \
    --hcatalog-table CODE_LIBRARY \
    --hcatalog-storage-stanza 'stored as ORC' \
    --hive-overwrite \
    --hive-delims-replacement " " -m 1

    Copy these files to the scheduler and worker nodes, then run the DAG.
    The log shows an error:

    …………
    [2017-05-24 10:55:52,853] {base_task_runner.py:95} INFO - Subtask:   File "/opt/anaconda2/lib/python2.7/site-packages/jinja2/loaders.py", line 187, in get_source
    [2017-05-24 10:55:52,853] {base_task_runner.py:95} INFO - Subtask:     raise TemplateNotFound(template)
    [2017-05-24 10:55:52,854] {base_task_runner.py:95} INFO - Subtask: jinja2.exceptions.TemplateNotFound: sh /home/airflow/sqoop3.sh

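    This is not really a bug: bash_command is a Jinja-templated field, and because BashOperator lists .sh and .bash in its template_ext, a command string that ends in .sh is treated as the path of a template file to load. No template named "sh /home/airflow/sqoop3.sh" exists, hence TemplateNotFound. Any command that does not end in .sh avoids the lookup.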

    Changing bash_cmd = 'sh /home/airflow/sqoop3.sh' to
    bash_cmd = '{{"sh /home/airflow/sqoop3.sh"}}' fixes it; Jinja renders the expression back to the plain command.

    The test succeeds. Alternatively, use:

    bash_cmd = '''
    sh /home/airflow/sqoop3.sh
    '''

    This also runs successfully.
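The failure can be modelled in a few lines. The sketch below is a simplified assumption of Airflow's check, not its actual code: BashOperator declares `template_ext = ('.sh', '.bash')`, and a templated field whose string value ends with one of those extensions is loaded as a template file. Only the first candidate triggers the lookup; the trailing-space form is the workaround suggested in the BashOperator documentation.

```python
# Simplified model of the check, an assumption for illustration only:
# a templated string ending with one of these extensions is treated as
# the name of a Jinja template file rather than as the command itself.
TEMPLATE_EXT = ('.sh', '.bash')


def treated_as_template_file(bash_command):
    # str.endswith accepts a tuple and is True if any suffix matches.
    return bash_command.endswith(TEMPLATE_EXT)


candidates = [
    'sh /home/airflow/sqoop3.sh',        # ends in '.sh': TemplateNotFound
    '{{"sh /home/airflow/sqoop3.sh"}}',  # ends in '}}'
    '\nsh /home/airflow/sqoop3.sh\n',    # the ''' ''' form ends in a newline
    'sh /home/airflow/sqoop3.sh ',       # trailing space
]
for cmd in candidates:
    print(repr(cmd), treated_as_template_file(cmd))
```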

    1.2 Testing incremental extraction

    Create a new DAG, sqoop8 (imports and default_args as in the previous DAG):

    dag = DAG('sqoop8', default_args=default_args,schedule_interval=None)
    
    bash_cmd = '''
    sh /home/airflow/sqoop4.sh %s
    ''' % '2017-05-24'
    
    t1 = BashOperator(
        task_id='sqoop8',
        bash_command=bash_cmd,
        dag=dag)
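sqoop8 hard-codes the date with `%`. A hedged sketch of a hypothetical helper that builds the same bash_command for any day and rejects malformed dates before they reach the script; defaulting to yesterday is an assumption, not something the article specifies.

```python
from datetime import date, datetime, timedelta


def incremental_command(script, day=None):
    """Build the bash_command for the incremental script (hypothetical helper).

    day is a 'YYYY-MM-DD' string; defaulting to yesterday is an assumed
    extraction policy, not taken from the article.
    """
    if day is None:
        day = (date.today() - timedelta(days=1)).strftime('%Y-%m-%d')
    # Raises ValueError for a malformed date, before Oracle's TO_DATE sees it.
    datetime.strptime(day, '%Y-%m-%d')
    return 'sh %s %s\n' % (script, day)


print(incremental_command('/home/airflow/sqoop4.sh', '2017-05-24'))
```

Alternatively, Airflow can supply the date through its `{{ ds }}` template macro (the run's execution date as YYYY-MM-DD), for example `bash_command='sh /home/airflow/sqoop4.sh {{ ds }}'`; that string ends in `}}` rather than `.sh`, so it avoids the template-file lookup described above.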

    Create the shell script (/home/airflow/sqoop4.sh), which receives the date as $1:

    # Stop at the first failing command; IF EXISTS keeps the first run for a
    # new date from failing because the partition does not exist yet.
    set -e
    hive -e "alter table airflow.ACCT_FEE_ARCH drop if exists partition(p_day='$1');"
    sqoop import --connect jdbc:oracle:thin:@//AAA/BBB --username CCC --password 'DDD' \
    --query " select SERIALNO,  
    ……
    to_char(SYNCHDATE, 'YYYY-MM-DD HH24:mi:ss') as SYNCHDATE , to_char(SysDate,'YYYY-MM-DD HH24:mi:ss') as ETL_IN_DT 
    from XDGL.ACCT_FEE_ARCH 
    where SYNCHDATE < (TO_DATE('$1', 'YYYY-MM-DD') +1) and SYNCHDATE >= (TO_DATE('$1', 'YYYY-MM-DD')) and \$CONDITIONS " \
    --hcatalog-database airflow \
    --hcatalog-table ACCT_FEE_ARCH \
    --hcatalog-storage-stanza 'stored as ORC' \
    --hive-partition-key p_day --hive-partition-value $1 \
    --hive-delims-replacement " " -m 1
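The WHERE clause selects a half-open one-day window: in Oracle, adding 1 to a DATE adds one day, so rows with day <= SYNCHDATE < day + 1 land in partition p_day=day. Because the script drops that partition first, re-running the same day replaces it instead of duplicating rows. The Python sketch below mirrors only the boundary logic; the function names are hypothetical.

```python
from datetime import datetime, timedelta


def partition_window(p_day):
    """Return (start, end) for the half-open window [start, end).

    Mirrors SYNCHDATE >= TO_DATE(p_day) and SYNCHDATE < TO_DATE(p_day) + 1.
    """
    start = datetime.strptime(p_day, '%Y-%m-%d')
    return start, start + timedelta(days=1)


def in_partition(synchdate, p_day):
    start, end = partition_window(p_day)
    # Half-open: midnight of the next day belongs to the next partition,
    # so every row falls into exactly one daily partition.
    return start <= synchdate < end


print(in_partition(datetime(2017, 5, 24, 23, 59, 59), '2017-05-24'))
print(in_partition(datetime(2017, 5, 25, 0, 0, 0), '2017-05-24'))
```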

    2. Testing Hive tasks

    Above, the Hive truncate ran inside a shell script; below, a SQL file is executed directly as a command.
    Create sqoop9:

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    from airflow.models import Variable
    
    default_args = {
        'owner': 'yangxw',
        'depends_on_past': False,
        'start_date': datetime(2017, 5, 23)
    }
    
    dag = DAG('hivesh2', default_args=default_args,schedule_interval=None)
    str1 = Variable.get("str1")
    bash_cmd = '''
    hive -f "/home/airflow/hive1.sql" --hivevar tbname=%s
    ''' % str1
    
    t1 = BashOperator(
        task_id='hivesh2',
        bash_command=bash_cmd,
        dag=dag)
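The `%s` interpolation pastes the Variable value into the command unquoted, so a value containing spaces or a `;` would change the command. A hedged sketch of a hypothetical helper that emits one `--hivevar` flag per variable and quotes each argument with Python 3's `shlex.quote`:

```python
import shlex


def hive_file_command(sql_path, **hivevars):
    """Hypothetical helper: 'hive -f <file>' plus one --hivevar per variable."""
    parts = ['hive', '-f', sql_path]
    for key in sorted(hivevars):  # sorted for a stable command string
        parts += ['--hivevar', '%s=%s' % (key, hivevars[key])]
    # Quote every part so a value with spaces or ';' cannot split the command.
    return ' '.join(shlex.quote(p) for p in parts)


print(hive_file_command('/home/airflow/hive1.sql', tbname='airflow.ACCT_FEE_ARCH'))
```

In the DAG this would read `bash_command=hive_file_command('/home/airflow/hive1.sql', tbname=Variable.get('str1'))`.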

    Create the Hive SQL file (/home/airflow/hive1.sql):

    insert overwrite table airflow.tab_cnt select '${tbname}',  count(*) from ${tbname}

    Create the variable str1=airflow.ACCT_FEE_ARCH in the web UI.
    The run succeeds.
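Hive replaces `${tbname}` in the SQL file with the value passed via `--hivevar` before running the statement. Python's `string.Template` happens to use the same `${name}` placeholder syntax, so, as an analogy only (Hive, not Python, performs the real substitution), it shows what the statement looks like for str1=airflow.ACCT_FEE_ARCH:

```python
from string import Template

# Analogy only: string.Template shares Hive's ${name} placeholder syntax.
sql = "insert overwrite table airflow.tab_cnt select '${tbname}', count(*) from ${tbname}"
rendered = Template(sql).substitute(tbname='airflow.ACCT_FEE_ARCH')
print(rendered)
```

The same value fills both the quoted string literal (the table name recorded in tab_cnt) and the FROM clause.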

    3. Summary

    1. When running a shell script, use the Jinja syntax or a ''' ''' string, so that bash_command does not end in .sh:
    bash_cmd = '{{" sh /home/airflow/sqoop1.sh"}}' or
    bash_cmd = '''
    sh /home/airflow/sqoop1.sh
    '''

    2. All files must be copied to every node
    The Python DAG files, shell scripts, and SQL files must be present on every webserver, scheduler, and worker node.

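    3. Sometimes running the DAG file with python does not produce a .pyc file, and the web UI then shows only the DAG name, without its code or schedule. In that case, compile it with
    python -m py_compile XXX.py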

    4. In the Airflow version used here, a DAG cannot be deleted once it has been created; wrong or redundant DAGs can be paused and hidden.

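    5. The shell approach suits Sqoop tasks: truncate table / drop partition and the import run in one step, without a second task. The command approach suits Hive tasks: hive -f XXX.sql --hivevar a=%s --hivevar b=%s passes parameters to Hive dynamically.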
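For the py_compile tip in item 3, a hedged sketch that compiles every .py file in a DAG folder (whatever your dags_folder setting points to) and collects syntax errors instead of stopping at the first one. The function name is hypothetical; it uses only the standard `py_compile` module.

```python
import glob
import os
import py_compile


def compile_dag_folder(dag_folder):
    """Compile every *.py file in dag_folder; return {path: error message}.

    Roughly what `python -m py_compile XXX.py` does for one file, applied
    to the whole folder, with a broken file reported rather than fatal.
    """
    failures = {}
    for path in sorted(glob.glob(os.path.join(dag_folder, '*.py'))):
        try:
            py_compile.compile(path, doraise=True)
        except py_compile.PyCompileError as exc:
            failures[path] = exc.msg
    return failures
```

An empty dict means every DAG file compiled; any entry points at a file the scheduler and web UI would also fail to load.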

  • Original post: https://www.cnblogs.com/skyrim/p/7456171.html