  • An easily overlooked problem when running a script inside a Docker container from Airflow's BashOperator

    DAG template

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators import ExternalTaskSensor
    from airflow.operators import EmailOperator
    from datetime import datetime, timedelta

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2019, 11, 20, 12, 0, 0),
        'retries': 3,
        # The key must be 'retry_delay'; an unknown key such as 'retryDelay' is not applied.
        'retry_delay': timedelta(seconds=5),
        'end_date': datetime(9999, 12, 31)
    }

    # Run once a day at midnight.
    dag = DAG('time_my',
        default_args=default_args,
        schedule_interval='0 0 * * *')

    time_my_task_1 = BashOperator(
        task_id='time_my_task_1',
        dag=dag,
        # -it asks docker for an interactive terminal; this is what fails under the scheduler.
        bash_command='set -e;docker exec -it testsuan /bin/bash -c "cd /algorithm-platform/algorithm_model_code/ && python time_my_task_1.py"'
    )

    When the task is scheduled, the log reports the following error:

    [2019-12-05 09:21:41,561] {models.py:1359} INFO - Dependencies all met for <TaskInstance: time_my.time_my_task_1 2019-12-05T09:21:39.294890+08:00 [queued]>
    [2019-12-05 09:21:41,563] {models.py:1359} INFO - Dependencies all met for <TaskInstance: time_my.time_my_task_1 2019-12-05T09:21:39.294890+08:00 [queued]>
    [2019-12-05 09:21:41,564] {models.py:1571} INFO - 
    --------------------------------------------------------------------------------
    Starting attempt 1 of 4
    --------------------------------------------------------------------------------
    
    [2019-12-05 09:21:41,574] {models.py:1593} INFO - Executing <Task(BashOperator): time_my_task_1> on 2019-12-05T09:21:39.294890+08:00
    [2019-12-05 09:21:41,574] {base_task_runner.py:118} INFO - Running: ['bash', '-c', u'airflow run time_my time_my_task_1 2019-12-05T09:21:39.294890+08:00 --job_id 53877 --raw -sd DAGS_FOLDER/time_my.py --cfg_path /tmp/tmprAeiWr']
    [2019-12-05 09:21:41,999] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 /usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.25.5) or chardet (3.0.4) doesn't match a supported version!
    [2019-12-05 09:21:42,000] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   RequestsDependencyWarning)
    [2019-12-05 09:21:42,026] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,026] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=155292
    [2019-12-05 09:21:42,462] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,462] {__init__.py:51} INFO - Using executor LocalExecutor
    [2019-12-05 09:21:42,561] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,561] {configuration.py:255} WARNING - section/key [rest_api_plugin/log_loading] not found in config
    [2019-12-05 09:21:42,562] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,561] {rest_api_plugin.py:54} WARNING - Initializing [rest_api_plugin/LOG_LOADING] with default value = False
    [2019-12-05 09:21:42,562] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,561] {configuration.py:255} WARNING - section/key [rest_api_plugin/filter_loading_messages_in_cli_response] not found in config
    [2019-12-05 09:21:42,562] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,561] {rest_api_plugin.py:54} WARNING - Initializing [rest_api_plugin/FILTER_LOADING_MESSAGES_IN_CLI_RESPONSE] with default value = True
    [2019-12-05 09:21:42,563] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,562] {configuration.py:255} WARNING - section/key [rest_api_plugin/rest_api_plugin_http_token_header_name] not found in config
    [2019-12-05 09:21:42,563] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,562] {rest_api_plugin.py:44} WARNING - Initializing [rest_api_plugin/REST_API_PLUGIN_HTTP_TOKEN_HEADER_NAME] with default value = rest_api_plugin_http_token
    [2019-12-05 09:21:42,563] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,562] {configuration.py:255} WARNING - section/key [rest_api_plugin/rest_api_plugin_expected_http_token] not found in config
    [2019-12-05 09:21:42,564] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,562] {rest_api_plugin.py:44} WARNING - Initializing [rest_api_plugin/REST_API_PLUGIN_EXPECTED_HTTP_TOKEN] with default value = None
    [2019-12-05 09:21:42,672] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,672] {models.py:273} INFO - Filling up the DagBag from /usr/local/airflow/dags/time_my.py
    [2019-12-05 09:21:42,752] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 /usr/lib/python2.7/site-packages/airflow/utils/helpers.py:356: DeprecationWarning: Importing 'ExternalTaskSensor' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
    [2019-12-05 09:21:42,752] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   DeprecationWarning)
    [2019-12-05 09:21:42,753] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 /usr/lib/python2.7/site-packages/airflow/utils/helpers.py:356: DeprecationWarning: Importing 'EmailOperator' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
    [2019-12-05 09:21:42,753] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   DeprecationWarning)
    [2019-12-05 09:21:42,783] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,783] {cli.py:520} INFO - Running <TaskInstance: time_my.time_my_task_1 2019-12-05T09:21:39.294890+08:00 [running]> on host node5
    [2019-12-05 09:21:42,802] {bash_operator.py:77} INFO - Tmp dir root location: 
     /tmp
    [2019-12-05 09:21:42,802] {bash_operator.py:86} INFO - Exporting the following env vars:
    AIRFLOW_CTX_TASK_ID=time_my_task_1
    AIRFLOW_CTX_DAG_ID=time_my
    AIRFLOW_CTX_EXECUTION_DATE=2019-12-05T09:21:39.294890+08:00
    AIRFLOW_CTX_DAG_RUN_ID=manual__2019-12-05T09:21:39.294890+08:00
    [2019-12-05 09:21:42,803] {bash_operator.py:100} INFO - Temporary script location: /tmp/airflowtmpoAfSER/time_my_task_1biUSjF
    [2019-12-05 09:21:42,803] {bash_operator.py:110} INFO - Running command: set -e;docker exec -it testsuan /bin/bash -c "cd /algorithm-platform/algorithm_model_code/ && python time_my_task_1.py" 
    [2019-12-05 09:21:42,810] {bash_operator.py:119} INFO - Output:
    [2019-12-05 09:21:42,857] {bash_operator.py:123} INFO - the input device is not a TTY
    [2019-12-05 09:21:42,859] {bash_operator.py:127} INFO - Command exited with return code 1
    [2019-12-05 09:21:42,868] {models.py:1788} ERROR - Bash command failed
    Traceback (most recent call last):
      File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1657, in _run_raw_task
        result = task_copy.execute(context=context)
      File "/usr/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 131, in execute
        raise AirflowException("Bash command failed")
    AirflowException: Bash command failed
    [2019-12-05 09:21:42,871] {models.py:1811} INFO - Marking task as UP_FOR_RETRY
    [2019-12-05 09:21:43,147] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 Traceback (most recent call last):
    [2019-12-05 09:21:43,147] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/bin/airflow", line 32, in <module>
    [2019-12-05 09:21:43,147] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     args.func(args)
    [2019-12-05 09:21:43,148] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
    [2019-12-05 09:21:43,148] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     return f(*args, **kwargs)
    [2019-12-05 09:21:43,148] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 526, in run
    [2019-12-05 09:21:43,148] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     _run(args, dag, ti)
    [2019-12-05 09:21:43,149] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 445, in _run
    [2019-12-05 09:21:43,149] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     pool=args.pool,
    [2019-12-05 09:21:43,149] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 73, in wrapper
    [2019-12-05 09:21:43,149] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     return func(*args, **kwargs)
    [2019-12-05 09:21:43,149] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1657, in _run_raw_task
    [2019-12-05 09:21:43,150] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     result = task_copy.execute(context=context)
    [2019-12-05 09:21:43,150] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 131, in execute
    [2019-12-05 09:21:43,150] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     raise AirflowException("Bash command failed")
    [2019-12-05 09:21:43,150] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 airflow.exceptions.AirflowException: Bash command failed
    [2019-12-05 09:21:46,561] {logging_mixin.py:95} INFO - [2019-12-05 09:21:46,561] {jobs.py:2527} INFO - Task exited with return code 1

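    The problem is this line in the log: the input device is not a TTY.

    The error shows up when a scheduled job runs a script that calls docker commands. TTY is the general term for terminal devices; the word comes from Teletypes, or teletypewriters.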

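    The message means that a command run in the background on Linux has no terminal device attached. When running commands inside a container we usually add -it out of habit. That is two flags: -i keeps STDIN open, and -t allocates a pseudo-TTY. It is -t that requires a terminal.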
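
    A minimal sketch of what "no terminal device" means, assuming Python 3.7+ (the DAG in this post runs on Python 2.7, so this is only an illustration and not part of the original setup). A child process started with its stdin detached, roughly the situation of a task launched by a scheduler, reports that its standard input is not a TTY. The function name child_stdin_is_tty is made up for this example.

```python
import subprocess
import sys


def child_stdin_is_tty():
    """Start a child Python process whose stdin is the null device
    and report whether that child sees its stdin as a TTY."""
    result = subprocess.run(
        [sys.executable, "-c", "import sys; print(sys.stdin.isatty())"],
        stdin=subprocess.DEVNULL,  # no terminal attached to stdin
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip() == "True"


if __name__ == "__main__":
    print(child_stdin_is_tty())
```

    A command that insists on a terminal for its input, as docker exec does when -t is given, has nothing to attach to in this situation.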

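    So when docker exec runs as a background job or program, drop -t (or the whole -it) and the error goes away. The revised template keeps -i, which leaves STDIN open but does not require a terminal.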
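
    One way to keep a single command usable both from an interactive shell and from a scheduler is to add -t only when a terminal is actually attached. The sketch below is an illustration, not the original author's code: the helper name build_docker_exec and its parameters are hypothetical, it assumes Python 3, and it only builds the command string without running Docker.

```python
import shlex
import sys


def build_docker_exec(container, inner_command, stdin_is_tty=None):
    """Build a 'docker exec' command line, requesting a TTY (-t)
    only when stdin is attached to a terminal."""
    if stdin_is_tty is None:
        # Auto-detect; under a scheduler this is normally False.
        stdin_is_tty = sys.stdin.isatty()
    flags = "-it" if stdin_is_tty else "-i"
    # shlex.quote protects spaces and '&&' in the inner command.
    return "docker exec {} {} /bin/bash -c {}".format(
        flags, shlex.quote(container), shlex.quote(inner_command)
    )


if __name__ == "__main__":
    print(build_docker_exec(
        "testsuan",
        "cd /algorithm-platform/algorithm_model_code/ && python time_my_task_1.py",
    ))
```

    The returned string could be passed as bash_command to a BashOperator. Hard-coding -i, as the revised template does, is simpler when the command only ever runs from the scheduler.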

    Revised DAG template

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators import ExternalTaskSensor
    from airflow.operators import EmailOperator
    from datetime import datetime, timedelta

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2019, 11, 20, 12, 0, 0),
        'retries': 3,
        # The key must be 'retry_delay'; an unknown key such as 'retryDelay' is not applied.
        'retry_delay': timedelta(seconds=5),
        'end_date': datetime(9999, 12, 31)
    }

    # Run once a day at midnight.
    dag = DAG('time_my',
        default_args=default_args,
        schedule_interval='0 0 * * *')

    time_my_task_1 = BashOperator(
        task_id='time_my_task_1',
        dag=dag,
        # -i only: no TTY is requested, so the command also works without a terminal.
        bash_command='set -e;docker exec -i testsuan /bin/bash -c "cd /algorithm-platform/algorithm_model_code/ && python time_my_task_1.py"'
    )

    Scheduling the task again no longer produces the error.

  • Original post: https://www.cnblogs.com/braveym/p/11987632.html