  • Using Kafka on Windows (.NET)

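    1: Environment setup (Windows)
    1.1 Install ZooKeeper
    Download the latest ZooKeeper release from http://www.apache.org/dyn/closer.cgi/zookeeper/ and extract it (the steps below assume D:\Program_Files\apache-zookeeper-3.7.0).
    Edit the system environment variables and append ;D:\Program_Files\apache-zookeeper-3.7.0\bin to Path.
    Copy conf\zoo_sample.cfg to conf\zoo.cfg, then change dataDir in zoo.cfg and add a dataLogDir line:
    dataDir=D:/Program_Files/apache-zookeeper-3.7.0/data
    dataLogDir=D:/Program_Files/apache-zookeeper-3.7.0/log

    Start ZooKeeper: open cmd and run zkServer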

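    1.2 Install Kafka
    Download the latest Kafka release from http://kafka.apache.org/downloads
    Extract it to a path of your choice, e.g. E:\kafka_2.12-0.10.2.0 (the steps below assume D:\Program_Files\kafka_2.13-2.8.0).
    Edit the system environment variables and append ;D:\Program_Files\kafka_2.13-2.8.0\bin\windows to Path.

    In kafka_2.13-2.8.0\config\server.properties, change the value of log.dirs:
    log.dirs=D:/Program_Files/kafka_2.13-2.8.0/kafka-logs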

    Start Kafka from cmd (ZooKeeper must already be running):

    d:
    cd D:\Program_Files\kafka_2.13-2.8.0
    .\bin\windows\kafka-server-start.bat .\config\server.properties

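    2: Testing
    Open a cmd window and create a topic. The --zookeeper option works with the Kafka 2.8.0 used here; it was removed in Kafka 3.0, where --bootstrap-server localhost:9092 takes its place.
    kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    Open a second cmd window and start a producer:
    kafka-console-producer.bat --broker-list localhost:9092 --topic test
    Open a third cmd window and start a consumer:
    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning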

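    In the .NET program, add the Confluent.Kafka NuGet package (for example with: dotnet add package Confluent.Kafka), then use it as follows: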

    using Confluent.Kafka;
    using System;
    using System.Collections.Generic;
    using System.Threading;
    
    namespace KafKaNet
    {
        class Program
        {
            // localhost
            static void Main(string[] args)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(Consumer));
    
                Produce();
            }
    
            /// <summary>
            /// Producer: sends ten messages to the "test" topic.
            /// </summary>
            public static void Produce()
            {
                var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
    
                Action<DeliveryReport<Null, string>> handler = r =>
                    Console.WriteLine(!r.Error.IsError
                        ? $"Delivered message to {r.TopicPartitionOffset}"
                        : $"Delivery Error: {r.Error.Reason}");
    
    
                using (var p = new ProducerBuilder<Null, string>(config).Build())
                {
                    try
                    {
                        for (var i = 1; i <= 10; i++)
                        {
                            p.Produce("test", new Message<Null, string> { Value = $"my111 message: {i}" }, handler);
                        }
    
                        p.Flush(TimeSpan.FromSeconds(3)); // wait up to 3 seconds for outstanding deliveries
                    }
                    catch (ProduceException<Null, string> e)
                    {
                        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                    }
                }
                Console.WriteLine("Done!");
                Console.ReadKey();
            }
    
            /// <summary>
            /// Consumer: reads messages from the "test" topic and commits offsets manually.
            /// </summary>
            public static void Consumer(object obj)
            {
                var conf = new ConsumerConfig
                {
                    GroupId = "test-consumer-group",
                    BootstrapServers = "localhost:9092",
                    AutoOffsetReset = AutoOffsetReset.Earliest, 
                    EnableAutoCommit = false // disable auto-commit; offsets are committed manually after the business logic completes, to avoid losing data
                };
    
                using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
                {
                    c.Subscribe("test");
                     
                    try
                    {
                        while (true)
                        {
                            try
                            { 
                                var consume = c.Consume();
                                string receiveMsg = consume.Message.Value;
                                Console.WriteLine($"Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'.");
                                 
                                // Commit manually once the business logic has succeeded.
                                // Commit(ConsumeResult) stores offset + 1, so a restart resumes
                                // after this message instead of re-reading it.
                                c.Commit(consume);
                            }
                            catch (ConsumeException e)
                            {
                                Console.WriteLine($"Error occurred: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    { 
                        c.Close();
                    }
                }
            }
        }
    }
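
    The consumer above catches OperationCanceledException, but Consume() called without arguments blocks indefinitely and is not cancellable, so that handler and the c.Close() inside it are never reached. The following is a minimal sketch of one way to make the loop stoppable, not the original author's code: it assumes the same broker, topic, and group id as above, and the class name CancellableConsumer and the Ctrl+C wiring are illustrative choices.

    ```csharp
    using System;
    using System.Threading;
    using Confluent.Kafka;

    // Hypothetical standalone variant of the consumer shown above.
    class CancellableConsumer
    {
        static void Main()
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false
            };

            var cts = new CancellationTokenSource();

            // Ctrl+C requests cancellation instead of terminating the process immediately.
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                cts.Cancel();
            };

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe("test");
                try
                {
                    while (true)
                    {
                        try
                        {
                            // Throws OperationCanceledException once the token is cancelled.
                            var result = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed '{result.Message.Value}' at {result.TopicPartitionOffset}");

                            // Commits result.Offset + 1 for this partition.
                            c.Commit(result);
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occurred: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Cancellation was requested; fall through to Close().
                }
                finally
                {
                    // Leave the consumer group cleanly so partitions are reassigned promptly.
                    c.Close();
                }
            }
        }
    }
    ```

    Putting Close() in a finally block means the consumer leaves the group whether the loop ends through cancellation or through an unexpected exception.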
    

  • Original article: https://www.cnblogs.com/chen1880/p/14934020.html