  Supercharging your ETL with Airflow and Singer

    Reposted from https://www.stitchdata.com/blog/supercharging-etl-with-airflow-and-singer/, an article by the Singer team on integrating Singer with Airflow.

    Earlier this year we introduced Singer, an open source project that helps data teams build simple, composable ETL. Singer provides a standard way for anyone to pull data from and send data to any source.

    For many companies, however, being able to push and pull data or move things from A to B is the only part of the problem. Data extraction is often part of a more complex workflow that involves scheduled tasks, complex dependencies, and the need for scalable, distributed architecture.

    Enter Apache Airflow. Originally developed at Airbnb and now a part of the Apache Incubator, Airflow takes the simplicity of a cron scheduler and adds all the facets of a modern workflow tool: dependency graphs, detailed logging, automated notifications, scalable infrastructure, and a graphical user interface.

    A dependency tree and history of task runs from Airflow’s UI

    Imagine a company that relies on data from multiple data sources, including SaaS tools, databases, and flat files. Several times a day this company might want to ingest new data from these sources in parallel. The company might manipulate it in some way, then dump the output into a data warehouse.

    Airflow and Singer can make all of that happen. With a few lines of code, you can use Airflow to easily schedule and run Singer tasks, which can then trigger the remainder of your workflow.

    A real-world example

    Let’s look at a real-world example developed by a member of the Singer community. In this scenario we’re going to be pulling in CSV files, but Singer can work with any data source.

    Our user has a specific sequence of tasks they need to complete each morning:

    • Download new compressed CSV files from an AWS S3 bucket
    • Decompress those files
    • Use a Singer CSV tap to push the data to a Singer Stitch target. In this example we’re dumping data into Amazon Redshift, but you could target Google BigQuery or Postgres, too.
    • Delete the compressed and decompressed files

    This entire workflow, including all scripts, logging, and the Airflow implementation itself, is accomplished in fewer than 160 lines of Python code in this repo. Let’s see how it’s done.

    Firing up Airflow

    First we get Airflow running as described on the project’s Quick Start page with four commands:

    # airflow needs a home, ~/airflow is the default,
    # but you can lay foundation somewhere else if you prefer
    # (optional)
    export AIRFLOW_HOME=~/airflow
    
    # install from pypi using pip
    pip install airflow
    
    # initialize the database
    airflow initdb
    
    # start the web server, default port is 8080
    airflow webserver -p 8080
    

    Upon running that last command, you should see some ASCII art, letting you know that the web server is online:

    ASCII art

    Now, point your browser to http://localhost:8080/ to see a screen that looks like this:

    Airflow console

    At this point, you’re ready to create your own Airflow DAG (Directed Acyclic Graph) to perform data workflow tasks. For our purposes, we can get a ready-made DAG by cloning the airflow-singer repo:

    git clone git@github.com:robertjmoore/airflow-singer.git
    

    Customizing the repo

    For Airflow to find the DAG in this repo, you’ll need to tweak the dags_folder variable in the ~/airflow/airflow.cfg file to point to the dags directory inside the repo:

    Edit the dags_folder variable

    You’ll also want to make a few tweaks to the singer.py file in the repo’s dags folder to reflect your contact info and the location of the repo on your local file system:

    More edits
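
    Those tweaks mostly amount to editing a few values near the top of the file. As a rough picture (a sketch only; the names and values below are illustrative, not the repo’s exact ones):

    # Hypothetical excerpt from dags/singer.py -- the keys are standard Airflow
    # default_args fields, but the concrete values are placeholders.
    from datetime import datetime, timedelta
    
    # Contact info Airflow uses for task ownership and failure notifications.
    default_args = {
        "owner": "your-name",
        "email": ["you@example.com"],
        "email_on_failure": True,
        "start_date": datetime(2017, 1, 1),
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    }
    
    # Where the cloned repo lives on the local file system; the Bash tasks
    # reference scripts relative to this path.
    REPO_HOME = "/path/to/airflow-singer"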

    Restart the web server with the command airflow webserver -p 8080, then refresh the Airflow UI in your browser. You should now see the DAG from our repo:

    Updated DAG

    Clicking on it will show us the Graph View, which lays out the steps taken each morning when the DAG is run:

    Graph View

    This dependency map is governed by a few lines of code inside the dags/singer.py file. Let’s unpack a little of what’s going on.

    Exploring the DAG

    singer.py

    This tiny file defines the whole graph. Each of these tasks is a step in the DAG, and the final four lines draw out the dependencies that exist between them.

    You’ll notice that, in this file, each step is a BashOperator that calls a specific command-line task and waits for its successful completion. Airflow supports a number of other operators and allows you to build your own. This makes it easy for a DAG to include interactions with databases, email services, and chat tools like Slack.
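
    As a rough illustration (a sketch rather than the repo’s exact code; task names and script paths are placeholders), a DAG along these lines can be wired up with a handful of BashOperators and four dependency statements:

    # Sketch of an Airflow 1.x DAG in the spirit of dags/singer.py.
    from datetime import datetime
    
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    
    dag = DAG(
        "csv_to_stitch",
        schedule_interval="@daily",       # run once each morning
        start_date=datetime(2017, 1, 1),
    )
    
    # Each step shells out to a command and waits for it to exit successfully.
    # {{ ds }} is the run's execution date, so every run works in its own path.
    download = BashOperator(
        task_id="download_from_s3",
        bash_command="python /path/to/airflow-singer/scripts/download_from_s3.py {{ ds }}",
        dag=dag,
    )
    decompress = BashOperator(
        task_id="decompress_files",
        bash_command="python /path/to/airflow-singer/scripts/decompress_files.py {{ ds }}",
        dag=dag,
    )
    make_config = BashOperator(
        task_id="generate_json_config",
        bash_command="python /path/to/airflow-singer/scripts/generate_json_config.py {{ ds }}",
        dag=dag,
    )
    run_singer = BashOperator(
        task_id="run_singer",
        bash_command="tap-csv -c ~/config/csv-config.json | target-stitch -c ~/config/stitch_config.json",
        dag=dag,
    )
    cleanup = BashOperator(
        task_id="delete_files",
        bash_command="python /path/to/airflow-singer/scripts/delete_files.py {{ ds }}",
        dag=dag,
    )
    
    # The final four lines draw out the dependencies between the steps.
    download >> decompress
    decompress >> make_config
    make_config >> run_singer
    run_singer >> cleanup

    Because each step is just a shell command, swapping in a different tap or target only means changing a single bash_command string.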

    Interacting with Singer

    To get a better idea of how Singer is integrated, check out the individual files in the scripts/ directory. You'll find Python scripts that download data from an Amazon S3 bucket, extract that data, and delete the files on completion.
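
    For instance, the download step might boil down to something like the sketch below, which assumes boto3 and uses a placeholder bucket name, key prefix, and local directory rather than the repo’s actual values:

    # Illustrative S3 download script; bucket, prefix, and paths are placeholders.
    import os
    import sys
    
    import boto3
    
    def download_new_files(bucket, prefix, dest_dir):
        """Download every compressed CSV file under the prefix into dest_dir."""
        os.makedirs(dest_dir, exist_ok=True)
        s3 = boto3.client("s3")
        response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
        for obj in response.get("Contents", []):
            key = obj["Key"]
            if key.endswith(".gz"):
                local_path = os.path.join(dest_dir, os.path.basename(key))
                s3.download_file(bucket, key, local_path)
                print("downloaded %s to %s" % (key, local_path))
    
    if __name__ == "__main__":
        # The DAG passes the execution date so every run uses its own directory.
        execution_date = sys.argv[1]
        download_new_files("my-bucket", "exports/", "/tmp/airflow-singer/" + execution_date)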

    The most interesting step is the process of using Singer to extract the data from the CSV files and push it to a target – namely Stitch.

    We should also note that the CSV tap requires a config file that tells it where to find the CSV files whose data it should push, so one step in our DAG generates that JSON config file, pointing it at the files we just extracted. Note that we use the global Airflow variable execution_date across our various scripts to make sure we deposit and retrieve the files from the same path.

    generatejsonconfig.py
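
    A minimal version of that step could look like the sketch below. The JSON layout shown for tap-csv (a list of files with an entity name, a file path, and key columns) is an assumption; check the tap’s documentation for the exact schema it expects:

    # Sketch of a config-generation step; the tap-csv config schema here is assumed.
    import glob
    import json
    import os
    import sys
    
    def generate_config(data_dir, config_path):
        files = []
        for path in sorted(glob.glob(os.path.join(data_dir, "*.csv"))):
            files.append({
                "entity": os.path.splitext(os.path.basename(path))[0],  # destination table name
                "file_path": path,
                "keys": ["id"],  # primary key column(s); placeholder
            })
        with open(config_path, "w") as f:
            json.dump({"files": files}, f, indent=2)
    
    if __name__ == "__main__":
        # Same execution date the download step used, so we read from the same path.
        execution_date = sys.argv[1]
        generate_config("/tmp/airflow-singer/" + execution_date,
                        os.path.expanduser("~/config/csv-config.json"))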

    Once that config file has been generated, we call Singer to do all the work in a single command line:

    tap-csv -c ~/config/csv-config.json | target-stitch -c ~/config/stitch_config.json
    

    This doesn’t even require a special Python script — the entire instruction is laid out in a single line of the singer.py DAG file.

    singer.py

    Conclusion

    As you can see, incorporating Singer into your Airflow DAGs gives you a powerful way to move data automatically. Anyone can extract and load data with a one-line instruction, using a growing ecosystem of taps and targets.
