zoukankan      html  css  js  c++  java
  • c# .net 使用Confluent.Kafka针对kafka进行生产和消费

    首先说明一点,像Confluent.Kafka这种开源的组件,三天两头的更新。在搜索引擎搜索到的结果往往用不了,浪费时间。建议以后遇到类似的情况直接看官网给的Demo。

    因为搜索引擎搜到的文章,作者基本上都没有说明用的是哪个版本的dll。所以你nuget安装了后,不一定能使用。

    截止目前,我用的Confluent.Kafka是最新版本:1.2.1

    GitHub上源码地址:https://github.com/confluentinc/confluent-kafka-dotnet,上面附有生产和消费的示例。直接去看吧。往下就不要看了,是我自己用到的,只是方便我自己查看。

    生产:

            static async void Produce()
            {
                var config = new ProducerConfig { BootstrapServers = "192.168.3.250:9092" };
                
                using (var p = new ProducerBuilder<Null, string>(config).Build())
                {
                    try
                    {
                        var dr = await p.ProduceAsync("mytopic", new Message<Null, string> { Value = "test" });
                        Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
                    }
                    catch (ProduceException<Null, string> e)
                    {
                        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                    }
                }
            }

    消费:

           static async void Consume()
            {
                var conf = new ConsumerConfig
                {
                    GroupId = "test-consumer-group",
                    BootstrapServers = "192.168.3.250:9092",
                    AutoOffsetReset = AutoOffsetReset.Earliest
                };
    
                using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
                {
                    c.Subscribe("mytopic");
    
                    CancellationTokenSource cts = new CancellationTokenSource();
                    Console.CancelKeyPress += (_, e) => {
                        e.Cancel = true; // prevent the process from terminating.
                        cts.Cancel();
                    };
    
                    try
                    {
                        while (true)
                        {
                            try
                            {
                                var cr = c.Consume(cts.Token);
                                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                            }
                            catch (ConsumeException e)
                            {
                                Console.WriteLine($"Error occured: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        c.Close();
                    }
                }
            }
  • 相关阅读:
    Seafile和Nextcloud相比较哪个好用
    opencv3.1+cmake+mingw5.3+QT5编译
    算法导论第三版--动态规划与贪心算法
    Linux 网卡特性配置ethtool详解
    算法导论第三版--红黑树
    算法导论第三版--二叉搜索树
    realloc在aarch64_be-gcc的奇怪表现
    算法导论第三版--桶排序
    算法导论第三版--计数,基数排序
    算法导论第三版--插入排序和归并排序
  • 原文地址:https://www.cnblogs.com/subendong/p/11826274.html
Copyright © 2011-2022 走看看