zoukankan      html  css  js  c++  java
  • Azure Event Hub 技术研究系列3-Event Hub接收事件

    上篇博文中,我们通过编程的方式介绍了如何将事件消息发送到Azure Event Hub:

    Azure Event Hub 技术研究系列2-发送事件到Event Hub

    本篇文章中,我们继续:从Event Hub中接收事件。

    1. 新建控制台工程 EventHubReceiver

    2. 添加Nuget引用

    Microsoft.Azure.EventHubs

    Microsoft.Azure.EventHubs.Processor

    3. 实现IEventProcessor接口

    MyEventProcessor

     1     using Microsoft.Azure.EventHubs;
     2     using Microsoft.Azure.EventHubs.Processor;
     3     using System.Threading.Tasks;
     4 
     5     public class MyEventProcessor : IEventProcessor
     6     {
     7         public Task CloseAsync(PartitionContext context, CloseReason reason)
     8         {
     9             Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
    10             return Task.CompletedTask;
    11         }
    12 
    13         public Task OpenAsync(PartitionContext context)
    14         {
    15             Console.WriteLine($"MyEventProcessor initialized. Partition: '{context.PartitionId}'");
    16             return Task.CompletedTask;
    17         }
    18 
    19         public Task ProcessErrorAsync(PartitionContext context, Exception error)
    20         {
    21             Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
    22             return Task.CompletedTask;
    23         }
    24 
    25         public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    26         {
    27             foreach (var eventData in messages)
    28             {
    29                 var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
    30                 Console.WriteLine($"Event message received. Partition: '{context.PartitionId}', Data: '{data}'");
    31             }
    32 
    33             return context.CheckpointAsync();
    34         }
    35     }

    4. Program程序

    添加常量作为事件中心连接字符串、事件中心名称、存储帐户容器名称、存储帐户名称和存储帐户密钥。 添加以下代码,并将占位符替换为其对应的值。

            private const string EhConnectionString = "{Event Hubs connection string}";
            private const string EhEntityPath = "{Event Hub path/name}"; //MyEventHub
            private const string StorageContainerName = "{Storage account container name}"; //eventhubcontainer
            private const string StorageAccountName = "{Storage account name}"; //linux1
            private const string StorageAccountKey = "{Storage account key}";
    private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);

    这里涉及到Azure Storage Account,必须为上篇博文中创建的事件中心MyEventHub指定一个存储账户和存储容器

    增加MainAysnc方法:注册事件处理器,处理事件消息

     1         /// <summary>
     2         /// 注册事件处理器
     3         /// </summary>
     4         /// <param name="args"></param>
     5         /// <returns></returns>
     6         private static async Task MainAsync(string[] args)
     7         {
     8             Console.WriteLine("Registering EventProcessor...");
     9 
    10             var eventProcessorHost = new EventProcessorHost(
    11                 EhEntityPath,
    12                 PartitionReceiver.DefaultConsumerGroupName,
    13                 EhConnectionString,
    14                 StorageConnectionString,
    15                 StorageContainerName);
    16 
    17             // Registers the Event Processor Host and starts receiving messages
    18             await eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>();
    19 
    20             Console.WriteLine("Receiving. Press ENTER to stop worker.");
    21             Console.ReadLine();
    22 
    23             // Disposes of the Event Processor Host
    24             await eventProcessorHost.UnregisterEventProcessorAsync();
    25         }

    Main函数

    1         static void Main(string[] args)
    2         {
    3             MainAsync(args).GetAwaiter().GetResult();
    4         }

    Run

    至此,我们实现了事件消息发送到Event Hub,同时从Event Hub接收处理事件消息。

    周国庆

    2017/5/18

  • 相关阅读:
    [国家集训队] Crash 的文明世界
    [国家集训队] middle
    [正睿集训2021] 构造专练
    [正睿集训2021] LIS
    CF482E ELCA
    UVA
    UVA
    UVA
    UVA
    UVA
  • 原文地址:https://www.cnblogs.com/tianqing/p/6865149.html
Copyright © 2011-2022 走看看