zoukankan      html  css  js  c++  java
  • EPH接收Event Hub Message

    简介:

    使用Python SDK,基于EPH方式接收Azure Event Hub中存储的message,EventProcessorHost()中使用Azure Storage存储offerset等信息。目前版本的SDK对中国区的支持还不是太好,使用起来需要结合中国区的特点指定具体的endpoint。


    示例程序:

    import asyncio
    from azure.eventprocessorhost import AbstractEventProcessor, EPHOptions
    from azure.eventprocessorhost import AzureStorageCheckpointLeaseManager
    from azure.eventprocessorhost import EventHubConfig
    from azure.eventprocessorhost import EventProcessorHost
    from azure.storage.blob import BlockBlobService
    
    
    class EventProcessor(AbstractEventProcessor):
    
    
        """
        Example Implmentation of AbstractEventProcessor
        """
        def __init__(self, params=None):
            """
            Init Event processor
            """
            self._msg_counter = 0
        async def open_async(self, context):
            """
            Called by processor host to initialize the event processor.
            """
            print(context.partition_id)
    
        async def close_async(self, context, reason):
            """
            Called by processor host to indicate that the event processor is being stopped.
            (Params) Context:Information about the partition
            """
    
            print("Reason: " + reason)
            print("context.partition_id: " + context.partition_id)
            print("context.offset: " + context.offset)
            print("context.sequence_number: {}".format(context.sequence_number))
    
        async def process_events_async(self, context, messages):
            """
            Called by the processor host when a batch of events has arrived.
            This is where the real work of the event processor is done.
            (Params) Context: Information about the partition, Messages: The events to be processed.
            """
            print(context.partition_id)
            print(messages)
            print("-----")
            await context.checkpoint_async()
    
        async def process_error_async(self, context, error):
            """
            Called when the underlying client experiences an error while receiving.
            EventProcessorHost will take care of recovering from the error and
            continuing to pump messages,so no action is required from
            (Params) Context: Information about the partition, Error: The error that occured.
            """
            print(repr(error))
    
    # 注意如果消息较多,请增加sleep的时间,防止程序提前终止
    async def wait_and_close(host):
            """
            Run EventProcessorHost for 6 minutes then shutdown.
            """
            await asyncio.sleep(360)
            await host.close_async()
    
    try:
    
        loop = asyncio.get_event_loop()
    
        # Storage Account Credentials
        STORAGE_ACCOUNT_NAME = "yuvmtestdiag489"
        STORAGE_KEY = "OgNFVB93P2oWCKcJaNOt4kuW0lf911D11v6NiHcDC6R0SdiqUJYlP8UdW7U4p0UzSnreII+7BnS9Cy1VsMSqGg=="
        LEASE_CONTAINER_NAME = "leases5"
    
        # Eventhub config and storage manager
        EH_CONFIG = EventHubConfig('cznamespace', 'yutaoeventhubtest', 'RootManageSharedAccessKey',
                                   'wnwcU+VjKLGgNUwiecwAyHb9gAVPDIC+RvXu2JcDINc=', consumer_group="$default")
    
    
        # 注意务必对key进行urlencode编码
        ADDRESS = ("amqps://"
                   "RootManageSharedAccessKey"
                   ":"
                   "wnwcU%2bVjKLGgNUwiecwAyHb9gAVPDIC%211vXu2JcDINc%3d"
                   "@"
                   "cznamespace.servicebus.chinacloudapi.cn"
                   "/"
                   "yutaoeventhubtest")
    
        EH_CONFIG.client_address = ADDRESS
        STORAGE_MANAGER = AzureStorageCheckpointLeaseManager(STORAGE_ACCOUNT_NAME,
                                                             STORAGE_KEY,
                                                             LEASE_CONTAINER_NAME,
                                                             storage_blob_prefix="blob.core.chinacloudapi.cn")
    
        eph_options = EPHOptions()
    
        #Event loop and host
        LOOP = asyncio.get_event_loop()
        HOST = EventProcessorHost(EventProcessor, EH_CONFIG, STORAGE_MANAGER, eph_options=eph_options,
                                  ep_params=["param1", "param2"], loop=LOOP)
    
        # 重新设置一遍防止SDK擦除了之前的配置
        STORAGE_MANAGER.storage_client = BlockBlobService(account_name=STORAGE_ACCOUNT_NAME,
                                                          account_key=STORAGE_KEY,
                                                          endpoint_suffix="core.chinacloudapi.cn")
    
        tasks = asyncio.gather(
            HOST.open_async(),
            wait_and_close(HOST))
    
        loop.run_until_complete(tasks)
    
    except KeyboardInterrupt:
        # Canceling pending tasks and stopping the loop
        for task in asyncio.Task.all_tasks():
            task.cancel()
        loop.run_forever()
        tasks.exception()
    
    finally:
        loop.stop()
    
    

    注意: 连接信息已经失效,仅供格式参考。

    测试结果:

    1
    [<azure.eventhub.EventData object at 0x00000223AE556EF0>, <azure.eventhub.EventData object at 0x00000223AE556F98>, <azure.eventhub.EventData object at 0x00000223AE4DBA20>, <azure.eventhub.EventData object at 0x00000223AE4DB400>, <azure.eventhub.EventData object at 0x00000223AE4DBC88>, <azure.eventhub.EventData object at 0x00000223AE4DBAC8>, <azure.eventhub.EventData object at 0x00000223AE4FD7F0>, <azure.eventhub.EventData object at 0x00000223AE4FD6D8>, <azure.eventhub.EventData object at 0x00000223AE4FDBE0>, <azure.eventhub.EventData object at 0x00000223AE4FD048>]
    -----
    1
    [<azure.eventhub.EventData object at 0x00000223AE4FD278>, <azure.eventhub.EventData object at 0x00000223AE4FDB70>, <azure.eventhub.EventData object at 0x00000223AE4FD668>, <azure.eventhub.EventData object at 0x00000223AE4FDAC8>, <azure.eventhub.EventData object at 0x00000223AE4FDB38>, <azure.eventhub.EventData object at 0x00000223AE4FDF60>, <azure.eventhub.EventData object at 0x00000223AE57AB38>, <azure.eventhub.EventData object at 0x00000223AE57A470>, <azure.eventhub.EventData object at 0x00000223AE57A748>]
    -----
    Reason: Shutdown
    context.partition_id: 0
    context.offset: 978600
    context.sequence_number: 17559
    Reason: Shutdown
    context.partition_id: 1
    context.offset: 978544
    context.sequence_number: 17558
    
    Process finished with exit code 0
    
    

    参考示例:

    azure-event-hubs-python/examples/eph.py

    eph.py stop when the message not all received

  • 相关阅读:
    .net制作安装程序总结 dodo
    部署ASP.NET(包含.net framework 和MDAC) dodo
    (转).Net应用程序发布的解决方案[最新整理](可加桌面快捷、在程序中加自己的ICO及自动卸载等) dodo
    Eric的日期选择下拉列表小控件 dodo
    如何通过需要验证的邮件服务器发送邮件? dodo
    异常详细信息: System.Security.SecurityException: 不允许所请求的注册表访问权 dodo
    登陆模块防止恶意用户SQL注入攻击 dodo
    C#插入记录时单引号的处理 dodo
    System.Configuration命名空间下找不到ConfigurationManager类 dodo
    常用的.net日期控件 dodo
  • 原文地址:https://www.cnblogs.com/taro/p/9066937.html
Copyright © 2011-2022 走看看