zoukankan      html  css  js  c++  java
  • AirFlow 连接访问MYSQL

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    # from airflow.providers.mysql.operators.mysql import MySqlOperator
    from airflow.operators.mysql_operator import MySqlOperator
    from airflow.operators.dummy import DummyOperator
    from liveramp.common.metadata import default_args
    from airflow.models import Variable
    
    from liveramp.common.notification import send_to_slack, generate_message_blocks_according_to_upstream
    import uuid
    
    
    def failure_callback(context):
        send_to_slack(generate_message_blocks_according_to_upstream(context))
    
    
    def success_callback(context):
        send_to_slack(generate_message_blocks_according_to_upstream(context))
    
    
    # uuid3 need namespace and each time input an different namespace
    # value = uuid.uuid3(uuid.NAMESPACE_DNS, "AirFlow")
    value = uuid.uuid4()
    suid = ''.join(str(value).split("-"))
    sql_ = 'insert job(job_id, tenant_id, tenant_env, tenant_display_name, tenant_name, tenant_settings) values( "' + str(
        suid) + '",111,"QA","TEST","Test_display_name","hdfghjdsagfdhsgf");'
    sql_ = sql_ + "insert job_status(job_id, job_reference_id)  values ('{}',1);".format(suid)
    sql_ = sql_ + "insert job_status_log(job_id, job_reference_id,memo)  values ('{}',1,'init insert');".format(suid)
    print(sql_)
    
    dag = DAG(
        'collect_requests_dag_backup',
        default_args=default_args,
        tags=['mysql', 'MySqlOperator'],
        # start_date=datetime(2021, 1, 1),
        schedule_interval=None,
        catchup=False,
    )
    
    start = DummyOperator(
        task_id='start',
        dag=dag)
    
    end = DummyOperator(
        task_id='end',
        dag=dag)
    
    sql_insert = MySqlOperator(
        task_id='sql_insert',
        mysql_conn_id='mysql_conn_id',
        sql=sql_,
        dag=dag,
    )
    
    
    start >> sql_insert >> end

    走的是一个已经弃用的方法,

    from airflow.operators.mysql_operator import MySqlOperator 实测可用


    需要在Connection这一端进行配置ConnectionID,写好后点击Test按钮

    import 包的时候,如果放弃弃用的方法

    from airflow.operators.mysql_operator import MySqlOperator

    可以用新的依赖:
    from airflow.providers.mysql.operators.mysql import MySqlOperator
    from airflow import DAG
    from airflow.providers.mysql.operators.mysql import MySqlOperator
    from airflow.operators.dummy import DummyOperator
    from liveramp.common.metadata import default_args
    
    from liveramp.common.notification import send_to_slack, generate_message_blocks_according_to_upstream
    import uuid
    
    
    def failure_callback(context):
        send_to_slack(generate_message_blocks_according_to_upstream(context))
    
    
    def success_callback(context):
        send_to_slack(generate_message_blocks_according_to_upstream(context))
    
    
    value = uuid.uuid4()
    suid = ''.join(str(value).split("-"))
    sql_ = 'insert job(job_id, tenant_id, tenant_env, tenant_display_name, tenant_name, tenant_settings) values( "' + str(
        suid) + '",111,"QA","TEST","Test_display_name","hdfghjdsagfdhsgf");'
    sql_ = sql_ + "insert job_status(job_id, job_reference_id)  values ('{}',1);".format(suid)
    sql_ = sql_ + "insert job_status_log(job_id, job_reference_id,memo)  values ('{}',1,'init insert');".format(suid)
    print(sql_)
    
    dag = DAG(
        'collect_requests_dag',
        default_args=default_args,
        tags=['mysql', 'MySqlOperator'],
        # start_date=datetime(2021, 1, 1),
        schedule_interval=None,
        catchup=False,
    )
    
    start = DummyOperator(
        task_id='start',
        dag=dag)
    
    end = DummyOperator(
        task_id='end',
        dag=dag)
    
    sql_insert = MySqlOperator(
        task_id='sql_insert',
        mysql_conn_id='mysql_conn_id',
        sql=sql_,
        autocommit=True,
        dag=dag,
    )
    
    start >> sql_insert >> end
    ------------------------- A little Progress a day makes you a big success... ----------------------------
  • 相关阅读:
    baremetal node & openstack hypervisor &openstack flavor
    bridge fdb vxlan nolearning
    bridge fdb 与vxlan
    FRRouting Architecture
    bridge fdb Command Output
    while循环和until语句
    Python私有属性和私有方法
    python面向对象封装案例2
    Python面向对象封装案例
    Python类和对象
  • 原文地址:https://www.cnblogs.com/qianjinyan/p/15662900.html
Copyright © 2011-2022 走看看