最近研究分布式消息队列,分享下!
首先zookeeper 和 kafka 压缩包 解压 并配置好!
我本机zookeeper环境配置如下:
D:WorksoftwareApacheZookeeper3confzoo.cfg
以下是kafka的配置
D:WorksoftwareApachekafka2.11configserver.properties
我已经加了path环境变量,没加的话需要到zookeeper对应bin目录下执行zkServer
然后执行cmd命令:
结果:
然后打开第二个dos窗口,我没加环境变量path,执行kafka命令如下:
重头戏来了,开始kafka C#客户端处理:
首先引用kafka-net.dll,可以用vs2013的nuget下载,
以下是Prorame.cs
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 const string topicName = "test";
6 var options = new KafkaOptions(new Uri("http://localhost:9092"))
7 {
8 Log = new ConsoleLog()
9 };
10
11 Task.Run(() =>
12 {
13 var consumer = new Consumer(new ConsumerOptions(topicName, new BrokerRouter(options)) { Log = new ConsoleLog() });
14 foreach (var data in consumer.Consume())
15 {
16 Console.WriteLine("Response: PartitionId={0},Offset={1} :Value={2}", data.Meta.PartitionId, data.Meta.Offset, data.Value.ToUtf8String());
17 }
18 });
19
20 //创建一个生产者发消息
21 var producer = new Producer(new BrokerRouter(options))
22 {
23 BatchSize = 100,
24 BatchDelayTime = TimeSpan.FromMilliseconds(2000)
25 };
26
27 Console.WriteLine("打出一条消息按 enter...");
28 while (true)
29 {
30 var message = Console.ReadLine();
31 if (message == "quit") break;
32
33 if (string.IsNullOrEmpty(message))
34 {
35 //
36 SendRandomBatch(producer, topicName, 200);
37 }
38 else
39 {
40 producer.SendMessageAsync(topicName, new[] { new Message(message) });
41 }
42 }
43
44 //释放资源
45 using (producer)
46 {
47
48 }
49 }
50 private static async void SendRandomBatch(Producer producer, string topicName, int count)
51 {
52 //发送多个消息
53 var sendTask = producer.SendMessageAsync(topicName, Enumerable.Range(0, count).Select(x => new Message(x.ToString())));
54
55 Console.WriteLine("传送了 #{0} messages. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);
56
57 var response = await sendTask;
58
59 Console.WriteLine("已完成批量发送: {0}. Buffered:{1} AsyncCount:{2}", count, producer.BufferCount, producer.AsyncCount);
60 foreach (var result in response.OrderBy(x => x.PartitionId))
61 {
62 Console.WriteLine("主题:{0} PartitionId:{1} Offset:{2}", result.Topic, result.PartitionId, result.Offset);
63 }
64
65 }
66 }
结果:
闲的蛋疼,随便研究一些好东西,.net环境太封闭,每个.net程序员都要扩展视野,技术交流,本人QQ827937686