  • Using Kafka in .NET (Windows)

    I. Set up the runtime environment


    Install the following: JDK, ZooKeeper, Kafka

    https://www.cnblogs.com/bibi-feiniaoyuan/p/9539343.html

    Start ZooKeeper:

    Run in cmd from the ZooKeeper bin directory:    zkServer

    Start Kafka:

    Run in cmd from the Kafka installation directory:  .\bin\windows\kafka-server-start.bat .\config\server.properties

    List topics:  .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

    Common errors:

    1.

    Edit kafka-run-class.bat in the bin\windows directory: find the line starting with set COMMAND and wrap %CLASSPATH% in double quotes, so it reads "%CLASSPATH%"

    https://blog.csdn.net/u010513487/article/details/79483860

    2.

    Error when restarting Kafka:

    1. Delete the logs in Kafka's kafka-logs folder
    2. Delete the logs in ZooKeeper's tmp folder
    Then restart ZooKeeper and Kafka

    II. Using it in Visual Studio


    Install from NuGet:

    Producer:

    // Copyright 2016-2017 Confluent Inc., 2015-2016 Andreas Heider
    //
    // Licensed under the Apache License, Version 2.0 (the "License");
    // you may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    //
    // http://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.
    //
    // Derived from: rdkafka-dotnet, licensed under the 2-clause BSD License.
    //
    // Refer to LICENSE for more information.
    
    using System;
    using System.IO;
    using System.Text;
    using System.Threading.Tasks;
    using System.Collections.Generic;
    using Confluent.Kafka;
    
    
    namespace Confluent.Kafka.Examples.Producer
    {
        public class Program
        {
            public static async Task Main(string[] args)
            {
                if (args.Length != 2)
                {
                    Console.WriteLine("Usage: .. brokerList topicName");
                    return;
                }
    
                string brokerList = args[0];
                string topicName = args[1];
    
                var config = new ProducerConfig { BootstrapServers = brokerList };
    
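                // Note: constructing Producer<TKey, TValue> directly works with the pre-release
                // builds of Confluent.Kafka this example was written for. From version 1.0 on,
                // use: new ProducerBuilder<string, string>(config).Build()
                using (var producer = new Producer<string, string>(config))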
                {
                    Console.WriteLine("\n-----------------------------------------------------------------------");
                    Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
                    Console.WriteLine("-----------------------------------------------------------------------");
                    Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:");
                    Console.WriteLine("> key value<Enter>");
                    Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:");
                    Console.WriteLine("> value<enter>");
                    Console.WriteLine("Ctrl-C to quit.\n");
    
                    var cancelled = false;
                    Console.CancelKeyPress += (_, e) => {
                        e.Cancel = true; // prevent the process from terminating.
                        cancelled = true;
                    };
    
                    while (!cancelled)
                    {
                        Console.Write("> ");
    
                        string text;
                        try
                        {
                            text = Console.ReadLine();
                        }
                        catch (IOException)
                        {
                            // IO exception is thrown when ConsoleCancelEventArgs.Cancel == true.
                            break;
                        }
                        if (text == null)
                        {
                            // Console returned null before 
                            // the CancelKeyPress was treated
                            break;
                        }
    
                        string key = null;
                        string val = text;
    
                        // split line if both key and value specified.
                        int index = text.IndexOf(" ");
                        if (index != -1)
                        {
                            key = text.Substring(0, index);
                            val = text.Substring(index + 1);
                        }
    
                        try
                        {
                            // Awaiting the asynchronous produce request below prevents flow of execution
                            // from proceeding until the acknowledgement from the broker is received.
                            var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Key = key, Value = val });
                            Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
                        }
                        catch (KafkaException e)
                        {
                            Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
                        }
                    }
    
                    // Since we are producing synchronously, at this point there will be no messages
                    // in-flight and no delivery reports waiting to be acknowledged, so there is no
                    // need to call producer.Flush before disposing the producer.
                }
            }
        }
    }

    Consumer:
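
    The original post gives no consumer listing at this point. As a rough counterpart to the producer above, here is a minimal sketch that assumes the Confluent.Kafka 1.x API (ConsumerBuilder), which differs from the direct-constructor style used in the producer listing. The class name ConsumerSketch and the group id "demo-consumer-group" are placeholders invented for illustration, not taken from the original.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical class name; not part of the original post.
public class ConsumerSketch
{
    public static void Main(string[] args)
    {
        if (args.Length != 2)
        {
            Console.WriteLine("Usage: .. brokerList topicName");
            return;
        }

        var config = new ConsumerConfig
        {
            BootstrapServers = args[0],
            // Assumed group id, chosen only for this sketch.
            GroupId = "demo-consumer-group",
            // Start from the earliest offset when the group has no committed offset yet.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var cts = new CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();    // unblocks the pending Consume call.
            };

            consumer.Subscribe(args[1]);

            try
            {
                while (true)
                {
                    try
                    {
                        // Blocks until a message arrives or the token is cancelled.
                        var result = consumer.Consume(cts.Token);
                        Console.WriteLine($"received '{result.Message.Value}' at: {result.TopicPartitionOffset}");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"consume error: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ctrl-C was pressed; fall through to close the consumer.
            }
            finally
            {
                // Leave the consumer group cleanly and commit final offsets.
                consumer.Close();
            }
        }
    }
}
```

    The key type is Ignore because the producer above may send messages with a null key, and this sketch only prints the value. Run it with the same two arguments as the producer (broker list and topic name), for example localhost:9092 and a topic that already exists.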

    III. Visualization tools

    kafkatool:

  • Original article: https://www.cnblogs.com/Linky008/p/9910392.html