zoukankan      html  css  js  c++  java
  • Kafka.net使用编程入门(一)

    最近研究分布式消息队列,分享下!

    首先zookeeper  和 kafka 压缩包 解压 并配置好!

    我本机zookeeper环境配置如下:

    D:WorksoftwareApacheZookeeper3confzoo.cfg

    以下是kafka的配置

    D:WorksoftwareApachekafka2.11configserver.properties

    我已经加了path环境变量,没加的话需要到zookeeper对应bin目录下执行zkServer

    然后执行cmd命令:

     

    结果:

     然后打开第二个dos窗口,我没加环境变量path,执行kafka命令如下:

     

     

    重头戏来了,开始kafka C#客户端处理:

    首先引用kafka-net.dll,可以用vs2013的nuget下载,

    以下是Prorame.cs:

    [csharp] view plain copy
     
    1. class Program  
    2.      {  
    3.          static void Main(string[] args)  
    4.          {  
    5.              const string topicName = "test";  
    6.              var options = new KafkaOptions(new Uri("http://localhost:9092"))  
    7.              {  
    8.                  Log = new ConsoleLog()  
    9.              };  
    10.                
    11.              Task.Run(() =>  
    12.              {  
    13.                  var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() });  
    14.                  foreach (var data in consumer.Consume())  
    15.                  {  
    16.                      Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String());  
    17.                  }  
    18.              });  
    19.    
    20.              //创建一个生产者发消息  
    21.              var producer = new Producer(new BrokerRouter(options))  
    22.              {  
    23.                  BatchSize = 100,  
    24.                  BatchDelayTime = TimeSpan.FromMilliseconds(2000)  
    25.              };  
    26.    
    27.              Console.WriteLine("打出一条消息按 enter...");  
    28.              while (true)  
    29.              {  
    30.                  var message = Console.ReadLine();  
    31.                  if (message == "quit") break;  
    32.    
    33.                  if (string.IsNullOrEmpty(message))  
    34.                  {  
    35.                      //  
    36.                      SendRandomBatch(producer, topicName, 200);  
    37.                  }  
    38.                  else  
    39.                  {  
    40.                      producer.SendMessageAsync(topicName, new[] { new Message(message) });  
    41.                  }  
    42.              }  
    43.    
    44.              //释放资源  
    45.              using (producer)  
    46.              {  
    47.    
    48.              }  
    49.          }  
    50.          private static async void SendRandomBatch(Producer producer, string topicName, int count)  
    51.          {  
    52.              //发送多个消息  
    53.              var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString())));  
    54.    
    55.              Console.WriteLine("传送了 #{0} messages.  Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);  
    56.    
    57.              var response = await sendTask;  
    58.    
    59.              Console.WriteLine("已完成批量发送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);  
    60.              foreach (var result in response.OrderBy(x => x.PartitionId))  
    61.              {  
    62.                  Console.WriteLine("主题:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset);  
    63.              }  
    64.    
    65.          }  
    66.      }  
     结果:

    闲的蛋疼,随便研究一些好东西,.net环境太封闭,每个.net程序员都要扩展视野,技术交流,本人QQ827937686

  • 相关阅读:
    如何创建并运行Java线程
    PHP捕获Fatal error错误与异常处理
    WEB系统启动时加载Log4j的配置文件
    Log4j日志配置
    CharacterEncodingFilter-Spring字符编码过滤器
    Struts2的属性驱动与模型驱动的区别
    Filter之——GZIP全站压缩
    乱码问题总结
    【总结】编写自己的JDBC框架
    四大域总结
  • 原文地址:https://www.cnblogs.com/zxtceq/p/9067840.html
Copyright © 2011-2022 走看看