zoukankan      html  css  js  c++  java
  • .Net(c#)使用 Kafka 小结

    .Net(c#)使用 Kafka 小结

    1.开篇

    由于项目中必须使用 kafka 来作为消息组件,所以使用 kafka 有一段时间了。不得不感叹 kafka 是一个相当优秀的消息系统。下面直接对使用过程做一总结,希望对大家有用。

    1.1.kafka 部署

    kafka 的简单搭建我们使用 docker 进行,方便快捷单节点。生产环境不推荐这样的单节点 kafka 部署。

    1.1.1.确保安装了 docker 和 docker-compose

    网上很多教程,安装也简单,不作为重点赘述。

    1.1.2.编写 docker-compose.yml

    将以下内容直接复制到新建空文件docker-compose.yml中。

    version: "3"
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
      kafka:
        image: wurstmeister/kafka
        depends_on: [zookeeper]
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_HOST_NAME: localhost
          KAFKA_CREATE_TOPICS: "test"
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        volumes:
          - /var/run/docker.sock:/var/run/docker.sock
    

    1.1.3.容器构建提交

    docker-compose.yml文件的目录下执行以下命令:

    docker-compose build # 打包
    docker-compose up # 启动, 添加 -d 可以后台启动。
    

    看到日志输出:

    Creating network "desktop_default" with the default driver
    Creating desktop_zookeeper_1 ... done
    Creating desktop_kafka_1     ... done
    Attaching to desktop_zookeeper_1, desktop_kafka_1
    zookeeper_1  | ZooKeeper JMX enabled by default
    zookeeper_1  | Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
    zookeeper_1  | 2020-05-17 03:34:31,794 [myid:] - INFO  [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
    ...
    zookeeper_1  | 2020-05-17 03:34:31,872 [myid:] - INFO  [main:ZooKeeperServer@836] - tickTime set to 2000
    ...
    kafka_1      | Excluding KAFKA_VERSION from broker config
    

    没有错误输出说明部署成功。

    2.kafka 客户端选择

    在 github 上能够找到好几个 c#可以使用的 kafka 客户端。大家可以去搜一下,本文就只说明rdkafka-dotnetconfluent-kafka-dotnet

    2.1.rdkafka-dotnet

    我们生产环境中就使用的该客户端。在该项目 github 首页中可以看到:

    var config = new Config() { GroupId = "example-csharp-consumer" };
    using (var consumer = new EventConsumer(config, "127.0.0.1:9092"))
    {
        consumer.OnMessage += (obj, msg) =>
        {
            //...
        };
    }
    

    没错,使用它的原因就是它提供了EventConsumer,可以直接异步订阅消息。整体上来说该客户端非常的稳定,性能优良。使用过程中比较难缠的就是它的配置,比较不直观。它基于librdkafka(C/C++)实现,配置 Config 类中显式配置比较少,大多数是通过字典配置的,比如:

    var config = new Config();
    config["auto.offset.reset"] = "earliest";//配置首次消息偏移位置为最早
    

    这对于新手来说并不是很友好,很难想到去这样配置。当然如果有 librdkafka 的使用经验会好很多。大多数配置在 librdkafka 项目的CONFIGURATION

    还有一个需要注意的是 Broker 的版本支持Broker version support: >=0.8,也在 librdkafka 项目中可以找到。

    2.2 confluent-kafka-dotnet

    confluent-kafka-dotnet 是 rdkafka-dotnet(好几年没有维护了)的官方后续版本。推荐使用 confluent-kafka-dotnet,因为配置相对友好,更加全面。比如:

    var conf = new ConsumerConfig
    {
        AutoOffsetReset = AutoOffsetReset.Earliest//显式强类型赋值配置
    };
    

    对于 EventConsumer 怎么办呢?在项目变更记录中已经明确提出移除了 OnMessage 多播委托,而 EventConsumer,也就不存在了。但这不难,我们可以参照基项目写一个:

    public class EventConsumer<TKey, TValue> : IDisposable
    {
        private Task _consumerTask;
        private CancellationTokenSource _consumerCts;
        public IConsumer<TKey, TValue> Consumer { get; }
        public ConsumerBuilder<TKey, TValue> Builder { get; set; }
        public EventConsumer(IEnumerable<KeyValuePair<string, string>> config)
        {
            Builder = new ConsumerBuilder<TKey, TValue>(config);
            Consumer = Builder.Build();
        }
        public event EventHandler<ConsumeResult<TKey, TValue>> OnConsumeResult;
        public event EventHandler<ConsumeException> OnConsumeException;
        public void Start()
        {
            if (Consumer.Subscription?.Any() != true)
            {
                throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
            }
            if (_consumerTask != null)
            {
                return;
            }
            _consumerCts = new CancellationTokenSource();
            var ct = _consumerCts.Token;
            _consumerTask = Task.Factory.StartNew(() =>
            {
                while (!ct.IsCancellationRequested)
                {
                    try
                    {
                        var cr = Consumer.Consume(TimeSpan.FromSeconds(1));
                        if (cr == null) continue;
                        OnConsumeResult?.Invoke(this, cr);
                    }
                    catch (ConsumeException e)
                    {
                        OnConsumeException?.Invoke(this, e);
                    }
                }
            }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
        public async Task Stop()
        {
            if (_consumerCts == null || _consumerTask == null) return;
            _consumerCts.Cancel();
            try
            {
                await _consumerTask;
            }
            finally
            {
                _consumerTask = null;
                _consumerCts = null;
            }
        }
        public void Dispose()
        {
            if (_consumerTask != null)
            {
                Stop().Wait();
            }
            Consumer?.Dispose();
        }
    }
    

    使用测试:

    static async Task Main(string[] args)
    {
        Console.WriteLine("Hello World!");
        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
        };
        var eventConsumer = new EventConsumer<Ignore, string>(conf);
        eventConsumer.Consumer.Subscribe(new[] {"test"});
        eventConsumer.OnConsumeResult += (sen, cr) =>
        {
            Console.WriteLine($"Receive '{cr.Message.Value}' from '{cr.TopicPartitionOffset}'");
        };
        do
        {
            var line = Console.ReadLine();
            switch (line)
            {
                case "stop":
                    eventConsumer.Stop();
                    break;
                case "start":
                    eventConsumer.Start();
                    break;
            }
        } while (true);
    }
    

    3.功能扩展

    !!!以下讨论都是对confluent-kafka-dotnet。

    由于用户终端也使用了 kafka 客户端订阅消息。如果终端长时间没有上线,并且消息过期时间也较长,服务端会存有大量消息。终端一上线就会读取到大量的堆积消息,很容易就把内存耗尽了。考虑到客户端不是长期在线的场景,无需不间断的处理所有消息,服务端才适合这个角色(:。所以客户端只需每次从登录时的最新点开始读取就可以了,历史性统计就交给服务器去做。

    最便捷的方法是每次客户端连接都使用新的groupid,用时间或者guid撒盐。但这样会使服务端记录大量的group信息(如果终端很多m个,并且终端断开连接重连的次数也会很多随机n次,那么也是m*n个group信息),势必对服务端性能造成影响。

    另一种方法是在保持groupid不变的情况下,修改消费偏移。那如何去设置位置偏移为最新点呢?

    3.1 错误思路 AutoOffsetReset

    在配置中存在一个让新手容易产生误解的配置项AutoOffsetReset.Latest自动偏移到最新位置。当你兴冲冲的准备大干一番时发现只有首次创建GroupId时会起作用,当 groupid 已经存在 kafka 记录中时它就不管用了。

    3.2 提交偏移 Commit

    我们能够在IConsumer<TKey, TValue>中找到该 commit 方法,它有三个重载:

    1. 无参函数。就是提交当前客户端`IConsumer<TKey, TValue>.Assignment`记录的偏移。
    2. 参数ConsumeResult<TKey, TValue>。一次仅提交一个偏移。当然配置中默认设置为自动提交(`conf.EnableAutoCommit = true;`),无需手动提交。
    3. 参数IEnumerable<TopicPartitionOffset> offsets。直接提交到某一个位置。TopicPartitionOffset有三个决定性属性:话题topic、分区:partition、偏移offset。
    

    第三个函数就是我们想要的,我们只需得到对应参数TopicPartitionOffset的值就可以。

    3.2.1.TopicPartition的获取

    topic 是我们唯一可以确定的。在IConsumer<TKey, TValue>.Assignment中可以得到 topic 和 partition。但遗憾的是它只有不会立即有值。我们只能主动去服务端获取,在IAdminClient中找到了可获取该信息的方法,所以我们做一扩展:

    public static IEnumerable<TopicPartition> GetTopicPartitions(ConsumerConfig config, string topic, TimeSpan timeout)
    {
        using var adv = new AdminClientBuilder(config).Build();
        var topPns = adv.GetTopicPartition(topic, timeout);
        return topPns;
    }
    
    public static IEnumerable<TopicPartition> GetTopicPartition(this AdminClient client, string topic, TimeSpan timeout)
    {
        var mta = client.GetMetadata(timeout);
        var topicPartitions = mta.Topics
            .Where(t => topic == t.Topic)
            .SelectMany(t => t.Partitions.Select(tt => new TopicPartition(t.Topic, tt.PartitionId)))
            .ToList();
        return topicPartitions;
    }
    

    3.2.2. TopicPartitionOffset获取

    我们还差 offset 的值,通过IConsumer<TKey, TValue>.QueryWatermarkOffsets方法可以查到当前水位,而其中 High 水位就是最新偏移。

    现在我们可以完成我们的任务了吗?问题再次出现,虽然客户端表现得从最新点消费了,但是在此之前的卡顿和类似与内存溢出让人不得心安。Commit 还是消费了所有消息:(,只不过暗搓搓的进行。在所有消息消费期间读取所有未消费,然后拼命提交。客户端哪有这么大的内存和性能呢。最终,找到一个和第三个 commit 方法一样接受参数的方法Assign,一试果然灵验。

    public static void AssignOffsetToHighWatermark<TKey, TValue>(this IConsumer<TKey, TValue> consumer, TopicPartition partition, TimeSpan timeout)
    {
        var water = consumer.QueryWatermarkOffsets(partition, timeout);
        if (water == null || water.High == 0) return;
        var offset = new TopicPartitionOffset(partition.Topic, partition.Partition, water.High);
        consumer.Assign(offset);
    }
    

    3.2.3.实际使用

    最终的使用示例:

    //...
    var topicPartitions = ConsumerEx.GetTopicPartitions(conf, "test", TimeSpan.FromSeconds(5));
    topicPartitions?.ToList().ForEach(t =>
    {
        eventConsumer.Consumer.AssignOffsetToHighWatermark(t, TimeSpan.FromSeconds(5));
    });
    eventConsumer.Start();//在消费事件开始之前就可以进行偏移设置
    //...
    

    请注意,如果您关闭了自动提交功能,并且不主动提交任何偏移信息,那么服务端对该 group 的偏移记录将一直不变,Assign 函数并不会改变任何服务的偏移记录。

    4.总结

    这一圈下来整个 kafka 的基本消费流程也就搞清楚了。kafka 消费者需要对消费的消息进行提交。事实上,每个消息体里都有偏移信息。不提交对于服务端来说就是客户端没有处理过该消息,将不会更改已消费偏移。以此来保证消息消费的可靠性。这和 tcp 中三次握手有异曲同工之妙。

    服务端保存着每一个 groupid 对应的已经提交偏移Committed Offset。当然客户端不提交它是不会变更的(不考虑直接操作服务端的形式)。

    客户端保存自己的当前偏移Current Offset,可以通过AssignCommit进行更改,二者区别是Commit将连同提交到服务端对应的偏移中进行更改,而Assign仅改变客户端偏移,这一更改记录在IConsumer<TKey, TValue>.Assignment中,首次启动时客户端异步向服务端请求Committed Offset来对其赋值。这就是在 3.2 节中我们没有立即得到该值的的原因,该值将在可能在几秒中后被赋值,所以写了一个主动获取的方法GetTopicPartition。客户端下一次消费将根据IConsumer<TKey, TValue>.Assignment进行。

    使用AdminClientBuilder.GetMetadata函数可以得到对应话题的元数据,包括:topic、partition、Brokers 等。

    使用IConsumer<TKey, TValue>.QueryWatermarkOffsets函数可以得到当前服务端的水位,low 为最早的偏移(可能不是 0,考虑消息过期被删除的情况),high 为最新的偏移。

  • 相关阅读:
    T-SQL查询语句
    数据库和表的管理
    数据库概念
    IRF2实验
    IFR2笔记
    校园网双网出口实验案例
    双机热备实验
    华为H3C(NAT)实验
    BGP(边界网关协议)实验
    Hybrid实验
  • 原文地址:https://www.cnblogs.com/hsxian/p/12907542.html
Copyright © 2011-2022 走看看