一.安装运行环境
需要安装:jdk,zookeeper,kafka
https://www.cnblogs.com/bibi-feiniaoyuan/p/9539343.html
启动zookeeper:
bin目录下cmd执行: zkServer
启动kafka:
安装目录下cmd执行: .inwindowskafka-server-start.bat .configserver.properties
查看topic: .inwindowskafka-topics.bat --list --zookeeper localhost:2181
常见错误:
1.
修改 binwindows目录中的kafka-run-class.bat:找到set COMMAND,在 %CLASSPATH% 前后加上双引号 “%CLASSPATH%“
https://blog.csdn.net/u010513487/article/details/79483860
2.
重启kafka时报错:
1.删除kafka kafka-logs 文件夹下的日志
2.删除zookeeper tmp文件夹下的日志
重启 zookeeper,kafka
二.vs中使用
nuget中安装:
生产者:
// Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // Derived from: rdkafka-dotnet, licensed under the 2-clause BSD License. // // Refer to LICENSE for more information. using System; using System.IO; using System.Text; using System.Threading.Tasks; using System.Collections.Generic; using Confluent.Kafka; namespace Confluent.Kafka.Examples.Producer { public class Program { public static async Task Main(string[] args) { if (args.Length != 2) { Console.WriteLine("Usage: .. brokerList topicName"); return; } string brokerList = args[0]; string topicName = args[1]; var config = new ProducerConfig { BootstrapServers = brokerList }; using (var producer = new Producer<string, string>(config)) { Console.WriteLine(" -----------------------------------------------------------------------"); Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}."); Console.WriteLine("-----------------------------------------------------------------------"); Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:"); Console.WriteLine("> key value<Enter>"); Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:"); Console.WriteLine("> value<enter>"); Console.WriteLine("Ctrl-C to quit. "); var cancelled = false; Console.CancelKeyPress += (_, e) => { e.Cancel = true; // prevent the process from terminating. cancelled = true; }; while (!cancelled) { Console.Write("> "); string text; try { text = Console.ReadLine(); } catch (IOException) { // IO exception is thrown when ConsoleCancelEventArgs.Cancel == true. break; } if (text == null) { // Console returned null before // the CancelKeyPress was treated break; } string key = null; string val = text; // split line if both key and value specified. int index = text.IndexOf(" "); if (index != -1) { key = text.Substring(0, index); val = text.Substring(index + 1); } try { // Awaiting the asynchronous produce request below prevents flow of execution // from proceeding until the acknowledgement from the broker is received. var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Key = key, Value = val }); Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}"); } catch (KafkaException e) { Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]"); } } // Since we are producing synchronously, at this point there will be no messages // in-flight and no delivery reports waiting to be acknowledged, so there is no // need to call producer.Flush before disposing the producer. } } } }
消费者:
三.可视化工具
kafkatool: