zoukankan      html  css  js  c++  java
  • c# .net 使用Confluent.Kafka针对kafka进行生产和消费

    首先说明一点,像Confluent.Kafka这种开源的组件,三天两头的更新。在搜索引擎搜索到的结果往往用不了,浪费时间。建议以后遇到类似的情况直接看官网给的Demo。

    因为搜索引擎搜到的文章,作者基本上都没有说明用的是哪个版本的dll。所以你nuget安装了后,不一定能使用。

    截止目前,我用的Confluent.Kafka是最新版本:1.2.1

    GitHub上源码地址:https://github.com/confluentinc/confluent-kafka-dotnet,上面附有生产和消费的示例。直接去看吧。往下就不要看了,是我自己用到的,只是方便我自己查看。

    生产:

            static async void Produce()
            {
                var config = new ProducerConfig { BootstrapServers = "192.168.3.250:9092" };
                
                using (var p = new ProducerBuilder<Null, string>(config).Build())
                {
                    try
                    {
                        var dr = await p.ProduceAsync("mytopic", new Message<Null, string> { Value = "test" });
                        Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
                    }
                    catch (ProduceException<Null, string> e)
                    {
                        Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                    }
                }
            }

    消费:

           static async void Consume()
            {
                var conf = new ConsumerConfig
                {
                    GroupId = "test-consumer-group",
                    BootstrapServers = "192.168.3.250:9092",
                    AutoOffsetReset = AutoOffsetReset.Earliest
                };
    
                using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
                {
                    c.Subscribe("mytopic");
    
                    CancellationTokenSource cts = new CancellationTokenSource();
                    Console.CancelKeyPress += (_, e) => {
                        e.Cancel = true; // prevent the process from terminating.
                        cts.Cancel();
                    };
    
                    try
                    {
                        while (true)
                        {
                            try
                            {
                                var cr = c.Consume(cts.Token);
                                Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                            }
                            catch (ConsumeException e)
                            {
                                Console.WriteLine($"Error occured: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        c.Close();
                    }
                }
            }
  • 相关阅读:
    知识管理系统
    小强地狱(Bug Hell)——优先级和缺陷修改的平衡
    搜索引擎中用到的一些拆词方式解析
    TPLINK TLWR841N 路由变无线交换机设置
    .iso与.mdx(mds)格式的区别
    关于“小米盒子”等的被喷
    刚开通了博客园,以后就在这里安家吧
    温故而知新,算法在我心
    silverlight应用图片新闻展示效果
    jquery应用实现博客个性主页布局拖拽功能
  • 原文地址:https://www.cnblogs.com/subendong/p/11826274.html
Copyright © 2011-2022 走看看