zoukankan      html  css  js  c++  java
  • Azure Event Hubs usage

    1. Create an event hub on Azure.




    2. Create a project for the event hub sender and install the Azure Service Bus NuGet package.



    3. Check the event hub connection string.



    4. Sample code for the sender:


    /// <summary>
    /// Entry point of the sender sample: prints usage hints, waits for the user
    /// to press Enter, then hands control to <c>SendingRandomMessages</c>.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
            {
                // Explain how to stop and how to start before blocking on input.
                Console.WriteLine("Press Ctrl-C to stop the sender process");
                Console.WriteLine("Press Enter to start now");

                // Block until the user confirms; the typed text itself is ignored.
                Console.ReadLine();

                SendingRandomMessages();
            }
    
    
            // Name of the target event hub. The value is a placeholder: replace it with
            // the name shown in the event hub's connection information.
            static string eventHubName = "get from event hub connection information";
            // Connection string handed to EventHubClient.CreateFromConnectionString.
            // The value is a placeholder: replace it with the real connection string.
            static string connectionString = "get from event hub connection information";
    
    
            /// <summary>
            /// Sends a freshly generated GUID string to the event hub in an endless loop,
            /// pausing 200 ms between iterations.
            /// </summary>
            /// <remarks>
            /// The loop never exits on its own. A failure during one iteration is
            /// reported on the console in red and the loop carries on with the next
            /// message, so a single failed send does not stop the sender.
            /// </remarks>
            static void SendingRandomMessages()
            {
                var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
                while (true)
                {
                    try
                    {
                        var message = Guid.NewGuid().ToString();
                        Console.WriteLine("{0} > Sending message: {1}", DateTime.Now, message);

                        // EventData is IDisposable. Send is synchronous, so the event can be
                        // released as soon as the call returns instead of leaking one
                        // undisposed instance per iteration.
                        using (var eventData = new EventData(Encoding.UTF8.GetBytes(message)))
                        {
                            eventHubClient.Send(eventData);
                        }
                    }
                    catch (Exception exception)
                    {
                        // Deliberately best-effort: report the failure and keep sending.
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
                        Console.ResetColor();
                    }


                    // Pace the sender to roughly five messages per second.
                    Thread.Sleep(200);
                }
            }
    



    5. Create a storage account.

    6. Install the NuGet package for the receiver project.



    7. Check the storage account keys.



    8. Sample code for the receiver:


    /// <summary>
    /// Event processor that prints every received event to the console and
    /// checkpoints the partition at most once every five minutes.
    /// </summary>
    class SimpleEventProcessor : IEventProcessor
        {
            // Measures the time since the processor was opened or since the last
            // periodic checkpoint, whichever is more recent.
            Stopwatch checkpointStopWatch;


            /// <summary>
            /// Logs the shutdown and, when the reason is <c>CloseReason.Shutdown</c>,
            /// writes a final checkpoint.
            /// </summary>
            async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
            {
                Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);

                // Any reason other than Shutdown skips the checkpoint.
                if (reason != CloseReason.Shutdown)
                {
                    return;
                }

                await context.CheckpointAsync();
            }


            /// <summary>
            /// Logs the partition and offset this processor starts with and starts
            /// the checkpoint timer.
            /// </summary>
            Task IEventProcessor.OpenAsync(PartitionContext context)
            {
                Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);

                // StartNew creates the stopwatch and starts it in one step.
                this.checkpointStopWatch = Stopwatch.StartNew();

                // Nothing asynchronous to wait for; return an already-completed task.
                return Task.FromResult<object>(null);
            }


            /// <summary>
            /// Prints each event body decoded as UTF-8, then checkpoints if more than
            /// five minutes have elapsed on the checkpoint timer.
            /// </summary>
            async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
            {
                foreach (EventData eventData in messages)
                {
                    byte[] payload = eventData.GetBytes();
                    string text = Encoding.UTF8.GetString(payload);

                    Console.WriteLine("Message received.  Partition: '{0}', Data: '{1}'", context.Lease.PartitionId, text);
                }

                // Checkpoint every 5 minutes, so that a restarted worker can resume
                // processing from at most 5 minutes back.
                bool checkpointDue = this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5);
                if (!checkpointDue)
                {
                    return;
                }

                await context.CheckpointAsync();
                this.checkpointStopWatch.Restart();
            }
        }
    
    
    /// <summary>
    /// Entry point of the receiver sample: builds an <c>EventProcessorHost</c>,
    /// registers <c>SimpleEventProcessor</c> with it, and keeps receiving until
    /// the user presses Enter.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
            {
                // Placeholders: replace with the values from the Azure portal.
                var hubConnection = "get from azure event hub connection information";
                var hubName = "get from azure event hub connection information";
                var accountName = "get from azure storage keys";
                var accountKey = "get from azure storage keys";

                // The storage connection string is passed to the host constructor below.
                var storageConnection = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", accountName, accountKey);

                // A fresh GUID is used as the host name on every run.
                var hostName = Guid.NewGuid().ToString();
                var host = new EventProcessorHost(hostName, hubName, EventHubConsumerGroup.DefaultGroupName, hubConnection, storageConnection);

                Console.WriteLine("Registering EventProcessor...");

                // Print exceptions raised through the options' ExceptionReceived event.
                var processorOptions = new EventProcessorOptions();
                processorOptions.ExceptionReceived += (sender, e) => Console.WriteLine(e.Exception);

                // Block until registration has completed.
                host.RegisterEventProcessorAsync<SimpleEventProcessor>(processorOptions).Wait();

                Console.WriteLine("Receiving. Press enter key to stop worker.");
                Console.ReadLine();

                // Block until the processor has been unregistered before exiting.
                host.UnregisterEventProcessorAsync().Wait();
            }
    



  • 相关阅读:
    第11条:谨慎地覆盖clone
    第10条:始终要覆盖toString
    第9条:覆盖equals时总是覆盖hashCode
    第8条:覆盖equals时请遵守通用约定
    第7条:避免使用终结方法
    第6条:消除过期的对象引用
    第5条:避免创建不必要的对象
    第4条:通过私有构造器来强化不可实例化能力
    第3条:用私有构造器或者枚举类型强化Singleton属性
    第2条:遇到多个构造器参数时要考虑用构建器
  • 原文地址:https://www.cnblogs.com/yjbjingcha/p/7269992.html
Copyright © 2011-2022 走看看