zoukankan      html  css  js  c++  java
  • kafka

    上个章节我们讲了kafka的环境安装(这里),现在主要来了解下Kafka使用,基于.net实现kafka的消息队列应用,本文用的是Confluent.Kafka,版本0.11.6

    1、安装:

    在NuGet程序包中搜索“Confluent.Kafka”下载安装即可

    2、producer发送消息:

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 using Confluent.Kafka;
     5 using Confluent.Kafka.Serialization;
     6 
     7 namespace KafKa
     8 {
     9     /// <summary>
    10     /// Kafka消息生产者
    11     /// </summary>
    12     public sealed class KafkaProducer
    13     {
    14         /// <summary>
    15         /// 生产消息并发送消息
    16         /// </summary>
    17         /// <param name="broker">kafka的服务器地址</param>
    18         /// <param name="topic">kafka的消息主题名称</param>
    19         /// <param name="partion">分区</param>
    20         /// <param name="message">需要传送的消息</param>
    21         public bool Produce(string broker, string topic, int partion, string message)
    22         {
    23             bool result = false;
    24             if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
    25             {
    26                 throw new ArgumentNullException("Kafka消息服务器地址不能为空!");
    27             }
    28 
    29             if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
    30             {
    31                 throw new ArgumentNullException("消息所属的主题不能为空!");
    32             }
    33 
    34             if (string.IsNullOrEmpty(message) || string.IsNullOrWhiteSpace(message) || message.Length <= 0)
    35             {
    36                 throw new ArgumentNullException("消息内容不能为空!");
    37             }
    38 
    39             var config = new Dictionary<string, object>
    40             {
    41                 { "bootstrap.servers", broker }
    42             };
    43             using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
    44             {
    45                 var deliveryReport = producer.ProduceAsync(topic, null, message, partion);
    46                 deliveryReport.ContinueWith(task =>
    47                 {
    48                     if (task.Result.Error.Code == ErrorCode.NoError)
    49                     {
    50                         result = true;
    51                     }
    52                     //可以在控制台使用以下语句
    53                     //Console.WriteLine("Producer:" + producer.Name + "
    Topic:" + topic + "
    Partition:" + task.Result.Partition + "
    Offset:" + task.Result.Offset + "
    Message:" + task.Result.Value);
    54                 });
    55 
    56                 producer.Flush(TimeSpan.FromSeconds(10));
    57             }
    58             return result;
    59         }
    60     }
    61 }
    View Code

    3、consumer接收消息:

      1 using System;
      2 using System.Collections.Generic;
      3 using System.Text;
      4 using System.Threading;
      5 using Confluent.Kafka;
      6 using Confluent.Kafka.Serialization;
      7 
      8 namespace KafKa
      9 {
     10     /// <summary>
     11     /// Kafka消息消费者
     12     /// </summary>
     13     public sealed class KafkaConsumer
     14     {
     15         #region 私有字段
     16 
     17         private bool isCancelled;
     18 
     19         #endregion
     20 
     21         #region 构造函数
     22 
     23         /// <summary>
     24         /// 构造函数,初始化IsCancelled属性
     25         /// </summary>
     26         public KafkaConsumer()
     27         {
     28             isCancelled = false;
     29         }
     30 
     31         #endregion
     32 
     33         #region 属性
     34 
     35         /// <summary>
     36         /// 是否应该取消继续消费Kafka的消息,默认值是false,继续消费消息
     37         /// </summary>
     38         public bool IsCancelled
     39         {
     40             get { return isCancelled; }
     41             set { isCancelled = value; }
     42         }
     43 
     44         #endregion
     45 
     46         #region 同步版本
     47 
     48         /// <summary>
     49         /// 指定的组别的消费者开始消费指定主题的消息
     50         /// </summary>
     51         /// <param name="broker">Kafka消息服务器的地址</param>
     52         /// <param name="topic">Kafka消息所属的主题</param>
     53         /// <param name="groupID">Kafka消费者所属的组别</param>
     54         /// <param name="action">可以对已经消费的消息进行相关处理</param>
     55         public void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null)
     56         {
     57             if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
     58             {
     59                 throw new ArgumentNullException("Kafka消息服务器的地址不能为空!");
     60             }
     61 
     62             if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
     63             {
     64                 throw new ArgumentNullException("消息所属的主题不能为空!");
     65             }
     66 
     67             if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0)
     68             {
     69                 throw new ArgumentNullException("用户分组ID不能为空!");
     70             }
     71 
     72             var config = new Dictionary<string, object>
     73                 {
     74                     { "bootstrap.servers", broker },
     75                     { "group.id", groupID },
     76                     { "enable.auto.commit", true },  // this is the default
     77                     { "auto.commit.interval.ms", 5000 },
     78                     { "statistics.interval.ms", 60000 },
     79                     { "session.timeout.ms", 6000 },
     80                     { "auto.offset.reset", "smallest" }
     81                 };
     82 
     83 
     84             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
     85             {
     86                 if (action != null)
     87                 {
     88                     consumer.OnMessage += (_, message) => {
     89                         ConsumerResult messageResult = new ConsumerResult();
     90                         messageResult.Broker = broker;
     91                         messageResult.Topic = message.Topic;
     92                         messageResult.Partition = message.Partition;
     93                         messageResult.Offset = message.Offset.Value;
     94                         messageResult.Message = message.Value;
     95 
     96                         //执行外界自定义的方法
     97                         action(messageResult);
     98                     };
     99                 }
    100 
    101                 consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic " + end.Topic + " partition " + end.Partition + ", next message will be at offset " + end.Offset);
    102 
    103                 consumer.OnError += (_, error) => Console.WriteLine("Error:" + error);
    104 
    105                 //引发反序列化错误或消费消息出现错误!= NoError。
    106                 consumer.OnConsumeError += (_, message) => Console.WriteLine("Error consuming from topic/partition/offset " + message.Topic + "/" + message.Partition + "/" + message.Offset + ": " + message.Error);
    107 
    108                 consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets:" + commit.Error : "Successfully committed offsets:" + commit.Offsets);
    109 
    110                 // 当消费者被分配一组新的分区时引发。
    111                 consumer.OnPartitionsAssigned += (_, partitions) =>
    112                 {
    113                     Console.WriteLine("Assigned Partitions:" + partitions + ", Member ID:" + consumer.MemberId);
    114                     //如果您未向OnPartitionsAssigned事件添加处理程序,则会自动执行以下.Assign调用。 如果你为它添加了事件处理程序,你必须明确地调用.Assign以便消费者开始消费消息。
    115                     consumer.Assign(partitions);
    116                 };
    117 
    118                 // Raised when the consumer's current assignment set has been revoked.
    119                 //当消费者的当前任务集已被撤销时引发。
    120                 consumer.OnPartitionsRevoked += (_, partitions) =>
    121                 {
    122                     Console.WriteLine("Revoked Partitions:" + partitions);
    123                     // If you don't add a handler to the OnPartitionsRevoked event,the below .Unassign call happens automatically. If you do, you must call .Unassign explicitly in order for the consumer to stop consuming messages from it's previously assigned partitions.
    124                     //如果您未向OnPartitionsRevoked事件添加处理程序,则下面的.Unassign调用会自动发生。 如果你为它增加了事件处理程序,你必须明确地调用.Usessign以便消费者停止从它先前分配的分区中消费消息。
    125                     consumer.Unassign();
    126                 };
    127 
    128                 //consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: " + json);
    129 
    130                 consumer.Subscribe(topic);
    131 
    132                 //Console.WriteLine("Subscribed to:" + consumer.Subscription);
    133 
    134                 while (!IsCancelled)
    135                 {
    136                     consumer.Poll(TimeSpan.FromMilliseconds(100));
    137                 }
    138             }
    139         }
    140 
    141         #endregion
    142 
    143         #region 异步版本
    144 
    145         /// <summary>
    146         /// 指定的组别的消费者开始消费指定主题的消息
    147         /// </summary>
    148         /// <param name="broker">Kafka消息服务器的地址</param>
    149         /// <param name="topic">Kafka消息所属的主题</param>
    150         /// <param name="groupID">Kafka消费者所属的组别</param>
    151         /// <param name="action">可以对已经消费的消息进行相关处理</param>
    152         public void ConsumeAsync(string broker, string topic, string groupID, Action<ConsumerResult> action = null)
    153         {
    154             if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0)
    155             {
    156                 throw new ArgumentNullException("Kafka消息服务器的地址不能为空!");
    157             }
    158 
    159             if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0)
    160             {
    161                 throw new ArgumentNullException("消息所属的主题不能为空!");
    162             }
    163 
    164             if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0)
    165             {
    166                 throw new ArgumentNullException("用户分组ID不能为空!");
    167             }
    168 
    169             ThreadPool.QueueUserWorkItem(KafkaAutoCommittedOffsets, new ConsumerSetting() { Broker = broker, Topic = topic, GroupID = groupID, Action = action });
    170         }
    171 
    172         #endregion
    173 
    174         #region 两种提交Offsets的版本
    175 
    176         /// <summary>
    177         /// Kafka消息队列服务器自动提交offset
    178         /// </summary>
    179         /// <param name="state">消息消费者信息</param>
    180         private void KafkaAutoCommittedOffsets(object state)
    181         {
    182             ConsumerSetting setting = state as ConsumerSetting;
    183 
    184             var config = new Dictionary<string, object>
    185                 {
    186                     { "bootstrap.servers", setting.Broker },
    187                     { "group.id", setting.GroupID },
    188                     { "enable.auto.commit", true },  // this is the default
    189                     { "auto.commit.interval.ms", 5000 },
    190                     { "statistics.interval.ms", 60000 },
    191                     { "session.timeout.ms", 6000 },
    192                     { "auto.offset.reset", "smallest" }
    193                 };
    194 
    195             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
    196             {
    197                 if (setting.Action != null)
    198                 {
    199                     consumer.OnMessage += (_, message) =>
    200                     {
    201                         ConsumerResult messageResult = new ConsumerResult();
    202                         messageResult.Broker = setting.Broker;
    203                         messageResult.Topic = message.Topic;
    204                         messageResult.Partition = message.Partition;
    205                         messageResult.Offset = message.Offset.Value;
    206                         messageResult.Message = message.Value;
    207 
    208                         //执行外界自定义的方法
    209                         setting.Action(messageResult);
    210                     };
    211                 }
    212 
    213                 //consumer.OnStatistics += (_, json)=> Console.WriteLine("Statistics: {json}");
    214 
    215                 //可以写日志
    216                 //consumer.OnError += (_, error)=> Console.WriteLine("Error:"+error);
    217 
    218                 //可以写日志
    219                 //consumer.OnConsumeError += (_, msg) => Console.WriteLine("Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");
    220 
    221                 consumer.Subscribe(setting.Topic);
    222 
    223                 while (!IsCancelled)
    224                 {
    225                     consumer.Poll(TimeSpan.FromMilliseconds(100));
    226                 }
    227             }
    228         }
    229 
    230         /// <summary>
    231         /// Kafka消息队列服务器手动提交offset
    232         /// </summary>
    233         /// <param name="state">消息消费者信息</param>
    234         private void KafkaManuallyCommittedOffsets(object state)
    235         {
    236             ConsumerSetting setting = state as ConsumerSetting;
    237 
    238             var config = new Dictionary<string, object>
    239                 {
    240                     { "bootstrap.servers", setting.Broker },
    241                     { "group.id", setting.GroupID },
    242                     { "enable.auto.commit", false },//不是自动提交的
    243                     { "auto.commit.interval.ms", 5000 },
    244                     { "statistics.interval.ms", 60000 },
    245                     { "session.timeout.ms", 6000 },
    246                     { "auto.offset.reset", "smallest" }
    247                 };
    248 
    249             using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
    250             {
    251                 //可以写日志
    252                 //consumer.OnError += (_, error) => Console.WriteLine("Error:"+error);
    253 
    254                 //可以写日志
    255                 // Raised on deserialization errors or when a consumed message has an error != NoError.
    256                 //consumer.OnConsumeError += (_, error)=> Console.WriteLine("Consume error:"+error);
    257 
    258                 consumer.Subscribe(setting.Topic);
    259 
    260                 Message<Ignore, string> message = null;
    261 
    262                 while (!isCancelled)
    263                 {
    264                     if (!consumer.Consume(out message, TimeSpan.FromMilliseconds(100)))
    265                     {
    266                         continue;
    267                     }
    268 
    269                     if (setting.Action != null)
    270                     {
    271                         ConsumerResult messageResult = new ConsumerResult();
    272                         messageResult.Broker = setting.Broker;
    273                         messageResult.Topic = message.Topic;
    274                         messageResult.Partition = message.Partition;
    275                         messageResult.Offset = message.Offset.Value;
    276                         messageResult.Message = message.Value;
    277 
    278                         //执行外界自定义的方法
    279                         setting.Action(messageResult);
    280                     }
    281 
    282                     if (message.Offset % 5 == 0)
    283                     {
    284                         var committedOffsets = consumer.CommitAsync(message).Result;
    285                         //Console.WriteLine("Committed offset:"+committedOffsets);
    286                     }
    287                 }
    288             }
    289         }
    290 
    291         #endregion
    292     }
    293 }
    View Code

    4、新建Producer控制台发送消息

     1 using System;
     2 using KafKa;
     3 
     4 namespace ConsoleProducer
     5 {
     6     class Program
     7     {
     8         static void Main(string[] args)
     9         {
    10             while (true)
    11             {
    12                 var message = Console.ReadLine();
    13                 var producer = new KafkaProducer();
    14                 producer.Produce("localhost:9092", "test", 0, message);
    15             }
    16 
    17             Console.ReadKey();
    18         }
    19     }
    20 }
    View Code

      

    5、新建Consumer控制台接收消息

     1 using System;
     2 using System.Collections.Generic;
     3 using KafKa;
     4 
     5 namespace ConsoleConsumer
     6 {
     7     class Program
     8     {
     9         static void Main(string[] args)
    10         {
    11             var dts = new List<TimeSpan>();
    12 
    13             var consumer = new KafkaConsumer();
    14             consumer.ConsumeAsync("localhost:9092", "test", "0", result =>
    15               {
    16                   Console.WriteLine(result.Message);
    17               });
    18 
    19             Console.ReadKey();
    20         }
    21     }
    22 }
    View Code

    通过以上步骤运行producer控制台,发送消息回车,在consumer控制台就可以接收到消息了。

    那么我们如何通过多个consumer来消费消息呢,kafka默认采用的是range分配方法,即平均分配分区。首先注意在创建topic的命令行时创建多个分区(--partitions 5),这里我们创建了5个分区,在发送消息时选择不同的分区发送(0-5),打开5个consumer控制台(注意要同一个分组),我们会发现5个consumer会分别消费对应分区的消息

  • 相关阅读:
    BMC手册—具体工作内容。——在Agent中修改配置文件添加ping监控
    BMC手册—具体工作内容。——在PATROL Central控制台中添加Linux监控oracle的添加
    jetbrains 系列博客https://zhile.io/
    NavicatPremium15破解方法
    fastjson转换json字符串key的首字母小写变大写的解决办法
    Linux系统时间同步方法小结
    java同步/设置Linux系统时间
    Java代码获取NTP服务器时间
    IDEA2019 3.3 IDEA缓存和浏览缓存清除和设置
    一张图彻底搞懂MySQL的 explain
  • 原文地址:https://www.cnblogs.com/xxinwen/p/10687636.html
Copyright © 2011-2022 走看看