  • pipelinewise study notes, part 2: creating a simple pipeline

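    pipelinewise provides a command that scaffolds a simple sample pipeline. It simplifies pipeline creation and is also a handy way to learn how pipelines are put together.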

    Generate the demo pipeline

    pipelinewise init --name pipelinewise_samples

    Result

    A simple pg-to-pg (PostgreSQL to PostgreSQL) demo

    • Generate the tap and target YAML configuration
      Use the demo configuration files directly
    cp tap_postgres.yml.sample tap_postgres.yml
    cp target_postgres.yml.sample target_postgres.yml
    • Modify the configuration
      tap

    ---
    # ------------------------------------------------------------------------------
    # General Properties
    # ------------------------------------------------------------------------------
    id: "postgres_sample" # Unique identifier of the tap
    name: "Sample Postgres Database" # Name of the tap
    type: "tap-postgres" # !! THIS SHOULD NOT CHANGE !!
    owner: "somebody@foo.com" # Data owner to contact
    # ------------------------------------------------------------------------------
    # Source (Tap) - PostgreSQL connection details
    # ------------------------------------------------------------------------------
    db_conn:
      host: "localhost" # PostgreSQL host
      port: 15432 # PostgreSQL port
      user: "pipelinewise" # PostgreSQL user
      password: "secret" # Plain string or vault encrypted
      dbname: "postgres_source_db" # PostgreSQL database name
      #filter_schemas: "schema1,schema2" # Optional: Scan only the required schemas
                                           # to improve the performance of
                                           # data extraction
    # ------------------------------------------------------------------------------
    # Destination (Target) - Target properties
    # Connection details should be in the relevant target YAML file
    # ------------------------------------------------------------------------------
    target: "postgres_dwh" # ID of the target connector where the data will be loaded
    batch_size_rows: 20000 # Batch size for the stream to optimise load performance
    # ------------------------------------------------------------------------------
    # Source to target Schema mapping
    # ------------------------------------------------------------------------------
    schemas:
      - source_schema: "public" # Source schema in postgres with tables
        target_schema: "repl_pg_public" # Target schema in the destination Data Warehouse
        target_schema_select_permissions: # Optional: Grant SELECT on the schema and on the tables created in it
          - grp_stats
        # List of tables to replicate from Postgres to destination Data Warehouse
        #
        # See the Replication Strategies section of the documentation for the differences.
        # For the LOG_BASED replication method you might need to adjust the source database configuration.
        tables:
          - table_name: "city"
            replication_method: "FULL_TABLE" # One of INCREMENTAL, LOG_BASED and FULL_TABLE
            replication_key: "last_update" # Required only for INCREMENTAL: incremental loads always need a replication key
            # OPTIONAL: Load time transformations
            #transformations:
            #  - column: "last_name" # Column to transform
            #    type: "SET-NULL" # Transformation type
          # You can add as many tables as you need...
          - table_name: "country"
            replication_method: "FULL_TABLE" # Note: LOG_BASED would require log-based replication to be enabled in PostgreSQL
      # You can add as many schemas as you need...
      # Uncomment this if you want to replicate tables from multiple schemas
      #- source_schema: "another_schema_in_postgres" 
      # target_schema: "another
     
     

    target

    ---
    # ------------------------------------------------------------------------------
    # General Properties
    # ------------------------------------------------------------------------------
    id: "postgres_dwh" # Unique identifier of the target
    name: "Postgres Data Warehouse" # Name of the target
    type: "target-postgres" # !! THIS SHOULD NOT CHANGE !!
    # ------------------------------------------------------------------------------
    # Target - Data Warehouse connection details
    # ------------------------------------------------------------------------------
    db_conn:
      host: "localhost" # Postgres host
      port: 15433 # Postgres port
      user: "pipelinewise" # Postgres user
      password: "secret" # Plain string or vault encrypted
      dbname: "postgres_dwh" # Postgres database name
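
The two files are tied together only by the tap's `target` value, which has to match the target's `id`. The following is a minimal Python sketch of that cross-check, assuming both YAML files have already been loaded into plain dictionaries (for example with a YAML parser). The `check_pipeline` helper and its messages are hypothetical and are not part of pipelinewise.

```python
# Hypothetical consistency check; not part of pipelinewise itself.
# The dictionaries mirror only the relevant keys of the two YAML files above.

def check_pipeline(tap: dict, target: dict) -> list:
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []

    # The tap's `target` must name the `id` of the target connector.
    if tap.get("target") != target.get("id"):
        problems.append(
            f"tap {tap.get('id')!r} points at target {tap.get('target')!r}, "
            f"but the target file defines id {target.get('id')!r}"
        )

    # INCREMENTAL replication needs a replication_key on the table.
    for schema in tap.get("schemas", []):
        for table in schema.get("tables", []):
            method = table.get("replication_method")
            if method == "INCREMENTAL" and not table.get("replication_key"):
                problems.append(
                    f"table {table.get('table_name')!r} uses INCREMENTAL "
                    "without a replication_key"
                )
    return problems


tap = {
    "id": "postgres_sample",
    "target": "postgres_dwh",
    "schemas": [
        {
            "source_schema": "public",
            "target_schema": "repl_pg_public",
            "tables": [
                {"table_name": "city", "replication_method": "FULL_TABLE",
                 "replication_key": "last_update"},
                {"table_name": "country", "replication_method": "FULL_TABLE"},
            ],
        }
    ],
}
target = {"id": "postgres_dwh", "type": "target-postgres"}

print(check_pipeline(tap, target))  # prints []
```

Running a check like this before importing catches a mistyped target ID early, instead of during the import step.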
     
     

    Activate the pipeline

    • Import the configuration to activate the pipeline
    pipelinewise import --dir pipelinewise_samples

    Output

    2019-09-17 05:07:55 INFO: Searching YAML config files in /app/wrk
    2019-09-17 05:07:55 INFO: LOADING TARGET: target_postgres.yml
    2019-09-17 05:07:55 INFO: LOADING TAP: tap_postgres.yml
    2019-09-17 05:07:55 INFO: SAVING CONFIG
    2019-09-17 05:07:55 INFO: SAVING MAIN CONFIG JSON to /root/.pipelinewise/config.json
    2019-09-17 05:07:55 INFO: SAVING TARGET JSONS to /root/.pipelinewise/postgres_dwh/config.json
    2019-09-17 05:07:55 INFO: SAVING TAP JSONS to /root/.pipelinewise/postgres_dwh/postgres_sample
    2019-09-17 05:07:55 INFO: ACTIVATING TAP STREAM SELECTIONS...
    [Parallel(n_jobs=-1)]: Using backend ThreadingBackend with 4 concurrent workers.
    2019-09-17 05:07:55 INFO: Discovering postgres_sample (tap-postgres) tap in postgres_dwh (target-postgres) target...
    2019-09-17 05:07:56 INFO: Loading pre defined selection from /root/.pipelinewise/postgres_dwh/postgres_sample/selection.json
    2019-09-17 05:07:56 INFO: Mark postgres_source_db-public-edgydata tap_stream_id as not selected
    2019-09-17 05:07:56 INFO: Mark postgres_source_db-public-city tap_stream_id as selected with properties {'replication_method': 'FULL_TABLE', 'tap_stream_id': 'postgres_source_db-public-city'}
    2019-09-17 05:07:56 INFO: Mark postgres_source_db-public-country tap_stream_id as selected with properties {'replication_method': 'FULL_TABLE', 'tap_stream_id': 'postgres_source_db-public-country'}
    2019-09-17 05:07:56 INFO: Mark postgres_source_db-public-countrylanguage tap_stream_id as not selected
    2019-09-17 05:07:56 INFO: Loading pre defined selection from /root/.pipelinewise/postgres_dwh/postgres_sample/selection.json
    2019-09-17 05:07:56 INFO: Mark postgres_source_db-public-edgydata tap_stream_id as not selected
    2019-09-17 05:07:56 INFO: Mark postgres_source_db-public-city tap_stream_id as selected with properties {'replication_method': 'FULL_TABLE', 'tap_stream_id': 'postgres_source_db-public-city'}
    2019-09-17 05:07:56 INFO: Mark postgres_source_db-public-country tap_stream_id as selected with properties {'replication_method': 'FULL_TABLE', 'tap_stream_id': 'postgres_source_db-public-country'}
    2019-09-17 05:07:56 INFO: Mark postgres_source_db-public-countrylanguage tap_stream_id as not selected
    2019-09-17 05:07:56 INFO: Writing new properties file with changes into /root/.pipelinewise/postgres_dwh/postgres_sample/properties.json
    [Parallel(n_jobs=-1)]: Done 1 tasks | elapsed: 0.3s
    [Parallel(n_jobs=-1)]: Done 1 out of 1 | elapsed: 0.3s finished
    2019-09-17 05:07:56 INFO: 
                -------------------------------------------------------
                IMPORTING YAML CONFIGS FINISHED
                -------------------------------------------------------
                    Total targets to import : 1
                    Total taps to import : 1
                    Taps imported successfully : 1
                    Taps failed to import : []
                    Runtime : 0:00:00.409421
                -------------------------------------------------------
     
     
    • Check the status
    pipelinewise status

    Output

    Tap ID           Tap Type      Target ID     Target Type      Enabled    Status    Last Sync    Last Sync Result
    ---------------  ------------  ------------  ---------------  ---------  --------  -----------  ------------------
    postgres_sample  tap-postgres  postgres_dwh  target-postgres  True       ready                  unknown
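
The status table has eight columns but only seven values in its row, because Last Sync is empty before the first run. Splitting on whitespace would therefore misassign "unknown". The sketch below reads the table by column position instead, assuming a fixed-width layout in which each dash run under the header marks one column. The sample text and the `parse_status` helper are hypothetical illustrations, not pipelinewise functionality.

```python
import re

def parse_status(text: str) -> list:
    """Parse a fixed-width table whose second line is a row of dash runs."""
    lines = [line for line in text.splitlines() if line.strip()]
    header, separator, rows = lines[0], lines[1], lines[2:]

    # Each run of dashes gives the start and end offsets of one column.
    spans = [match.span() for match in re.finditer(r"-+", separator)]
    names = [header[start:end].strip() for start, end in spans]

    parsed = []
    for row in rows:
        # Slicing past the end of a short row simply yields an empty string.
        values = [row[start:end].strip() for start, end in spans]
        parsed.append(dict(zip(names, values)))
    return parsed


# Assumed layout: columns aligned under the dash runs, two spaces apart.
SAMPLE = "\n".join([
    "Tap ID           Tap Type      Target ID     Target Type      Enabled    Status    Last Sync    Last Sync Result",
    "---------------  ------------  ------------  ---------------  ---------  --------  -----------  ------------------",
    "postgres_sample  tap-postgres  postgres_dwh  target-postgres  True       ready                  unknown",
])

for tap_row in parse_status(SAMPLE):
    print(tap_row["Tap ID"], tap_row["Status"], repr(tap_row["Last Sync"]))
```

Keying each value by its column header keeps the empty Last Sync cell from shifting the values that follow it.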

    Run the pipeline

    • Run the command
    pipelinewise run_tap --tap postgres_sample --target postgres_dwh

    Output:

    2019-09-17 05:08:36 INFO: Running postgres_sample tap in postgres_dwh target
    2019-09-17 05:08:36 INFO: No table available that needs to be sync by fastsync
    2019-09-17 05:08:36 INFO: Table(s) selected to sync by singer: ['postgres_source_db-public-city', 'postgres_source_db-public-country']
    2019-09-17 05:08:36 INFO: Writing output into /root/.pipelinewise/postgres_dwh/postgres_sample/log/postgres_dwh-postgres_sample-20190917_050836.singer.log
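
The stream names in the log, such as `postgres_source_db-public-city`, appear to combine the database name, source schema and table name from the tap configuration. The sketch below rebuilds them that way. The pattern is inferred from the log output above rather than from pipelinewise's source, and `stream_ids` is a hypothetical helper.

```python
def stream_ids(dbname: str, schemas: list) -> list:
    """Build '<dbname>-<schema>-<table>' IDs, the pattern visible in the log."""
    return [
        f"{dbname}-{schema['source_schema']}-{table['table_name']}"
        for schema in schemas
        for table in schema["tables"]
    ]


# Only the keys needed here, copied from the tap configuration.
schemas = [
    {
        "source_schema": "public",
        "tables": [{"table_name": "city"}, {"table_name": "country"}],
    }
]

print(stream_ids("postgres_source_db", schemas))
# prints ['postgres_source_db-public-city', 'postgres_source_db-public-country']
```

The result matches the "Table(s) selected to sync by singer" line, which is a quick way to confirm that the tables listed in the YAML were actually selected.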

    Result in the database

    • Check the status
     
    pipelinewise status
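
The whole walkthrough comes down to four commands run in order. As a recap, here is a small Python sketch that assembles them and, by default, only prints them. The commands and flags are the ones used above. The `pipeline_commands` and `run_all` helpers are hypothetical, and actually executing the commands assumes `pipelinewise` is on the PATH.

```python
import shlex
import subprocess

def pipeline_commands(config_dir: str, tap: str, target: str) -> list:
    """Return the import, status, run and status commands as argv lists."""
    return [
        ["pipelinewise", "import", "--dir", config_dir],
        ["pipelinewise", "status"],
        ["pipelinewise", "run_tap", "--tap", tap, "--target", target],
        ["pipelinewise", "status"],
    ]


def run_all(commands: list, dry_run: bool = True) -> None:
    """Print each command; execute it only when dry_run is False."""
    for argv in commands:
        print("$", shlex.join(argv))
        if not dry_run:
            # check=True stops the sequence at the first failing command.
            subprocess.run(argv, check=True)


commands = pipeline_commands("pipelinewise_samples", "postgres_sample", "postgres_dwh")
run_all(commands)  # dry run: prints the commands without executing them
```

Passing `dry_run=False` would execute the sequence and stop at the first command that exits with an error.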

    References

    https://transferwise.github.io/pipelinewise/installation_guide/creating_pipelines.html
    https://transferwise.github.io/pipelinewise/installation_guide/running_pipelines.html

  • Original article: https://www.cnblogs.com/rongfengliang/p/11533100.html