zoukankan      html  css  js  c++  java
  • Kafka.net使用编程入门

    最近研究分布式消息队列,分享下!

    首先zookeeper  和 kafka 压缩包 解压 并配置好!

    我本机zookeeper环境配置如下:

    D:WorksoftwareApacheZookeeper3confzoo.cfg

    以下是kafka的配置

    D:WorksoftwareApachekafka2.11configserver.properties

    我已经加了path环境变量,没加的话需要到zookeeper对应bin目录下执行zkServer

    然后执行cmd命令:

     

    结果:

     然后打开第二个dos窗口,我没加环境变量path,执行kafka命令如下:

     

     

    重头戏来了,开始kafka C#客户端处理:

    首先引用kafka-net.dll,可以用vs2013的nuget下载,

    以下是Prorame.cs

     1 class Program
     2     {
     3         static void Main(string[] args)
     4         {
     5             const string topicName = "test";
     6             var options = new KafkaOptions(new Uri("http://localhost:9092"))
     7             {
     8                 Log = new ConsoleLog()
     9             };
    10             
    11             Task.Run(() =>
    12             {
    13                 var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() });
    14                 foreach (var data in consumer.Consume())
    15                 {
    16                     Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String());
    17                 }
    18             });
    19 
    20             //创建一个生产者发消息
    21             var producer = new Producer(new BrokerRouter(options))
    22             {
    23                 BatchSize = 100,
    24                 BatchDelayTime = TimeSpan.FromMilliseconds(2000)
    25             };
    26 
    27             Console.WriteLine("打出一条消息按 enter...");
    28             while (true)
    29             {
    30                 var message = Console.ReadLine();
    31                 if (message == "quit") break;
    32 
    33                 if (string.IsNullOrEmpty(message))
    34                 {
    35                     //
    36                     SendRandomBatch(producer, topicName, 200);
    37                 }
    38                 else
    39                 {
    40                     producer.SendMessageAsync(topicName, new[] { new Message(message) });
    41                 }
    42             }
    43 
    44             //释放资源
    45             using (producer)
    46             {
    47 
    48             }
    49         }
    50         private static async void SendRandomBatch(Producer producer, string topicName, int count)
    51         {
    52             //发送多个消息
    53             var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString())));
    54 
    55             Console.WriteLine("传送了 #{0} messages.  Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);
    56 
    57             var response = await sendTask;
    58 
    59             Console.WriteLine("已完成批量发送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);
    60             foreach (var result in response.OrderBy(x => x.PartitionId))
    61             {
    62                 Console.WriteLine("主题:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset);
    63             }
    64 
    65         }
    66     }

    结果:

    闲的蛋疼,随便研究一些好东西,.net环境太封闭,每个.net程序员都要扩展视野,技术交流,本人QQ827937686

  • 相关阅读:
    494. Target Sum 添加标点符号求和
    636. Exclusive Time of Functions 进程的执行时间
    714. Best Time to Buy and Sell Stock with Transaction Fee有交易费的买卖股票
    377. Combination Sum IV 返回符合目标和的组数
    325. Maximum Size Subarray Sum Equals k 和等于k的最长子数组
    275. H-Index II 递增排序后的论文引用量
    274. H-Index论文引用量
    RabbitMQ学习之HelloWorld(1)
    java之struts2的数据处理
    java之struts2的action的创建方式
  • 原文地址:https://www.cnblogs.com/Wulex/p/5578339.html
Copyright © 2011-2022 走看看