zoukankan      html  css  js  c++  java
  • Azure Event hub usage

    1. create event hub on azure




    2. create a prj , event hub sender, install nuget pkg - azure service bus



    3. check connection string



    4. sender sample code


    static void Main(string[] args)
            {
                Console.WriteLine("Press Ctrl-C to stop the sender process");
                Console.WriteLine("Press Enter to start now");
                Console.ReadLine();
                SendingRandomMessages();
            }
    
    
            static string eventHubName = "get from event hub connection information";
            static string connectionString = "get from event hub connection information";
    
    
            static void SendingRandomMessages()
            {
                var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
                while (true)
                {
                    try
                    {
                        var message = Guid.NewGuid().ToString();
                        Console.WriteLine("{0} > Sending message: {1}", DateTime.Now, message);
                        eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(message)));
                    }
                    catch (Exception exception)
                    {
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
                        Console.ResetColor();
                    }
    
    
                    Thread.Sleep(200);
                }
            }
    



    5. create a storage account

    6. install nuget for receiver prj



    7. check keys



    8. sample code for receiver


    class SimpleEventProcessor : IEventProcessor
        {
            Stopwatch checkpointStopWatch;
    
    
            async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
            {
                Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
                if (reason == CloseReason.Shutdown)
                {
                    await context.CheckpointAsync();
                }
            }
    
    
            Task IEventProcessor.OpenAsync(PartitionContext context)
            {
                Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
                this.checkpointStopWatch = new Stopwatch();
                this.checkpointStopWatch.Start();
                return Task.FromResult<object>(null);
            }
    
    
            async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
            {
                foreach (EventData eventData in messages)
                {
                    string data = Encoding.UTF8.GetString(eventData.GetBytes());
    
    
                    Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                        context.Lease.PartitionId, data));
                }
    
    
                //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
                if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
                {
                    await context.CheckpointAsync();
                    this.checkpointStopWatch.Restart();
                }
            }
        }
    
    
    static void Main(string[] args)
            {
                string eventHubConnectionString = "get from azure event hub connection information";
                string eventHubName = "get from azure event hub connection information";
                string storageAccountName = "get from azure storage keys";
                string storageAccountKey = "get from azure storage keys";
                string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", storageAccountName, storageAccountKey);
    
    
                string eventProcessorHostName = Guid.NewGuid().ToString();
                EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
                Console.WriteLine("Registering EventProcessor...");
                var options = new EventProcessorOptions();
                options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };
                eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait();
    
    
                Console.WriteLine("Receiving. Press enter key to stop worker.");
                Console.ReadLine();
                eventProcessorHost.UnregisterEventProcessorAsync().Wait();
            }
    



  • 相关阅读:
    LinuxCentOS6.5:六、克隆虚拟机
    Redis:一、简介
    LinuxCentOS6.5:五、软件安装
    shiro配置异常org.springframework.beans.factory.BeanInitializationException: The security manager does not implement the WebSecurityManager interface.
    Redis:二、安装
    LinuxCentOS6.5:二、基本命令
    HttpPostedFile hf = c.Request.Files[0]; 报错:索引超出 怎么办?
    sqlserver使用中遇到过的
    glassfish error : A full JDK (not just JRE) is required
    计划
  • 原文地址:https://www.cnblogs.com/yjbjingcha/p/7269992.html
Copyright © 2011-2022 走看看