zoukankan      html  css  js  c++  java
  • airflow使用SimpleHttpOperator实现http调用任务

    使用SimpleHttpOperator作为处理器的时候,会发现默认访问的地址www.google.com端口为443

    例如下面这样定义的任务

    task = SimpleHttpOperator(
        task_id='get_op',
        http_conn_id='http_test',
        method='GET',
        endpoint='test1',
        data={},
        headers={},
        dag=dag)
    

    在运行的时候会抛出如下异常:

    Subtask: During handling of the above exception, another exception occurred:
    ......
    File "/data1/bigdata/anaconda3.5/lib/python3.6/site-packages/airflow/operators/http_operator.py", line 82, in execute
      self.extra_options)
    File "/data1/bigdata/anaconda3.5/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 86, in run
      return self.run_and_check(session, prepped_request, extra_options)
    File "/data1/bigdata/anaconda3.5/lib/python3.6/site-packages/airflow/hooks/http_hook.py", line 102, in run_and_check
      allow_redirects=extra_options.get("allow_redirects", True))
    ......
    Subtask: requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.google.com', port=443): Max retries exceeded with url: /test1 (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x2ac347314940>: Failed to establish a new connection: [Errno 101] Network is unreachable',))
    

    说明http请求的host需要进行配置,不然默认访问谷歌域名.

    查看源码:

    http_hook.py

        def get_conn(self, headers):
    		......
            conn = self.get_connection(self.http_conn_id)
            session = requests.Session()
            self.base_url = conn.host
            if not self.base_url.startswith('http'):
                self.base_url = 'http://' + self.base_url
    		......
    

    base_hook.py

        def get_connection(cls, conn_id):
            environment_uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
            conn = None
            if environment_uri:
                conn = Connection(conn_id=conn_id, uri=environment_uri)
            else:
                conn = random.choice(cls.get_connections(conn_id))
            if conn.host:
                logging.info("Using connection to: " + conn.host)
            return conn
    

    通过源码得知,airflow会先读取环境变量看是否有自定义uri,如果有的话使用自定义的uri,如果没有的话则使用内置的默认值。

    而环境变量的定义规则是AIRFLOW_CONN_前缀加上http_conn_id的大写形式

    例如上述例子中的任务,可以通过设置环境变量export AIRFLOW_CONN_HTTP_TEST=http://localhost:8080来实现。

    同时也可以在python代码中动态设置:

    os.environ['AIRFLOW_CONN_HTTP_TEST']='http://localhost:8080'
    

    一般推荐在代码中动态设置.

    SimpleHttpOperator的几种常见用法如下(官方示例):

    t1 = SimpleHttpOperator(
        task_id='post_op',
        endpoint='api/v1.0/nodes',
        data=json.dumps({"priority": 5}),
        headers={"Content-Type": "application/json"},
        response_check=lambda response: True if len(response.json()) == 0 else False,
        dag=dag)
    
    t5 = SimpleHttpOperator(
        task_id='post_op_formenc',
        endpoint='nodes/url',
        data="name=Joe",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        dag=dag)
    
    t2 = SimpleHttpOperator(
        task_id='get_op',
        method='GET',
        endpoint='api/v1.0/nodes',
        data={"param1": "value1", "param2": "value2"},
        headers={},
        dag=dag)
    
    t3 = SimpleHttpOperator(
        task_id='put_op',
        method='PUT',
        endpoint='api/v1.0/nodes',
        data=json.dumps({"priority": 5}),
        headers={"Content-Type": "application/json"},
        dag=dag)
    
    t4 = SimpleHttpOperator(
        task_id='del_op',
        method='DELETE',
        endpoint='api/v1.0/nodes',
        data="some=data",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        dag=dag)
    

    完整示例如下:

    import os
    from datetime import timedelta, datetime
    import pytz
    from airflow.operators.http_operator import SimpleHttpOperator
    from airflow.models import DAG
    
    default_args = {
        'owner': 'cord',
        'depends_on_past': False,
        'wait_for_downstream': True,
        'execution_timeout': timedelta(minutes=3),
        'email': ['123456789@qq.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    tz = pytz.timezone('Asia/Shanghai')
    dt = datetime(2018, 7, 26, 12, 20, tzinfo=tz)
    utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)
    os.environ['AIRFLOW_CONN_HTTP_TEST']='http://localhost:9090'
    
    dag = DAG(
        'bm01',
        default_args=default_args,
        description='my DAG',
        schedule_interval='*/2 * * * *',
        start_date=utc_dt
    )
    
    task1 = SimpleHttpOperator(
        task_id='get_op1',
        http_conn_id='http_test',
        method='GET',
        endpoint='test1',
        data={},
        headers={},
        dag=dag)
    
    task2 = SimpleHttpOperator(
        task_id='get_op2',
        http_conn_id='http_test',
        method='GET',
        endpoint='test2',
        data={},
        headers={},
        dag=dag)
    
    task1 >> task2
    

    ​ 另外,这里SimpleHttpOperator发出的HTTP请求是阻塞的,也就是说在依赖任务中,只有上游任务执行完成返回之后才会去执行下游任务。

  • 相关阅读:
    清除内联元素间默认的间隔
    Idea配置SpringBoot热部署
    导出下载模板
    Java 使用Graphics2D 进行画图
    【Modbus】Java使用 modbus-master-tcp 读取和写入Modbus服务器数据
    解决txt文件上传oss服务器乱码的问题
    docker java.lang.NoClassDefFoundError: org/bouncycastle/** 解决
    解决SpringBoot 报错 Error parsing HTTP request header
    React+AntDesign使用Tree树控件完整展现其中的层级关系,并具有展开收起选择等交互功能
    AntDesign使用Form表单出现You cannot set a form field before rendering a field associated with the value
  • 原文地址:https://www.cnblogs.com/cord/p/9377292.html
Copyright © 2011-2022 走看看