一:环境搭建(windows)
1.1 安装zookeeper
下载最新版zookeeper,http://www.apache.org/dyn/closer.cgi/zookeeper/
修改系统环境变量,在Path后添加 ;D:\Program_Files\apache-zookeeper-3.7.0\bin
复制 conf\zoo_sample.cfg,重命名为 zoo.cfg,修改 zoo.cfg 中的 dataDir,并添加一行 dataLogDir
dataDir=D:/Program_Files/apache-zookeeper-3.7.0/data
dataLogDir=D:/Program_Files/apache-zookeeper-3.7.0/log
启动zkServer,cmd -> zkServer

1.2 安装kafka
下载最新版kafka,http://kafka.apache.org/downloads
添加系统环境变量,在Path后添加 ;D:\Program_Files\kafka_2.13-2.8.0\bin\windows
解压到指定路径,如:D:\Program_Files\kafka_2.13-2.8.0
修改 kafka_2.13-2.8.0\config 目录下的 server.properties 中 log.dirs 的值
log.dirs=D:/Program_Files/kafka_2.13-2.8.0/kafka-logs
启动kafka,cmd
d:
cd D:\Program_Files\kafka_2.13-2.8.0
.\bin\windows\kafka-server-start.bat .\config\server.properties

二:测试
运行cmd命令行,创建一个topic,命令如下:
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
再打开一个cmd,创建一个Producer,命令如下:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
再打开一个cmd,创建一个Consumer,命令如下:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
在程序中通过 NuGet 添加 Confluent.Kafka 包,示例代码如下:

using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Threading;
namespace KafKaNet
{
/// <summary>
/// Console demo: produces ten messages to the "test" topic on a Kafka broker at
/// localhost:9092 while a consumer on a separate thread reads the topic and commits
/// offsets manually.
/// </summary>
class Program
{
    /// <summary>
    /// Starts the consumer on its own thread, runs the producer, then signals the
    /// consumer to stop and waits for it to shut down.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        using (var cts = new CancellationTokenSource())
        {
            // The consume loop blocks for its whole lifetime, so it gets a dedicated
            // thread instead of occupying a thread-pool worker.
            var consumerThread = new Thread(new ParameterizedThreadStart(Consumer))
            {
                IsBackground = true,
                Name = "kafka-consumer"
            };
            consumerThread.Start(cts.Token);

            try
            {
                Produce();
            }
            finally
            {
                // Let the consumer leave its loop and close before the process exits.
                cts.Cancel();
                consumerThread.Join();
            }
        }
    }

    /// <summary>
    /// Producer: sends ten messages to the "test" topic, prints a line per delivery
    /// report, flushes, prints "Done!" and then waits for a key press.
    /// </summary>
    public static void Produce()
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

        // Invoked once per message with the broker's delivery result.
        Action<DeliveryReport<Null, string>> handler = r =>
            Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}"
                : $"Delivery Error: {r.Error.Reason}");

        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                for (var i = 1; i <= 10; i++)
                {
                    p.Produce("test", new Message<Null, string> { Value = $"my111 message: {i}" }, handler);
                }
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }

            // Flush even when enqueueing failed part-way, so messages that were already
            // queued still get a chance to be delivered before the producer is disposed.
            // The argument is the flush timeout; the return value is the number of
            // messages still waiting in the queue when the timeout expired.
            int remaining = p.Flush(TimeSpan.FromSeconds(3));
            if (remaining > 0)
            {
                Console.WriteLine($"{remaining} message(s) were still queued when the flush timed out.");
            }
        }

        Console.WriteLine("Done!");
        Console.ReadKey();
    }

    /// <summary>
    /// Consumer: subscribes to the "test" topic and, for every message, prints it and
    /// then commits its offset manually.
    /// </summary>
    /// <param name="obj">
    /// Thread state. When it is a <see cref="CancellationToken"/>, cancelling that token
    /// stops the loop; any other value (including null) means the loop runs until the
    /// process ends.
    /// </param>
    public static void Consumer(object obj)
    {
        CancellationToken cancellationToken =
            obj is CancellationToken ? (CancellationToken)obj : CancellationToken.None;

        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Auto-commit is off: offsets are committed by hand only after the message
            // has been processed, so a crash mid-processing does not lose data.
            EnableAutoCommit = false
        };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("test");
            try
            {
                while (true)
                {
                    try
                    {
                        var consume = c.Consume(cancellationToken);
                        string receiveMsg = consume.Message.Value;
                        Console.WriteLine($"Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'.");

                        // Commit via the consume result so the stored position is the
                        // NEXT offset to read (result offset + 1). Committing the raw
                        // TopicPartitionOffset would re-deliver this message after a
                        // restart or rebalance.
                        c.Commit(consume);
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                    catch (KafkaException e)
                    {
                        // A failed commit must not tear down the worker thread; the
                        // message may simply be delivered again later.
                        Console.WriteLine($"Commit failed: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the caller cancels the token to request shutdown.
            }
            finally
            {
                // Leave the consumer group cleanly instead of waiting for a session timeout.
                c.Close();
            }
        }
    }
}
}
