  • Receiving Event Hub Messages with EPH

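    Introduction:

    This post uses the Python SDK to receive messages stored in Azure Event Hub through EPH (Event Processor Host). EventProcessorHost() uses Azure Storage to store offsets and other information. The current version of the SDK does not yet support the Azure China region well, so you have to specify the China-specific endpoints explicitly.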
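
    To illustrate what specifying the endpoint involves, the sketch below derives China-region host names from a namespace name or a storage account name. The two domain suffixes are the ones that appear in the sample program in this post; the helper names `event_hub_host` and `blob_host` are hypothetical and are not part of any SDK.

```python
# Domain suffixes taken from the sample program in this post.
CHINA_SERVICEBUS_SUFFIX = "servicebus.chinacloudapi.cn"
CHINA_STORAGE_SUFFIX = "core.chinacloudapi.cn"


def event_hub_host(namespace, suffix=CHINA_SERVICEBUS_SUFFIX):
    """Hypothetical helper: host name of an Event Hubs namespace."""
    return "{}.{}".format(namespace, suffix)


def blob_host(account, suffix=CHINA_STORAGE_SUFFIX):
    """Hypothetical helper: blob endpoint host name of a storage account."""
    return "{}.blob.{}".format(account, suffix)


if __name__ == "__main__":
    print(event_hub_host("cznamespace"))
    print(blob_host("mystorageaccount"))  # placeholder account name
```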


    Sample program:

    import asyncio
    from azure.eventprocessorhost import AbstractEventProcessor, EPHOptions
    from azure.eventprocessorhost import AzureStorageCheckpointLeaseManager
    from azure.eventprocessorhost import EventHubConfig
    from azure.eventprocessorhost import EventProcessorHost
    from azure.storage.blob import BlockBlobService
    
    
    class EventProcessor(AbstractEventProcessor):
        """
        Example implementation of AbstractEventProcessor.
        """

        def __init__(self, params=None):
            """
            Init Event processor
            """
            self._msg_counter = 0

        async def open_async(self, context):
            """
            Called by processor host to initialize the event processor.
            """
            print(context.partition_id)
    
        async def close_async(self, context, reason):
            """
            Called by processor host to indicate that the event processor is being stopped.
            (Params) Context: Information about the partition
            """
            print("Reason: " + reason)
            print("context.partition_id: " + context.partition_id)
            print("context.offset: " + context.offset)
            print("context.sequence_number: {}".format(context.sequence_number))
    
        async def process_events_async(self, context, messages):
            """
            Called by the processor host when a batch of events has arrived.
            This is where the real work of the event processor is done.
            (Params) Context: Information about the partition, Messages: The events to be processed.
            """
            print(context.partition_id)
            print(messages)
            print("-----")
            await context.checkpoint_async()
    
        async def process_error_async(self, context, error):
            """
            Called when the underlying client experiences an error while receiving.
            EventProcessorHost will take care of recovering from the error and
            continuing to pump messages, so no action is required from your code.
            (Params) Context: Information about the partition, Error: The error that occurred.
            """
            print(repr(error))
    
    # Note: if there are many messages, increase the sleep time so the program does not terminate early.
    async def wait_and_close(host):
        """
        Run EventProcessorHost for 6 minutes then shutdown.
        """
        await asyncio.sleep(360)
        await host.close_async()
    
    try:
    
        loop = asyncio.get_event_loop()
    
        # Storage Account Credentials
        STORAGE_ACCOUNT_NAME = "yuvmtestdiag489"
        STORAGE_KEY = "OgNFVB93P2oWCKcJaNOt4kuW0lf911D11v6NiHcDC6R0SdiqUJYlP8UdW7U4p0UzSnreII+7BnS9Cy1VsMSqGg=="
        LEASE_CONTAINER_NAME = "leases5"
    
        # Eventhub config and storage manager
        EH_CONFIG = EventHubConfig('cznamespace', 'yutaoeventhubtest', 'RootManageSharedAccessKey',
                                   'wnwcU+VjKLGgNUwiecwAyHb9gAVPDIC+RvXu2JcDINc=', consumer_group="$default")
    
    
        # Note: the key must be URL-encoded
        ADDRESS = ("amqps://"
                   "RootManageSharedAccessKey"
                   ":"
                   "wnwcU%2bVjKLGgNUwiecwAyHb9gAVPDIC%211vXu2JcDINc%3d"
                   "@"
                   "cznamespace.servicebus.chinacloudapi.cn"
                   "/"
                   "yutaoeventhubtest")
    
        EH_CONFIG.client_address = ADDRESS
        STORAGE_MANAGER = AzureStorageCheckpointLeaseManager(STORAGE_ACCOUNT_NAME,
                                                             STORAGE_KEY,
                                                             LEASE_CONTAINER_NAME,
                                                             storage_blob_prefix="blob.core.chinacloudapi.cn")
    
        eph_options = EPHOptions()
    
        # Event loop and host (reuse the loop obtained above)
        LOOP = loop
        HOST = EventProcessorHost(EventProcessor, EH_CONFIG, STORAGE_MANAGER, eph_options=eph_options,
                                  ep_params=["param1", "param2"], loop=LOOP)
    
        # Set the storage client again in case the SDK overwrote the earlier configuration
        STORAGE_MANAGER.storage_client = BlockBlobService(account_name=STORAGE_ACCOUNT_NAME,
                                                          account_key=STORAGE_KEY,
                                                          endpoint_suffix="core.chinacloudapi.cn")
    
        tasks = asyncio.gather(
            HOST.open_async(),
            wait_and_close(HOST))
    
        loop.run_until_complete(tasks)
    
    except KeyboardInterrupt:
        # Cancel pending tasks and stop the loop.
        # asyncio.Task.all_tasks() was removed in Python 3.9; asyncio.all_tasks(loop) needs Python 3.7+.
        for task in asyncio.all_tasks(loop):
            task.cancel()
        loop.run_forever()
        tasks.exception()
    
    finally:
        loop.stop()
    
    

    Note: the connection details above have been invalidated and are shown only as a format reference.
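
    Because the key must be URL-encoded before it goes into the AMQP address, it may be safer to let the standard library do the encoding than to encode by hand. The following is a minimal sketch using `urllib.parse.quote_plus`; the function name `build_amqp_address` and the key in the usage lines are placeholders, not values from the SDK or from a real namespace.

```python
from urllib.parse import quote_plus


def build_amqp_address(namespace_host, event_hub, policy_name, policy_key):
    """Hypothetical helper: assemble an amqps:// address with an encoded key.

    quote_plus() percent-encodes characters such as '+', '/' and '=' that
    commonly appear in base64 shared access keys.
    """
    return "amqps://{}:{}@{}/{}".format(
        quote_plus(policy_name),
        quote_plus(policy_key),
        namespace_host,
        event_hub,
    )


if __name__ == "__main__":
    # Placeholder key for illustration only.
    print(build_amqp_address(
        "cznamespace.servicebus.chinacloudapi.cn",
        "yutaoeventhubtest",
        "RootManageSharedAccessKey",
        "ab+cd/ef=",
    ))
```

    The result can then be assigned to `EH_CONFIG.client_address` as in the sample program. `quote_plus` emits upper-case hex digits (`%2B`), which is equivalent to the lower-case form (`%2b`).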

    Test results:

    1
    [<azure.eventhub.EventData object at 0x00000223AE556EF0>, <azure.eventhub.EventData object at 0x00000223AE556F98>, <azure.eventhub.EventData object at 0x00000223AE4DBA20>, <azure.eventhub.EventData object at 0x00000223AE4DB400>, <azure.eventhub.EventData object at 0x00000223AE4DBC88>, <azure.eventhub.EventData object at 0x00000223AE4DBAC8>, <azure.eventhub.EventData object at 0x00000223AE4FD7F0>, <azure.eventhub.EventData object at 0x00000223AE4FD6D8>, <azure.eventhub.EventData object at 0x00000223AE4FDBE0>, <azure.eventhub.EventData object at 0x00000223AE4FD048>]
    -----
    1
    [<azure.eventhub.EventData object at 0x00000223AE4FD278>, <azure.eventhub.EventData object at 0x00000223AE4FDB70>, <azure.eventhub.EventData object at 0x00000223AE4FD668>, <azure.eventhub.EventData object at 0x00000223AE4FDAC8>, <azure.eventhub.EventData object at 0x00000223AE4FDB38>, <azure.eventhub.EventData object at 0x00000223AE4FDF60>, <azure.eventhub.EventData object at 0x00000223AE57AB38>, <azure.eventhub.EventData object at 0x00000223AE57A470>, <azure.eventhub.EventData object at 0x00000223AE57A748>]
    -----
    Reason: Shutdown
    context.partition_id: 0
    context.offset: 978600
    context.sequence_number: 17559
    Reason: Shutdown
    context.partition_id: 1
    context.offset: 978544
    context.sequence_number: 17558
    
    Process finished with exit code 0
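
    The output above shows only the default object representation of each EventData, because the sample prints the `messages` list directly. To see the payloads, each event body can be decoded first. The sketch below assumes that the EventData objects delivered by this SDK version expose a `body_as_str()` method; the helper name `event_bodies` is hypothetical.

```python
def event_bodies(messages):
    """Hypothetical helper: return the decoded body of each received event.

    Assumes every item offers body_as_str(), as EventData is expected to
    in the SDK version used in this post.
    """
    return [event.body_as_str() for event in messages]


# Possible use inside EventProcessor.process_events_async:
#     for body in event_bodies(messages):
#         print(body)
```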
    
    

    Reference examples:

    azure-event-hubs-python/examples/eph.py

    eph.py stop when the message not all received
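
    The fixed sleep in `wait_and_close` is why the sample can stop before all messages have been received. One possible alternative, sketched here under the assumption that the host only needs `close_async()` to be awaited at shutdown, is to wait on an `asyncio.Event` with an optional upper time limit. The `stop_event` and `max_seconds` parameters are introduced for illustration and are not part of the SDK.

```python
import asyncio


async def wait_and_close(host, stop_event, max_seconds=None):
    """Close the host when stop_event is set or max_seconds have elapsed.

    With max_seconds=None the host keeps running until stop_event is set.
    """
    try:
        await asyncio.wait_for(stop_event.wait(), timeout=max_seconds)
    except asyncio.TimeoutError:
        # Time limit reached without a stop request; shut down anyway.
        pass
    await host.close_async()
```

    It can be passed to `asyncio.gather` in place of the original coroutine, with `stop_event.set()` called from whatever code decides that processing is finished.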

  • Original article: https://www.cnblogs.com/taro/p/9066937.html