zoukankan      html  css  js  c++  java
  • Azure Synapse Analysis 开箱 Blog

            继续上一篇的内容,本文将为大家介绍将 ChangeFeed 抽取逻辑通过 Azure Function 服务实现,架构图如下,通过 Function 服务来执行 ChangeFeed 的读取,并将其转存至 DataLake 存储中。

            在前文最后我们介绍了通过 Function 服务可以简化整个逻辑的代码开发,Azure Function 服务中原生已经内置了很多与 Azure 其他服务原生集成的连接器,可以帮助客户实现与上下游服务的对接,用户无需关注连接器的实现,通过框架的调用,用户可以直接可以通过对象访问到上下游服务中的数据,用户只需要关注业务逻辑即可。Cosmos Database 也在 Azure Function 的支持之中,并且内置的连接器采用的也是 ChangeFeed 来实现的,用户可以直接开箱即用实现 ChangeFeed 数据的读取,无需自己维护抽取逻辑代码。Azure Function 原生支持的连接器如下:

    Type1.x2.x and higher1TriggerInputOutput
    Blob storage
    Cosmos DB
    Event Grid  
    Event Hubs  
    HTTP & webhooks  
    IoT Hub  
    Microsoft Graph
    Excel tables
       
    Microsoft Graph
    OneDrive files
       
    Microsoft Graph
    Outlook email
         
    Microsoft Graph
    events
     
    Microsoft Graph
    Auth tokens
         
    Mobile Apps    
    Notification Hubs      
    Queue storage  
    SendGrid    
    Service Bus  
    SignalR    
    Table storage  
    Timer    
    Twilio    

             下面我们来开始操作:

    1. 准备开发环境,建议使用 Visual Studio Code,其内置的 Azure Function 的开发扩展,可以方便开发

    参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-01

    2. 创建 Azure Function 项目,注意在选择 trigger 部分,选择 cosmos db

    参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-02

    3. 准备 function.json 配置文件,function.json 主要描述 function 与上下游数据连接的参数,其中下述配置中 type:cosmosDBTrigger 部分定义了 cosmos 的连接信息, databaseName,collectionName 替换为前面所创建的 cosmos db的名称,leaseCollectionName 是 function 用来维护租约和 CheckPoint。connectionStringSetting 参见后续 local.settings.json。type:blob 部分定义了 function 下游存储 Data Lake 的连接信息,其中 path 参数定义了 function 抽取增量变化数据在 Data Lake 中的存储路径,connection 参数参见后续 local.settings.json。另外 feedPollDelay 参数表示 function 服务轮询 ChangeFeed 数据的间隔,其单位为毫秒,在演示中建议大家可以设置为 60000,实际根据数据水线处理时间和数据更新需求来决定。

    {
      "scriptFile": "__init__.py",
      "bindings": [
        {
          "type": "cosmosDBTrigger",
          "name": "documents",
          "direction": "in",
          "leaseCollectionName": "<leasecollectionname>",
          "connectionStringSetting": "Cosmos_DOCUMENTDB",
          "databaseName": "<databasename>",
          "collectionName": "<collectionname>",
          "createLeaseCollectionIfNotExists": "true",
          "feedPollDelay": <pollinterval>
        },
        {
          "name": "outputblob",
          "type": "blob",
          "path": "<filesystemname>/{DateTime}.csv",
          "connection": "ChangeFeedResultStorage",
          "direction": "out"
        }
      ]
    }

    4. 准备 local.settings.json,该配置文件中定义了在上述 function.json 中所引用的连接密钥参数, 其中 Cosmos_DOCUMENTDB 和 ChangeFeedResultStorage 内分别填入,Cosmos 和 Data Lake 的连接字符串,可以在 portal 中对应资源的 access 信息部分获取。

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "<connectionstring>",
        "FUNCTIONS_WORKER_RUNTIME": "python",
        "Cosmos_DOCUMENTDB": "<cosmosconnectionstring>",
        "ChangeFeedResultStorage": "<datalakeconnectionstring>"
      }
    }

    5. 准备业务逻辑代码 init.py,init.py 是 function 函数被拉起后的 entrypoint 函数,我们将前面介绍的抽取 changefeed 增量变化数据的代码逻辑定义其中,下述演示代码中通过调用 documents 和 outputblob 借助 function 内置的连接器实现对上下游数据访问,无需再自己开发集成代码。用户只需要开发自己的数据处理逻辑即可,演示中是将增量变化数据转存到 Data Lake 存储中。

    import logging
    import json 
    import csv
    import io
    
    import azure.functions as func
    
    # read changefeed from cosmos, store as CSV file
    def main(documents: func.DocumentList, outputblob: func.Out[str]) :    
        if documents:
            count = 0
            outputresult = io.StringIO()
            csv_writer = csv.writer(outputresult, quoting=csv.QUOTE_NONNUMERIC)
            for document in documents:
                if count == 0:
                   header = json.loads(document.to_json()).keys()
                   csv_writer.writerow(header)
                   count += 1
                csv_writer.writerow(json.loads(document.to_json()).values())    
            outputblob.set(outputresult.getvalue())

     6. 通过 VS Code Function 本地调试工具进行测试,可以仿真通过前面 blog 中的数据插入函数在 cosmos db 内插入一些新的数据,然后在 Data Lake 中是否转存成功。

    参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-04?tabs=powershell

    7. 将 Function 服务打包发布至 Azure Function 服务中,上述开发测试均在本地完成,测试无误后将代码正式发布至 Azure Function 服务

    参考:https://docs.microsoft.com/en-us/azure/developer/python/tutorial-vs-code-serverless-python-05

            至此通过 Function 服务完成 ChangeFeed 读取及转存至 DataLake 已经完成。整个 Function 代码中 function.json 中针对不同连接器的参数说明大家可以参阅:https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-cosmosdb?tabs=csharp

  • 相关阅读:
    存储引擎的优缺点及增删改查基本操作
    安装Mariadb
    Mysql 入门概念
    Nginx语法着色
    find用法,文件压缩和lsof和cpio
    软件包管理
    Django 生成六位随机图片验证码
    Django自定义过滤器和自定义标签
    Django零碎知识点
    jQuery实现淡入淡出样式轮播
  • 原文地址:https://www.cnblogs.com/wekang/p/13282407.html
Copyright © 2011-2022 走看看