一、Kafka配置:
参考网址:
http://m.blog.csdn.net/ydc321/article/details/70154278
http://www.2cto.com/net/201701/588235.html
http://www.jianshu.com/p/f7037105db46
http://www.jianshu.com/p/64d25dcf8300
https://my.oschina.net/phoebus789/blog/733670
http://orchome.com/kafka/index
准备工作:
1.安装jdk环境
http://www.oracle.com/technetwork/java/javase/downloads/index.html
2.下载kafka的程序安装包,并解压
http://kafka.apache.org/downloads
3.用命令行测试kafka生产和消费
在 kafka 目录,按住shift+鼠标右键->在此处打开命令窗口(W)
第一个命令窗口->启动zookeeper服务:
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
第二个命令窗口->启动kafka服务:
bin\windows\kafka-server-start.bat config\server.properties
第三个命令窗口->启动producer:
- 创建一个主题:
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtopic
- 使用如下命令查看创建的主题列表:
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
- 启动生产者:
bin\windows\kafka-console-producer.bat --broker-list {本机ip}:9092 --topic testtopic
第四个命令窗口->启动consumer:
bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic testtopic --from-beginning
二、测试kafka生产和消费
- 创建两个控制台项目,一个是生产者,一个是消费者
//生产者代码
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Text;

namespace KafkaProducter
{
    /// <summary>
    /// Console producer: publishes every line typed on standard input to the
    /// "testtopic" topic on the broker at 127.0.0.1:9092 and prints the
    /// partition and offset reported for each message.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Reads lines from the console and produces each one as a message value
        /// (with a null key) until the user enters "q" or the input stream ends.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        public static void Main(string[] args)
        {
            string brokerList = "127.0.0.1:9092";
            string topicName = "testtopic";

            var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } };

            using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
            {
                Console.WriteLine($"{producer.Name} producing on {topicName}. q to exit.");

                string text;
                // Console.ReadLine returns null once standard input is exhausted
                // (redirected/closed stdin). Without the null check the loop would
                // never terminate and would keep producing null payloads.
                while ((text = Console.ReadLine()) != null && text != "q")
                {
                    var deliveryReport = producer.ProduceAsync(topicName, null, text);
                    deliveryReport.ContinueWith(task =>
                    {
                        // Reading Result on a faulted or canceled task throws inside
                        // the continuation, where nobody observes it; report instead.
                        if (task.IsFaulted || task.IsCanceled)
                        {
                            string reason = task.IsFaulted
                                ? task.Exception.GetBaseException().Message
                                : "canceled";
                            Console.WriteLine($"Delivery failed: {reason}");
                            return;
                        }

                        Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");
                    });
                }

                // Tasks are not waited on synchronously (ContinueWith is not synchronous),
                // so some deliveries may still be in progress here. Flush is called with
                // a 10 second timeout before the producer is disposed.
                producer.Flush(TimeSpan.FromSeconds(10));
            }
        }
    }
}
//消费者代码
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
using System;
using System.Collections.Generic;
using System.Text;

namespace KafkaConsumer
{
    /// <summary>
    /// Console consumer: reads messages from "testtopic" on the broker at
    /// 127.0.0.1:9092 and prints topic, partition, offset and value for each one.
    /// </summary>
    public class Program
    {
        // Written by the Ctrl+C handler and read by the polling loop in Main;
        // volatile so the loop observes the write made from the handler.
        private static volatile bool s_stopRequested;

        /// <summary>
        /// Assigns the consumer to "testtopic" (partition 0, offset 0) and polls
        /// with a one second timeout until Ctrl+C is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        public static void Main(string[] args)
        {
            var config = new Dictionary<string, object>
            {
                { "group.id", "simple-csharp-consumer" },
                { "bootstrap.servers", "127.0.0.1:9092" }
            };

            // The original loop had no exit path, so the using block below could
            // never dispose the consumer. The first Ctrl+C now requests a clean
            // stop; a second Ctrl+C is left unhandled so the process can still be
            // terminated if shutdown stalls.
            Console.CancelKeyPress += (sender, e) =>
            {
                if (!s_stopRequested)
                {
                    e.Cancel = true;
                    s_stopRequested = true;
                }
            };

            using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
            {
                consumer.Assign(new List<TopicPartitionOffset> { new TopicPartitionOffset("testtopic", 0, 0) });

                while (!s_stopRequested)
                {
                    Message<Null, string> msg;
                    // Consume returns false when no message arrived within the
                    // timeout; the loop then re-checks the stop flag.
                    if (consumer.Consume(out msg, TimeSpan.FromSeconds(1)))
                    {
                        Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
                    }
                }
            }
        }
    }
}