  • Using Kafka with .NET Core

    Installation

    Installing Kafka on CentOS

    Download and extract

    # Download and extract Kafka
    $ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz
    $ tar -zxvf kafka_2.12-2.1.1.tgz
    $ mv kafka_2.12-2.1.1 /data/kafka
    
    # Download and extract ZooKeeper
    $ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
    $ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
    $ mv apache-zookeeper-3.5.8-bin /data/zookeeper
    

    Start ZooKeeper

    # Copy the sample configuration
    $ cd /data/zookeeper/conf
    $ cp zoo_sample.cfg zoo.cfg
    
    # Review the configuration and adjust it if necessary
    $ vim zoo.cfg
    
    # Commands (run from /data/zookeeper)
    $ cd /data/zookeeper
    $ ./bin/zkServer.sh start    # start
    $ ./bin/zkServer.sh status   # status
    $ ./bin/zkServer.sh stop     # stop
    $ ./bin/zkServer.sh restart  # restart
    
    # Test with the client (type quit to exit)
    $ ./bin/zkCli.sh -server localhost:2181
    quit
    

    Start Kafka

    # Back up the configuration
    $ cd /data/kafka
    $ cp config/server.properties config/server.properties_copy
    
    # Edit the configuration
    $ vim /data/kafka/config/server.properties
    
    # In a cluster, every broker must have a unique id
    # broker.id=0
    
    # Listener address (internal network)
    # listeners=PLAINTEXT://ip:9092
    
    # Externally advertised IP and port
    # advertised.listeners=PLAINTEXT://106.75.84.97:9092
    
    # Default number of partitions per topic (num.partitions). The default is 1;
    # a suitable value depends on the server configuration (UCloud ukafka uses 3, for example)
    # num.partitions=3
    
    # ZooKeeper connection string
    # zookeeper.connect=localhost:2181
    
    # Start Kafka with this configuration
    $ ./bin/kafka-server-start.sh config/server.properties &
    
    # Check that it is running
    $ ps -ef | grep kafka
    $ jps
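
    With the broker running, it is worth confirming that it is reachable through the address configured in advertised.listeners. Below is a minimal smoke test using the Confluent.Kafka client that the rest of this post is based on; the broker address and topic name are placeholders, and the broker is assumed to auto-create topics (the default).

    using System;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    
    class BrokerSmokeTest
    {
        static async Task Main()
        {
            // Use the value of advertised.listeners from server.properties here.
            var config = new ProducerConfig { BootstrapServers = "106.75.84.97:9092" };
    
            using var producer = new ProducerBuilder<Null, string>(config).Build();
    
            // Produce a single test message; an exception here usually means the
            // listeners/advertised.listeners settings are wrong or the port is blocked.
            var result = await producer.ProduceAsync("smoke-test",
                new Message<Null, string> { Value = "hello kafka" });
    
            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }
    }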
    

    Installing Kafka with Docker

    docker pull wurstmeister/zookeeper
    docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
    
    docker pull wurstmeister/kafka
    docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
    

    Introduction

    • Broker: a message-broker node. A single Kafka node is one broker, and multiple brokers form a Kafka cluster.
    • Topic: a category of messages. Page-view logs and click logs, for example, can each live in their own topic, and a Kafka cluster can serve many topics at the same time.
    • Partition: a physical grouping within a topic. A topic can be split into multiple partitions, and each partition is an ordered queue.
    • Segment: physically, a partition is made up of multiple segment files.
    • Offset: each partition holds an ordered, immutable sequence of messages that are continuously appended to it. Every message in a partition has a sequential id called the offset, which uniquely identifies that message within the partition.
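
    To see topics and partitions in practice, a topic with several partitions can be created programmatically. The sketch below uses the admin client from the Confluent.Kafka package that this post uses later; the topic name, partition count, and broker address are placeholder values.

    using System;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Confluent.Kafka.Admin;
    
    class CreateTopicExample
    {
        static async Task Main()
        {
            var config = new AdminClientConfig { BootstrapServers = "127.0.0.1:9092" };
            using var adminClient = new AdminClientBuilder(config).Build();
    
            try
            {
                // A topic named page-view-log with 3 partitions and a replication factor of 1.
                await adminClient.CreateTopicsAsync(new[]
                {
                    new TopicSpecification { Name = "page-view-log", NumPartitions = 3, ReplicationFactor = 1 }
                });
                Console.WriteLine("Topic created.");
            }
            catch (CreateTopicsException e)
            {
                Console.WriteLine($"Create failed: {e.Results[0].Error.Reason}");
            }
        }
    }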

    Relationship between Kafka partitions and consumers

    • Having more consumers than partitions is wasteful: Kafka does not allow concurrent consumption within a single partition, so the consumer count should not exceed the partition count.
    • With fewer consumers than partitions, one consumer handles several partitions. Balance the two counts carefully, otherwise some partitions will be consumed unevenly. Ideally the partition count is an integer multiple of the consumer count, which is why the partition count matters; choosing 24, for instance, makes it easy to pick a matching number of consumers.
    • When a consumer reads from several partitions, ordering across those partitions is not guaranteed. Kafka only guarantees order within a single partition; across partitions the result depends on the order in which you read.
    • Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to each consumer may change (the sketch after this list shows such an assignment).
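
    To make the assignment concrete, the sketch below starts two consumers with the same GroupId on one topic and logs which partitions each receives. The broker address and topic name are placeholders, and the topic is assumed to have more than one partition (for example the 3-partition topic created above).

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    
    class ConsumerGroupDemo
    {
        static void RunConsumer(string name, CancellationToken token)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "127.0.0.1:9092",
                GroupId = "partition-demo",            // same group => the topic's partitions are split
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
    
            using var consumer = new ConsumerBuilder<Ignore, string>(config)
                .SetPartitionsAssignedHandler((c, partitions) =>
                    Console.WriteLine($"{name} assigned: [{string.Join(", ", partitions)}]"))
                .Build();
    
            consumer.Subscribe("page-view-log");
            try
            {
                while (!token.IsCancellationRequested)
                {
                    // Poll with a timeout so the loop can observe cancellation.
                    var result = consumer.Consume(TimeSpan.FromSeconds(1));
                    if (result != null)
                        Console.WriteLine($"{name} got {result.TopicPartitionOffset}");
                }
            }
            finally
            {
                consumer.Close(); // leave the group cleanly, which triggers a rebalance for the other consumer
            }
        }
    
        static async Task Main()
        {
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            // Two consumers, one group: with 3 partitions, one consumer gets 2 partitions and the other gets 1.
            await Task.WhenAll(
                Task.Run(() => RunConsumer("consumer-A", cts.Token)),
                Task.Run(() => RunConsumer("consumer-B", cts.Token)));
        }
    }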

    Quick start

    Install the package in the .NET Core project

    Install-Package Confluent.Kafka
    

    Source repository: https://github.com/confluentinc/confluent-kafka-dotnet

    Add the IKafkaService interface

    public interface IKafkaService
    {
        /// <summary>
        /// Publish a message to the specified topic
        /// </summary>
        /// <typeparam name="TMessage"></typeparam>
        /// <param name="topicName"></param>
        /// <param name="message"></param>
        /// <returns></returns>
        Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class;
    
        /// <summary>
        /// Subscribe to messages from the specified topics
        /// </summary>
        /// <typeparam name="TMessage"></typeparam>
        /// <param name="topics"></param>
        /// <param name="messageFunc"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class;
    }
    

    Implement IKafkaService

    public class KafkaService : IKafkaService
    {
        public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "127.0.0.1:9092"
            };
            using var producer = new ProducerBuilder<string, string>(config).Build();
            await producer.ProduceAsync(topicName, new Message<string, string>
            {
                Key = Guid.NewGuid().ToString(),
                Value = message.SerializeToJson() // custom JSON-serialization extension method (a sketch follows this class)
            });
        }
    
        public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "127.0.0.1:9092",
                GroupId = "crow-consumer",
                EnableAutoCommit = false,
                StatisticsIntervalMs = 5000,
                SessionTimeoutMs = 6000,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnablePartitionEof = true
            };
            //const int commitPeriod = 5;
            using var consumer = new ConsumerBuilder<Ignore, string>(config)
                                 .SetErrorHandler((_, e) =>
                                 {
                                     Console.WriteLine($"Error: {e.Reason}");
                                 })
                                 .SetStatisticsHandler((_, json) =>
                                 {
                                     Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");
                                 })
                                 .SetPartitionsAssignedHandler((c, partitions) =>
                                 {
                                     string partitionsStr = string.Join(", ", partitions);
                                     Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");
                                 })
                                 .SetPartitionsRevokedHandler((c, partitions) =>
                                 {
                                     string partitionsStr = string.Join(", ", partitions);
                                     Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");
                                 })
                                 .Build();
            consumer.Subscribe(topics);
            try
            {
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cancellationToken);
                        if (consumeResult.IsPartitionEOF)
                        {
                            Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                            continue;
                        }
                        Console.WriteLine($"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
                        TMessage messageResult = null;
                        try
                        {
                            messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value);
                        }
                        catch (Exception ex)
                        {
                            var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} [Exception: failed to deserialize message, Value: {consumeResult.Message.Value}]: {ex.StackTrace}";
                            Console.WriteLine(errorMessage);
                            messageResult = null;
                        }
                        if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)
                        {
                            messageFunc(messageResult);
                            try
                            {
                                consumer.Commit(consumeResult);
                            }
                            catch (KafkaException e)
                            {
                                Console.WriteLine(e.Message);
                            }
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Closing consumer.");
                consumer.Close();
            }
            await Task.CompletedTask;
        }
    }
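
    SerializeToJson is not part of Confluent.Kafka or the base class library; it is presumably an extension method from the author's own project. A minimal sketch of such an extension, assuming Newtonsoft.Json (which the class above already uses for deserialization):

    using Newtonsoft.Json;
    
    public static class JsonSerializationExtensions
    {
        /// <summary>
        /// Serialize any object to a JSON string (hypothetical helper used by KafkaService above).
        /// </summary>
        public static string SerializeToJson<T>(this T obj) where T : class
            => JsonConvert.SerializeObject(obj);
    }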
    

    Inject IKafkaService and call it wherever it is needed (service registration is sketched right after the example below).

    public class MessageService : IMessageService, ITransientDependency
    {
        private readonly IKafkaService _kafkaService;
        public MessageService(IKafkaService kafkaService)
        {
            _kafkaService = kafkaService;
        }
    
        public async Task RequestTraceAdded(XxxEventData eventData)
        {
            await _kafkaService.PublishAsync(eventData.TopicName, eventData);
        }
    }
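
    The MessageService above depends on the services being registered in the DI container. With ABP, the ITransientDependency marker interface handles registration automatically; in a plain ASP.NET Core host you would register them yourself. A minimal sketch, assuming the standard Microsoft.Extensions.DependencyInjection container:

    // In Startup.ConfigureServices (requires Microsoft.Extensions.DependencyInjection):
    public void ConfigureServices(IServiceCollection services)
    {
        // KafkaService holds no per-request state, so singleton and transient lifetimes both work.
        services.AddSingleton<IKafkaService, KafkaService>();
        services.AddTransient<IMessageService, MessageService>();
    }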
    

    The code above acts as the producer. Once a message has been published to the queue, a consumer is needed to process it, so a console project can be used to receive the messages and handle the business logic.

    var cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cts.Cancel();
    };
    
    await kafkaService.SubscribeAsync<XxxEventData>(topics, async (eventData) =>
    {
        // Your logic
    
        Console.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");
    }, cts.Token);
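
    XxxEventData itself is not defined in this post; a hypothetical shape that matches the properties used above (TopicName and EventTime) could look like this, to be replaced with your own event model:

    using System;
    
    // Hypothetical event payload; adjust the fields to your own domain.
    public class XxxEventData
    {
        public string TopicName { get; set; }
        public DateTime EventTime { get; set; }
        // plus whatever business data the message needs to carry
    }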
    

    IKafkaService already exposes the subscription method, so here as well you simply inject the service and call it.

    Producer and consumer example

    Producer

    static async Task Main(string[] args)
    {
        if (args.Length != 2)
        {
            Console.WriteLine("Usage: .. brokerList topicName");
            // 127.0.0.1:9092 helloTopic
            return;
        }
    
        var brokerList = args.First();
        var topicName = args.Last();
    
        var config = new ProducerConfig { BootstrapServers = brokerList };
    
        using var producer = new ProducerBuilder<string, string>(config).Build();
    
        Console.WriteLine("
    -----------------------------------------------------------------------");
        Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
        Console.WriteLine("-----------------------------------------------------------------------");
        Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:");
        Console.WriteLine("> key value<Enter>");
        Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:");
        Console.WriteLine("> value<enter>");
        Console.WriteLine("Ctrl-C to quit.
    ");
    
        var cancelled = false;
    
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cancelled = true;
        };
    
        while (!cancelled)
        {
            Console.Write("> ");
    
            var text = string.Empty;
    
            try
            {
                text = Console.ReadLine();
            }
            catch (IOException)
            {
                break;
            }
    
            if (string.IsNullOrWhiteSpace(text))
            {
                break;
            }
    
            var key = string.Empty;
            var val = text;
    
            var index = text.IndexOf(" ");
            if (index != -1)
            {
                key = text.Substring(0, index);
                val = text.Substring(index + 1);
            }
    
            try
            {
                var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string>
                {
                    Key = key,
                    Value = val
                });
    
                Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}");
            }
            catch (ProduceException<string, string> e)
            {
                Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");
            }
        }
    }
    

    Consumer

    static void Main(string[] args)
    {
        if (args.Length != 2)
        {
            Console.WriteLine("Usage: .. brokerList topicName");
            // 127.0.0.1:9092 helloTopic
            return;
        }
    
        var brokerList = args.First();
        var topicName = args.Last();
    
        Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");
    
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) =>
        {
            e.Cancel = true;
            cts.Cancel();
        };
    
        var config = new ConsumerConfig
        {
            BootstrapServers = brokerList,
            GroupId = "consumer",
            EnableAutoCommit = false,
            StatisticsIntervalMs = 5000,
            SessionTimeoutMs = 6000,
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnablePartitionEof = true
        };
    
        const int commitPeriod = 5;
    
        using var consumer = new ConsumerBuilder<Ignore, string>(config)
                             .SetErrorHandler((_, e) =>
                             {
                                 Console.WriteLine($"Error: {e.Reason}");
                             })
                             .SetStatisticsHandler((_, json) =>
                             {
                                 Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring..");
                                 //Console.WriteLine($"Statistics: {json}");
                             })
                             .SetPartitionsAssignedHandler((c, partitions) =>
                             {
                                 Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
                             })
                             .SetPartitionsRevokedHandler((c, partitions) =>
                             {
                                 Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
                             })
                             .Build();
        consumer.Subscribe(topicName);
    
        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cts.Token);
    
                    if (consumeResult.IsPartitionEOF)
                    {
                        Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
    
                        continue;
                    }
    
                    Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
    
                    if (consumeResult.Offset % commitPeriod == 0)
                    {
                        try
                        {
                            consumer.Commit(consumeResult);
                        }
                        catch (KafkaException e)
                        {
                            Console.WriteLine($"Commit error: {e.Error.Reason}");
                        }
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Closing consumer.");
            consumer.Close();
        }
    }
    

  • Original post: https://www.cnblogs.com/meowv/p/13614516.html