DAG template
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# these two direct imports are unused here and trigger the deprecation warnings seen in the log
from airflow.operators import ExternalTaskSensor
from airflow.operators import EmailOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 11, 20, 12, 0, 0),
    'retries': 3,
    'retry_delay': timedelta(seconds=5),  # the correct key is retry_delay, not retryDelay
    'end_date': datetime(9999, 12, 31)
}

dag = DAG('time_my', default_args=default_args, schedule_interval='0 0 * * *')

time_my_task_1 = BashOperator(
    task_id='time_my_task_1',
    dag=dag,
    bash_command='set -e; docker exec -it testsuan /bin/bash -c '
                 '"cd /algorithm-platform/algorithm_model_code/ && python time_my_task_1.py"'
)

time_my_task_1  # single task, nothing further to wire up
```
When the DAG is scheduled, the task log reports the following error:
```
[2019-12-05 09:21:41,561] {models.py:1359} INFO - Dependencies all met for <TaskInstance: time_my.time_my_task_1 2019-12-05T09:21:39.294890+08:00 [queued]>
[2019-12-05 09:21:41,563] {models.py:1359} INFO - Dependencies all met for <TaskInstance: time_my.time_my_task_1 2019-12-05T09:21:39.294890+08:00 [queued]>
[2019-12-05 09:21:41,564] {models.py:1571} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------
[2019-12-05 09:21:41,574] {models.py:1593} INFO - Executing <Task(BashOperator): time_my_task_1> on 2019-12-05T09:21:39.294890+08:00
[2019-12-05 09:21:41,574] {base_task_runner.py:118} INFO - Running: ['bash', '-c', u'airflow run time_my time_my_task_1 2019-12-05T09:21:39.294890+08:00 --job_id 53877 --raw -sd DAGS_FOLDER/time_my.py --cfg_path /tmp/tmprAeiWr']
[2019-12-05 09:21:41,999] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 /usr/lib/python2.7/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.25.5) or chardet (3.0.4) doesn't match a supported version!
[2019-12-05 09:21:42,000] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   RequestsDependencyWarning)
[2019-12-05 09:21:42,026] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,026] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=155292
[2019-12-05 09:21:42,462] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,462] {__init__.py:51} INFO - Using executor LocalExecutor
[2019-12-05 09:21:42,561] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,561] {configuration.py:255} WARNING - section/key [rest_api_plugin/log_loading] not found in config
[2019-12-05 09:21:42,562] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,561] {rest_api_plugin.py:54} WARNING - Initializing [rest_api_plugin/LOG_LOADING] with default value = False
[2019-12-05 09:21:42,562] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,561] {configuration.py:255} WARNING - section/key [rest_api_plugin/filter_loading_messages_in_cli_response] not found in config
[2019-12-05 09:21:42,562] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,561] {rest_api_plugin.py:54} WARNING - Initializing [rest_api_plugin/FILTER_LOADING_MESSAGES_IN_CLI_RESPONSE] with default value = True
[2019-12-05 09:21:42,563] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,562] {configuration.py:255} WARNING - section/key [rest_api_plugin/rest_api_plugin_http_token_header_name] not found in config
[2019-12-05 09:21:42,563] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,562] {rest_api_plugin.py:44} WARNING - Initializing [rest_api_plugin/REST_API_PLUGIN_HTTP_TOKEN_HEADER_NAME] with default value = rest_api_plugin_http_token
[2019-12-05 09:21:42,563] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,562] {configuration.py:255} WARNING - section/key [rest_api_plugin/rest_api_plugin_expected_http_token] not found in config
[2019-12-05 09:21:42,564] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,562] {rest_api_plugin.py:44} WARNING - Initializing [rest_api_plugin/REST_API_PLUGIN_EXPECTED_HTTP_TOKEN] with default value = None
[2019-12-05 09:21:42,672] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,672] {models.py:273} INFO - Filling up the DagBag from /usr/local/airflow/dags/time_my.py
[2019-12-05 09:21:42,752] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 /usr/lib/python2.7/site-packages/airflow/utils/helpers.py:356: DeprecationWarning: Importing 'ExternalTaskSensor' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2019-12-05 09:21:42,752] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   DeprecationWarning)
[2019-12-05 09:21:42,753] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 /usr/lib/python2.7/site-packages/airflow/utils/helpers.py:356: DeprecationWarning: Importing 'EmailOperator' directly from 'airflow.operators' has been deprecated. Please import from 'airflow.operators.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2019-12-05 09:21:42,753] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   DeprecationWarning)
[2019-12-05 09:21:42,783] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 [2019-12-05 09:21:42,783] {cli.py:520} INFO - Running <TaskInstance: time_my.time_my_task_1 2019-12-05T09:21:39.294890+08:00 [running]> on host node5
[2019-12-05 09:21:42,802] {bash_operator.py:77} INFO - Tmp dir root location: /tmp
[2019-12-05 09:21:42,802] {bash_operator.py:86} INFO - Exporting the following env vars:
AIRFLOW_CTX_TASK_ID=time_my_task_1
AIRFLOW_CTX_DAG_ID=time_my
AIRFLOW_CTX_EXECUTION_DATE=2019-12-05T09:21:39.294890+08:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2019-12-05T09:21:39.294890+08:00
[2019-12-05 09:21:42,803] {bash_operator.py:100} INFO - Temporary script location: /tmp/airflowtmpoAfSER/time_my_task_1biUSjF
[2019-12-05 09:21:42,803] {bash_operator.py:110} INFO - Running command: set -e;docker exec -it testsuan /bin/bash -c "cd /algorithm-platform/algorithm_model_code/ && python time_my_task_1.py"
[2019-12-05 09:21:42,810] {bash_operator.py:119} INFO - Output:
[2019-12-05 09:21:42,857] {bash_operator.py:123} INFO - the input device is not a TTY
[2019-12-05 09:21:42,859] {bash_operator.py:127} INFO - Command exited with return code 1
[2019-12-05 09:21:42,868] {models.py:1788} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1657, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 131, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2019-12-05 09:21:42,871] {models.py:1811} INFO - Marking task as UP_FOR_RETRY
[2019-12-05 09:21:43,147] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 Traceback (most recent call last):
[2019-12-05 09:21:43,147] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/bin/airflow", line 32, in <module>
[2019-12-05 09:21:43,147] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     args.func(args)
[2019-12-05 09:21:43,148] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-12-05 09:21:43,148] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     return f(*args, **kwargs)
[2019-12-05 09:21:43,148] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 526, in run
[2019-12-05 09:21:43,148] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     _run(args, dag, ti)
[2019-12-05 09:21:43,149] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 445, in _run
[2019-12-05 09:21:43,149] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     pool=args.pool,
[2019-12-05 09:21:43,149] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 73, in wrapper
[2019-12-05 09:21:43,149] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     return func(*args, **kwargs)
[2019-12-05 09:21:43,149] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1657, in _run_raw_task
[2019-12-05 09:21:43,150] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     result = task_copy.execute(context=context)
[2019-12-05 09:21:43,150] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1   File "/usr/lib/python2.7/site-packages/airflow/operators/bash_operator.py", line 131, in execute
[2019-12-05 09:21:43,150] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1     raise AirflowException("Bash command failed")
[2019-12-05 09:21:43,150] {base_task_runner.py:101} INFO - Job 53877: Subtask time_my_task_1 airflow.exceptions.AirflowException: Bash command failed
[2019-12-05 09:21:46,561] {logging_mixin.py:95} INFO - [2019-12-05 09:21:46,561] {jobs.py:2527} INFO - Task exited with return code 1
```
The problem is actually right here, in the log lines "the input device is not a TTY" followed by "Command exited with return code 1".
The error above appears when a scheduled job runs a script that executes a docker command. A tty is the general term for a terminal device; the name comes from Teletypes, or teletypewriters.
It means that when the command runs in the background on Linux, there is no terminal device attached. When we execute commands inside a Docker container we habitually add the -it flags: -i keeps STDIN open, and -t asks Docker to allocate a pseudo-terminal (tty), which is exactly what is unavailable here.
So for tasks or programs that Docker runs in the background, simply drop the -t flag (the fixed template below keeps only -i) and the error goes away. A minimal reproduction is sketched after this paragraph.
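The following sketch (my own example, not part of the original DAG) shows the difference outside of Airflow. It assumes Python 3 and a running container named testsuan. Redirecting stdin away from a terminal, which is effectively the situation the Airflow worker is in, makes the -t variant fail while the -i variant succeeds:

```python
# Minimal reproduction sketch: run both variants with stdin detached from a
# terminal, mimicking how the Airflow worker launches the bash command.
# Assumes Python 3 and a running container named "testsuan".
import subprocess

commands = [
    # -t asks docker for a pseudo-terminal; with no TTY on stdin docker prints
    # "the input device is not a TTY" and exits with code 1.
    'docker exec -it testsuan /bin/bash -c "echo hello"',
    # Without -t the same command runs fine non-interactively.
    'docker exec -i testsuan /bin/bash -c "echo hello"',
]

for cmd in commands:
    rc = subprocess.call(cmd, shell=True, stdin=subprocess.DEVNULL)
    print("%-55s -> return code %d" % (cmd, rc))
```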
Modified DAG template
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators import ExternalTaskSensor
from airflow.operators import EmailOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 11, 20, 12, 0, 0),
    'retries': 3,
    'retry_delay': timedelta(seconds=5),  # the correct key is retry_delay, not retryDelay
    'end_date': datetime(9999, 12, 31)
}

dag = DAG('time_my', default_args=default_args, schedule_interval='0 0 * * *')

time_my_task_1 = BashOperator(
    task_id='time_my_task_1',
    dag=dag,
    # -t removed: the scheduler has no terminal to hand to docker, so -i alone is used
    bash_command='set -e; docker exec -i testsuan /bin/bash -c '
                 '"cd /algorithm-platform/algorithm_model_code/ && python time_my_task_1.py"'
)

time_my_task_1
```
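Note that only -t had to go: -i merely keeps STDIN open and is harmless here. Assuming time_my_task_1.py never reads from standard input, you could drop -i as well and run docker exec testsuan ... with no flags at all.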
Scheduling the DAG again, the error no longer appears.
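To verify the change without waiting for the next scheduled run, you can execute the task once from the command line with the Airflow 1.x CLI, e.g. `airflow test time_my time_my_task_1 2019-12-05`. Keep in mind that an interactive shell attaches your terminal's stdin, which can hide the TTY problem, so redirect stdin (append `< /dev/null`) if you want to mimic the scheduler environment.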