zoukankan      html  css  js  c++  java
  • Azure Event hub usage

    1. create event hub on azure




    2. create a prj , event hub sender, install nuget pkg - azure service bus



    3. check connection string



    4. sender sample code


    static void Main(string[] args)
            {
                Console.WriteLine("Press Ctrl-C to stop the sender process");
                Console.WriteLine("Press Enter to start now");
                Console.ReadLine();
                SendingRandomMessages();
            }
    
    
            static string eventHubName = "get from event hub connection information";
            static string connectionString = "get from event hub connection information";
    
    
            static void SendingRandomMessages()
            {
                var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
                while (true)
                {
                    try
                    {
                        var message = Guid.NewGuid().ToString();
                        Console.WriteLine("{0} > Sending message: {1}", DateTime.Now, message);
                        eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(message)));
                    }
                    catch (Exception exception)
                    {
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
                        Console.ResetColor();
                    }
    
    
                    Thread.Sleep(200);
                }
            }
    



    5. create a storage account

    6. install nuget for receiver prj



    7. check keys



    8. sample code for receiver


    class SimpleEventProcessor : IEventProcessor
        {
            Stopwatch checkpointStopWatch;
    
    
            async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
            {
                Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
                if (reason == CloseReason.Shutdown)
                {
                    await context.CheckpointAsync();
                }
            }
    
    
            Task IEventProcessor.OpenAsync(PartitionContext context)
            {
                Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
                this.checkpointStopWatch = new Stopwatch();
                this.checkpointStopWatch.Start();
                return Task.FromResult<object>(null);
            }
    
    
            async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
            {
                foreach (EventData eventData in messages)
                {
                    string data = Encoding.UTF8.GetString(eventData.GetBytes());
    
    
                    Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                        context.Lease.PartitionId, data));
                }
    
    
                //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
                if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
                {
                    await context.CheckpointAsync();
                    this.checkpointStopWatch.Restart();
                }
            }
        }
    
    
    static void Main(string[] args)
            {
                string eventHubConnectionString = "get from azure event hub connection information";
                string eventHubName = "get from azure event hub connection information";
                string storageAccountName = "get from azure storage keys";
                string storageAccountKey = "get from azure storage keys";
                string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", storageAccountName, storageAccountKey);
    
    
                string eventProcessorHostName = Guid.NewGuid().ToString();
                EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
                Console.WriteLine("Registering EventProcessor...");
                var options = new EventProcessorOptions();
                options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };
                eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait();
    
    
                Console.WriteLine("Receiving. Press enter key to stop worker.");
                Console.ReadLine();
                eventProcessorHost.UnregisterEventProcessorAsync().Wait();
            }
    



  • 相关阅读:
    五、页脚footer
    一、页眉header
    四、(2)列布局+媒体查询
    二、导航栏nav
    coredns介绍
    pandas指定列索引和行索引
    学习笔记246—国家自然科学基金申请书写作攻略【收藏】
    Axios请求传参的格式
    NodeJspm2常用命令
    FastAPI实现谷歌DialogFlow 接口问答批量导入导出和批量删除 DialogFlow batch import and export Q&A interface
  • 原文地址:https://www.cnblogs.com/yjbjingcha/p/7269992.html
Copyright © 2011-2022 走看看