zoukankan      html  css  js  c++  java
  • kafka-net

    基于kafka-net实现的可以长链接的消息生产者

      今天有点时间,我就来说两句。最近接触的Kafka相关的东西要多一些,其实以前也接触过,但是在项目使用中的经验不是很多。最近公司的项目里面使用了Kafka消息中间件,由于以前的人员编写的客户端的类不是很好,没有设计的概念,就是一个简单类的功能罗列,没有考虑到后期的扩展和维护(以后可能会兼容其他形式的消息队列,要做到无缝衔接),所以这个重构的任务就落到我的身上。
    
      先说说我的感受,然后再贴出代码的实现吧。我第一次是基于Confluent.Kafka编写的Kafka消息生产者,后来经过测试,同步操作的时间比较长,要完成20万数据发送消息并更新到数据库的时间大概是16-18分钟,这个结果有点让人不能接受。为了提高性能,也做了很多测试,都没有办法解决这个问题。后来抱着试试看的想法,我又基于kafka-net重新实现了Kafka消息的生产者。经过测试,完成同样的任务,时间大概需要3分钟左右。两种实现方法完成同样的任务,都是以同步的方式生产消息,并将消息成功发送到Broker后,再将数据插入到数据库做记录。大家不要纠结为什么这样使用消息队列,这是上头的做法,我还不能做大的改动,我也无奈。
    
      目前看,基于kafka-net实现的消息生产者在生产消息并发送成功所需要的时间要比基于Confluent.Kafka实现的消息生产者的所需要的时间要少,尤其是发送的数据越多,这个时间的差距越大。具体的原因还不清楚,如果有高手可以不吝赐教。好了,我该上代码了。
    
      开始代码之前,要说明一点:Confluent.Kafka的Broker是不需要带Http://这个前缀的,但是 kafka-net 的Broker是有http://这个前缀的,大家要注意这个,刚开始的时候我也被坑了一下子。
    

    复制代码
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;

    namespace Enterprise.Framework.MessageQueue
    {
    ///


    /// 消息生产者的接口定义,所有消息生产者的实现必须继承该接口
    ///

    public interface IMessageProducer
    {
    ///
    /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
    ///

    /// 发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常
    /// 需要发送的消息内容,该参数不能为空,空值会抛出异常
    void Produce(string topic, string message);
    }
    }
    复制代码
    复制代码
    1 using System;
    2 using System.Collections.Generic;
    3 using System.Linq;
    4 using System.Text;
    5 using System.Threading.Tasks;
    6
    7 namespace Enterprise.Framework.MessageQueue
    8 {
    9 ///
    10 /// Kafka消息生产者的接口定义,所有Kafka消息生产者的实现必须继承该接口
    11 ///

    12 public interface IKafkaMessageProducer : IMessageProducer
    13 {
    14 ///
    15 /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
    16 ///

    17 /// 发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常
    18 /// 需要发送的消息内容,该参数不能为空,空值会抛出异常
    19 /// 当消息生产完成并成功发送到服务器后,可以对成功生产并发送的消息执行代理所封装方法的操作,默认值为空
    20 void Produce(string topic, string message, Action producedAction = null);
    21 }
    22 }
    复制代码
    复制代码
    1 using System;
    2 using System.Collections.Generic;
    3 using System.Linq;
    4 using System.Runtime.InteropServices;
    5 using System.Text;
    6
    7 namespace Enterprise.Framework.AbstractInterface
    8 {
    9 ///
    10 /// 该抽象类定义了所有需要释放资源类型的抽象类
    11 ///

    12 public abstract class DisposableBase : IDisposable
    13 {
    14 private bool disposed = false;
    15
    16 ///
    17 /// 实现IDisposable中的Dispose方法
    18 ///

    19 public void Dispose()
    20 {
    21 //必须为true
    22 Dispose(true);
    23 //通知垃圾回收机制不再调用终结器(析构器)
    24 GC.SuppressFinalize(this);
    25 }
    26
    27 ///
    28 /// 不是必要的,提供一个Close方法仅仅是为了更符合其他语言(如C++)的规范
    29 ///

    30 public void Close()
    31 {
    32 Dispose();
    33 }
    34
    35 ///
    36 /// 必须,以备程序员忘记了显式调用Dispose方法
    37 ///

    38 ~DisposableBase()
    39 {
    40 //必须为false
    41 Dispose(false);
    42 }
    43
    44 ///
    45 /// 非密封类修饰用protected virtual
    46 /// 密封类修饰用private
    47 ///

    48 /// 是否要清理托管资源,true表示需要清理托管资源,false表示不需要清理托管资源
    49 protected virtual void Dispose(bool disposing)
    50 {
    51 if (disposed)
    52 {
    53 return;
    54 }
    55 if (disposing)
    56 {
    57 // 清理托管资源
    58 DisposeManagedResources();
    59 }
    60 // 清理非托管资源
    61 DisposeUnmanagedResource();
    62 //让类型知道自己已经被释放
    63 disposed = true;
    64 }
    65
    66 ///
    67 /// 释放托管资源
    68 ///

    69 protected abstract void DisposeManagedResources();
    70
    71 ///
    72 /// 释放非托管资源
    73 ///

    74 protected abstract void DisposeUnmanagedResource();
    75 }
    76 }
    复制代码
    复制代码
    1 using KafkaNet;
    2 using KafkaNet.Model;
    3 using KafkaNet.Protocol;
    4 using System;
    5 using System.Collections.Generic;
    6 using System.IO;
    7 using System.Linq;
    8 using System.Text;
    9 using System.Threading;
    10 using System.Threading.Tasks;
    11 using ThreeSoft.Framework.AbstractInterface;
    12
    13 namespace Enterprise.Framework.MessageQueue
    14 {
    15 ///
    16 /// Kafka消息生产者具体的实现类,可以针对长链接进行消息发送处理,不用频繁进行消息组件的创建和销毁的工作
    17 ///

    18 public sealed class KafkaMessageKeepAliveProducer : DisposableBase, IDisposable, IKafkaMessageProducer
    19 {
    20 #region 私有字段
    21
    22 private KafkaNet.Producer _producer;
    23 private BrokerRouter _brokerRouter;
    24 private string _broker;
    25
    26 #endregion
    27
    28 #region 构造函数
    29
    30 ///
    31 /// 通过构造函数初始化消息队列的服务器
    32 ///

    33 /// 消息队列服务器地址,该值不能为空
    34 public KafkaMessageKeepAliveProducer(string broker)
    35 {
    36 if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker))
    37 {
    38 throw new ArgumentNullException("消息队列服务器的地址不可以为空!");
    39 }
    40
    41 #region kafka-net实现
    42
    43 Uri[] brokerUriList = null;
    44
    45 if (broker.IndexOf(',') >= 0)
    46 {
    47 string[] brokers = broker.Split(',');
    48 brokerUriList = new Uri[brokers.Length];
    49
    50 for (int i = 0; i < brokers.Length; i++)
    51 {
    52 brokerUriList[i] = new Uri(brokers[i]);
    53 }
    54 }
    55 else
    56 {
    57 brokerUriList = new Uri[] { new Uri(broker) };
    58 }
    59
    60 var kafkaOptions = new KafkaOptions(brokerUriList);
    61 _brokerRouter = new BrokerRouter(kafkaOptions);
    62 _producer = new KafkaNet.Producer(_brokerRouter);
    63
    64 #endregion
    65
    66 _broker = broker;
    67 }
    68
    69 #endregion
    70
    71 #region 实例属性
    72
    73 ///
    74 /// 获取消息服务器的地址
    75 ///

    76 public string Broker
    77 {
    78 get { return _broker; }
    79 }
    80
    81 #endregion
    82
    83 #region 发送消息的方法
    84
    85 ///
    86 /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
    87 ///

    88 /// 发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常
    89 /// 需要发送的消息内容,该参数不能为空,空值会抛出异常
    90 /// 当消息生产完成并成功发送到服务器后,可以对成功生产并发送的消息执行代理所封装方法的操作
    91 public void Produce(string topic, string message, Action producedAction = null)
    92 {
    93 #region 同步实现
    94
    95 var currentDatetime = DateTime.Now;
    96 var key = currentDatetime.Second.ToString();
    97 var events = new[] { new KafkaNet.Protocol.Message(message, key) };
    98 List result = _producer.SendMessageAsync(topic, events).Result;
    99
    100 if (producedAction != null && result != null && result.Count > 0)
    101 {
    102 MessageResult messageResult = new MessageResult { Broker = Broker, GroupID = null, Message = message, Offset = result[0].Offset, Partition = result[0].PartitionId, Topic = result[0].Topic };
    103 producedAction(messageResult);
    104 }
    105
    106 #endregion
    107 }
    108
    109 ///
    110 /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
    111 ///

    112 /// 发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常
    113 /// 需要发送的消息内容,该参数不能为空,空值会抛出异常
    114 public void Produce(string topic, string message)
    115 {
    116 Produce(topic, message, null);
    117 }
    118
    119 #endregion
    120
    121 #region 实现消息队列资源的释放
    122
    123 ///
    124 /// 析构函数释放资源
    125 ///

    126 ~KafkaMessageKeepAliveProducer()
    127 {
    128 Dispose(false);
    129 }
    130
    131 ///
    132 /// 释放托管资源
    133 ///

    134 protected override void DisposeManagedResources()
    135 {
    136 if (_producer != null)
    137 {
    138 _producer.Dispose();
    139 }
    140 if (_brokerRouter != null)
    141 {
    142 _brokerRouter.Dispose();
    143 }
    144 }
    145
    146 ///
    147 /// 释放非托管资源
    148 ///

    149 protected override void DisposeUnmanagedResource(){}
    150
    151 #endregion
    152 }
    153 }
    复制代码

    好了,今天就写到这里了,每天进步一点点,努力坚持。不忘初心,继续努力吧,欢迎大家前来讨论。
    

    天下国家,可均也;爵禄,可辞也;白刃,可蹈也;中庸不可能也
    分类: 消息队列

  • 相关阅读:
    第二十次CSP考试有感
    chan数据结构实现原理
    记一次udp端口数据流过程
    Envoy 部署类型
    后K8S时代的微服务
    ESP32-使用有刷直流电机笔记
    ESP32-使用ADC笔记
    网络安全黑白名单设置
    网络安全并发数限制与连接频率限制
    apache与nginx服务器启用https功能
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/9433992.html
Copyright © 2011-2022 走看看