zoukankan      html  css  js  c++  java
  • Azure Event hub usage

    1. create event hub on azure




    2. create a prj , event hub sender, install nuget pkg - azure service bus



    3. check connection string



    4. sender sample code


    static void Main(string[] args)
            {
                Console.WriteLine("Press Ctrl-C to stop the sender process");
                Console.WriteLine("Press Enter to start now");
                Console.ReadLine();
                SendingRandomMessages();
            }
    
    
            static string eventHubName = "get from event hub connection information";
            static string connectionString = "get from event hub connection information";
    
    
            static void SendingRandomMessages()
            {
                var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
                while (true)
                {
                    try
                    {
                        var message = Guid.NewGuid().ToString();
                        Console.WriteLine("{0} > Sending message: {1}", DateTime.Now, message);
                        eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(message)));
                    }
                    catch (Exception exception)
                    {
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
                        Console.ResetColor();
                    }
    
    
                    Thread.Sleep(200);
                }
            }
    



    5. create a storage account

    6. install nuget for receiver prj



    7. check keys



    8. sample code for receiver


    class SimpleEventProcessor : IEventProcessor
        {
            Stopwatch checkpointStopWatch;
    
    
            async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
            {
                Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
                if (reason == CloseReason.Shutdown)
                {
                    await context.CheckpointAsync();
                }
            }
    
    
            Task IEventProcessor.OpenAsync(PartitionContext context)
            {
                Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
                this.checkpointStopWatch = new Stopwatch();
                this.checkpointStopWatch.Start();
                return Task.FromResult<object>(null);
            }
    
    
            async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
            {
                foreach (EventData eventData in messages)
                {
                    string data = Encoding.UTF8.GetString(eventData.GetBytes());
    
    
                    Console.WriteLine(string.Format("Message received.  Partition: '{0}', Data: '{1}'",
                        context.Lease.PartitionId, data));
                }
    
    
                //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
                if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
                {
                    await context.CheckpointAsync();
                    this.checkpointStopWatch.Restart();
                }
            }
        }
    
    
    static void Main(string[] args)
            {
                string eventHubConnectionString = "get from azure event hub connection information";
                string eventHubName = "get from azure event hub connection information";
                string storageAccountName = "get from azure storage keys";
                string storageAccountKey = "get from azure storage keys";
                string storageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", storageAccountName, storageAccountKey);
    
    
                string eventProcessorHostName = Guid.NewGuid().ToString();
                EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, EventHubConsumerGroup.DefaultGroupName, eventHubConnectionString, storageConnectionString);
                Console.WriteLine("Registering EventProcessor...");
                var options = new EventProcessorOptions();
                options.ExceptionReceived += (sender, e) => { Console.WriteLine(e.Exception); };
                eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options).Wait();
    
    
                Console.WriteLine("Receiving. Press enter key to stop worker.");
                Console.ReadLine();
                eventProcessorHost.UnregisterEventProcessorAsync().Wait();
            }
    



  • 相关阅读:
    Nginx配置文件nginx.conf中文详解
    PHP SOCKET编程 .
    当一个变量只能通过引用传递的时候。
    PHP-Socket-阻塞与非阻塞,同步与异步概念的理解
    PHP里的socket_recv方法解释
    jQuery获取Select选择的Text和 Value
    使用 cacti 批量监控服务器以及其 PHP 运作环境配置
    windows 和 linux 上 循环读取文件名称的区别和方法
    php 在linux 用fopen() 函数打开,file_get_contents(),fread()函数 读取 另外一台服务器映射过来的文件 总是返回false,null的情况。
    【问题解决】小数点前面不显示0的问题
  • 原文地址:https://www.cnblogs.com/yjbjingcha/p/7269992.html
Copyright © 2011-2022 走看看