  • Receiving Event Hub Messages with EPH

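    Introduction:

    This post uses the Python SDK to receive messages stored in Azure Event Hub through EPH (Event Processor Host). EventProcessorHost() uses Azure Storage to store offsets and other information. The current version of the SDK does not yet support the China region well, so the China-region endpoints have to be specified explicitly.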


    Sample program:

    import asyncio
    from azure.eventprocessorhost import AbstractEventProcessor, EPHOptions
    from azure.eventprocessorhost import AzureStorageCheckpointLeaseManager
    from azure.eventprocessorhost import EventHubConfig
    from azure.eventprocessorhost import EventProcessorHost
    from azure.storage.blob import BlockBlobService
    
    
    class EventProcessor(AbstractEventProcessor):
        """
        Example Implementation of AbstractEventProcessor
        """
        def __init__(self, params=None):
            """
            Init Event processor
            """
            self._msg_counter = 0

        async def open_async(self, context):
            """
            Called by processor host to initialize the event processor.
            """
            print(context.partition_id)
    
        async def close_async(self, context, reason):
            """
            Called by processor host to indicate that the event processor is being stopped.
            (Params) Context: Information about the partition
            """
    
            print("Reason: " + reason)
            print("context.partition_id: " + context.partition_id)
            print("context.offset: " + context.offset)
            print("context.sequence_number: {}".format(context.sequence_number))
    
        async def process_events_async(self, context, messages):
            """
            Called by the processor host when a batch of events has arrived.
            This is where the real work of the event processor is done.
            (Params) Context: Information about the partition, Messages: The events to be processed.
            """
            print(context.partition_id)
            print(messages)
            print("-----")
            await context.checkpoint_async()
    
        async def process_error_async(self, context, error):
            """
            Called when the underlying client experiences an error while receiving.
            EventProcessorHost will take care of recovering from the error and
            continuing to pump messages, so no action is required from your code.
            (Params) Context: Information about the partition, Error: The error that occurred.
            """
            print(repr(error))
    
    # Note: if there are many messages, increase the sleep time so the program does not terminate early
    async def wait_and_close(host):
        """
        Run EventProcessorHost for 6 minutes then shutdown.
        """
        await asyncio.sleep(360)
        await host.close_async()
    
    try:
    
        loop = asyncio.get_event_loop()
    
        # Storage Account Credentials
        STORAGE_ACCOUNT_NAME = "yuvmtestdiag489"
        STORAGE_KEY = "OgNFVB93P2oWCKcJaNOt4kuW0lf911D11v6NiHcDC6R0SdiqUJYlP8UdW7U4p0UzSnreII+7BnS9Cy1VsMSqGg=="
        LEASE_CONTAINER_NAME = "leases5"
    
        # Eventhub config and storage manager
        EH_CONFIG = EventHubConfig('cznamespace', 'yutaoeventhubtest', 'RootManageSharedAccessKey',
                                   'wnwcU+VjKLGgNUwiecwAyHb9gAVPDIC+RvXu2JcDINc=', consumer_group="$default")
    
    
        # Note: the key must be URL-encoded
        ADDRESS = ("amqps://"
                   "RootManageSharedAccessKey"
                   ":"
                   "wnwcU%2bVjKLGgNUwiecwAyHb9gAVPDIC%2bRvXu2JcDINc%3d"
                   "@"
                   "cznamespace.servicebus.chinacloudapi.cn"
                   "/"
                   "yutaoeventhubtest")
    
        EH_CONFIG.client_address = ADDRESS
        STORAGE_MANAGER = AzureStorageCheckpointLeaseManager(STORAGE_ACCOUNT_NAME,
                                                             STORAGE_KEY,
                                                             LEASE_CONTAINER_NAME,
                                                             storage_blob_prefix="blob.core.chinacloudapi.cn")
    
        eph_options = EPHOptions()
    
        # Event loop and host
        LOOP = asyncio.get_event_loop()
        HOST = EventProcessorHost(EventProcessor, EH_CONFIG, STORAGE_MANAGER, eph_options=eph_options,
                                  ep_params=["param1", "param2"], loop=LOOP)
    
        # Set the storage client again in case the SDK has wiped the earlier configuration
        STORAGE_MANAGER.storage_client = BlockBlobService(account_name=STORAGE_ACCOUNT_NAME,
                                                          account_key=STORAGE_KEY,
                                                          endpoint_suffix="core.chinacloudapi.cn")
    
        tasks = asyncio.gather(
            HOST.open_async(),
            wait_and_close(HOST))
    
        loop.run_until_complete(tasks)
    
    except KeyboardInterrupt:
        # Canceling pending tasks and stopping the loop
        # asyncio.Task.all_tasks() was removed in Python 3.9; asyncio.all_tasks() needs Python 3.7+
        for task in asyncio.all_tasks(loop):
            task.cancel()
        loop.run_forever()
        tasks.exception()
    
    finally:
        loop.stop()
    
    

    Note: the connection details above are no longer valid and are shown only as a format reference.
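
    The sample hand-writes the URL-encoded key inside ADDRESS, which is easy to get wrong. As a minimal sketch, the encoding can be done with the standard library instead. The helper name build_amqp_address and every value passed to it below are placeholders chosen for illustration, not part of the SDK or of the original sample.

```python
from urllib.parse import quote


def build_amqp_address(policy_name, policy_key, namespace_host, event_hub):
    """Return an amqps:// address with the policy name and key percent-encoded."""
    # safe="" makes quote() escape "/" as well, which Base64 keys may contain.
    user = quote(policy_name, safe="")
    password = quote(policy_key, safe="")
    return "amqps://{}:{}@{}/{}".format(user, password, namespace_host, event_hub)


# Placeholder values for illustration only, not real credentials.
address = build_amqp_address(
    "RootManageSharedAccessKey",
    "abc+def/ghi=",
    "mynamespace.servicebus.chinacloudapi.cn",
    "myeventhub",
)
print(address)
```

    The resulting string could then be assigned to EH_CONFIG.client_address in the same way the sample assigns ADDRESS.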

    Test results:

    1
    [<azure.eventhub.EventData object at 0x00000223AE556EF0>, <azure.eventhub.EventData object at 0x00000223AE556F98>, <azure.eventhub.EventData object at 0x00000223AE4DBA20>, <azure.eventhub.EventData object at 0x00000223AE4DB400>, <azure.eventhub.EventData object at 0x00000223AE4DBC88>, <azure.eventhub.EventData object at 0x00000223AE4DBAC8>, <azure.eventhub.EventData object at 0x00000223AE4FD7F0>, <azure.eventhub.EventData object at 0x00000223AE4FD6D8>, <azure.eventhub.EventData object at 0x00000223AE4FDBE0>, <azure.eventhub.EventData object at 0x00000223AE4FD048>]
    -----
    1
    [<azure.eventhub.EventData object at 0x00000223AE4FD278>, <azure.eventhub.EventData object at 0x00000223AE4FDB70>, <azure.eventhub.EventData object at 0x00000223AE4FD668>, <azure.eventhub.EventData object at 0x00000223AE4FDAC8>, <azure.eventhub.EventData object at 0x00000223AE4FDB38>, <azure.eventhub.EventData object at 0x00000223AE4FDF60>, <azure.eventhub.EventData object at 0x00000223AE57AB38>, <azure.eventhub.EventData object at 0x00000223AE57A470>, <azure.eventhub.EventData object at 0x00000223AE57A748>]
    -----
    Reason: Shutdown
    context.partition_id: 0
    context.offset: 978600
    context.sequence_number: 17559
    Reason: Shutdown
    context.partition_id: 1
    context.offset: 978544
    context.sequence_number: 17558
    
    Process finished with exit code 0
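
    The output above shows only the default object repr of each EventData, because process_events_async prints the list directly. A possible way to print something readable is sketched below. It assumes that EventData exposes body_as_str() and sequence_number, which may depend on the installed SDK version; FakeEvent is a hypothetical stand-in so that the sketch runs without an Event Hub.

```python
def describe_events(messages):
    """Return one readable line per event instead of the default object repr."""
    lines = []
    for event in messages:
        # Assumption: the event object offers sequence_number and body_as_str().
        lines.append("seq={} body={}".format(event.sequence_number, event.body_as_str()))
    return lines


class FakeEvent:
    """Hypothetical stand-in for azure.eventhub.EventData, used only for this demo."""

    def __init__(self, sequence_number, text):
        self.sequence_number = sequence_number
        self._text = text

    def body_as_str(self):
        return self._text


for line in describe_events([FakeEvent(17558, "hello"), FakeEvent(17559, "world")]):
    print(line)
```

    Inside process_events_async, print(messages) could be replaced by a loop over describe_events(messages), provided the assumption about EventData holds for the SDK in use.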
    
    

    Reference examples:

    azure-event-hubs-python/examples/eph.py

    eph.py stop when the message not all received
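
    The fixed asyncio.sleep(360) in wait_and_close is what makes the sample stop after six minutes, whether or not every message has been received. One option, sketched here under assumptions, is to pass the run time in as a parameter so it can be raised for larger backlogs. FakeHost is a hypothetical stand-in for EventProcessorHost that only mimics open_async() and close_async(), so the sketch runs offline.

```python
import asyncio


async def wait_and_close(host, run_seconds):
    """Let the host run for run_seconds, then shut it down."""
    await asyncio.sleep(run_seconds)
    await host.close_async()


class FakeHost:
    """Hypothetical stand-in for EventProcessorHost, used only for this demo."""

    def __init__(self):
        self.closed = False

    async def open_async(self):
        # The real host would start receiving from its partitions here.
        pass

    async def close_async(self):
        self.closed = True


async def main(run_seconds):
    host = FakeHost()
    # Same pattern as the sample: open the host and schedule its shutdown together.
    await asyncio.gather(host.open_async(), wait_and_close(host, run_seconds))
    return host.closed


closed = asyncio.run(main(0.1))
print(closed)
```

    With the real host, run_seconds would be chosen according to the expected message volume; asyncio.run requires Python 3.7 or later.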

  • Original article: https://www.cnblogs.com/taro/p/9066937.html