zoukankan      html  css  js  c++  java
  • 基于kafka-net实现的可以长链接的消息生产者

          今天有点时间,我就来说两句。最近接触的Kafka相关的东西要多一些,其实以前也接触过,但是在项目使用中的经验不是很多。最近公司的项目里面使用了Kafka消息中间件,由于以前的人员编写的客户端的类不是很好,没有设计的概念,就是一个简单类的功能罗列,没有考虑到后期的扩展和维护(以后可能会兼容其他形式的消息队列,要做到无缝衔接),所以这个重构的任务就落到我的身上。

          先说说我的感受,然后再贴出代码的实现吧。我第一次是基于Confluent.Kafka编写的Kafka消息生产者,后来经过测试,同步操作的时间比较长,要完成20万数据发送消息并更新到数据库的时间大概是16-18分钟,这个结果有点让人不能接受。为了提高性能,也做了很多测试,都没有办法解决这个问题。后来抱着试试看的想法,我又基于kafka-net重新实现了Kafka消息的生产者。经过测试,完成同样的任务,时间大概需要3分钟左右。两种实现方法完成同样的任务,都是以同步的方式生产消息,并将消息成功发送到Broker后,再将数据插入到数据库做记录。大家不要纠结为什么这样使用消息队列,这是上头的做法,我还不能做大的改动,我也无奈。

          目前看,基于kafka-net实现的消息生产者在生产消息并发送成功所需要的时间要比基于Confluent.Kafka实现的消息生产者的所需要的时间要少,尤其是发送的数据越多,这个时间的差距越大。具体的原因还不清楚,如果有高手可以不吝赐教。好了,我该上代码了。

          开始代码之前,要说明一点:Confluent.Kafka的Broker是不需要带Http://这个前缀的,但是 kafka-net 的Broker是有http://这个前缀的,大家要注意这个,刚开始的时候我也被坑了一下子。
       

    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace Enterprise.Framework.MessageQueue
    {
        /// <summary>
        /// 消息生产者的接口定义,所有消息生产者的实现必须继承该接口
        /// </summary>
        public interface IMessageProducer
        {
            /// <summary>
            /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
            /// </summary>
            /// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>
            /// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>        
            void Produce(string topic, string message);
        }
    }
     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.Text;
     5 using System.Threading.Tasks;
     6 
     7 namespace Enterprise.Framework.MessageQueue
     8 {
     9     /// <summary>
    10     /// Kafka消息生产者的接口定义,所有Kafka消息生产者的实现必须继承该接口
    11     /// </summary>
    12     public interface IKafkaMessageProducer : IMessageProducer
    13     {
    14         /// <summary>
    15         /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
    16         /// </summary>
    17         /// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>
    18         /// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>
    19         /// <param name="producedAction">当消息生产完成并成功发送到服务器后,可以对成功生产并发送的消息执行代理所封装方法的操作,默认值为空</param>
    20         void Produce(string topic, string message, Action<MessageResult> producedAction = null);
    21     }
    22 }
     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.Runtime.InteropServices;
     5 using System.Text;
     6 
     7 namespace Enterprise.Framework.AbstractInterface
     8 {
     9     /// <summary>
    10     /// 该抽象类定义了所有需要释放资源类型的抽象类
    11     /// </summary>
    12     public abstract class DisposableBase : IDisposable
    13     {
    14         private bool disposed = false;
    15 
    16         /// <summary>
    17         /// 实现IDisposable中的Dispose方法
    18         /// </summary>
    19         public void Dispose()
    20         {
    21             //必须为true
    22             Dispose(true);
    23             //通知垃圾回收机制不再调用终结器(析构器)
    24             GC.SuppressFinalize(this);
    25         }
    26 
    27         /// <summary>
    28         /// 不是必要的,提供一个Close方法仅仅是为了更符合其他语言(如C++)的规范
    29         /// </summary>
    30         public void Close()
    31         {
    32             Dispose();
    33         }
    34 
    35         /// <summary>
    36         /// 必须,以备程序员忘记了显式调用Dispose方法
    37         /// </summary>
    38         ~DisposableBase()
    39         {
    40             //必须为false
    41             Dispose(false);
    42         }
    43 
    44         /// <summary>
    45         /// 非密封类修饰用protected virtual
    46         /// 密封类修饰用private
    47         /// </summary>
    48         /// <param name="disposing">是否要清理托管资源,true表示需要清理托管资源,false表示不需要清理托管资源</param>
    49         protected virtual void Dispose(bool disposing)
    50         {
    51             if (disposed)
    52             {
    53                 return;
    54             }
    55             if (disposing)
    56             {
    57                 // 清理托管资源                
    58                 DisposeManagedResources();
    59             }
    60             // 清理非托管资源
    61             DisposeUnmanagedResource();
    62             //让类型知道自己已经被释放
    63             disposed = true;
    64         }
    65 
    66         /// <summary>
    67         /// 释放托管资源
    68         /// </summary>
    69         protected abstract void DisposeManagedResources();
    70 
    71         /// <summary>
    72         /// 释放非托管资源
    73         /// </summary>
    74         protected abstract void DisposeUnmanagedResource();
    75     }
    76 }
      1 using KafkaNet;
      2 using KafkaNet.Model;
      3 using KafkaNet.Protocol;
      4 using System;
      5 using System.Collections.Generic;
      6 using System.IO;
      7 using System.Linq;
      8 using System.Text;
      9 using System.Threading;
     10 using System.Threading.Tasks;
     11 using ThreeSoft.Framework.AbstractInterface;
     12 
     13 namespace Enterprise.Framework.MessageQueue
     14 {
     15     /// <summary>
     16     /// Kafka消息生产者具体的实现类,可以针对长链接进行消息发送处理,不用频繁进行消息组件的创建和销毁的工作
     17     /// </summary>
     18     public sealed class KafkaMessageKeepAliveProducer : DisposableBase, IDisposable, IKafkaMessageProducer
     19     {
     20         #region 私有字段
     21 
     22         private KafkaNet.Producer _producer;
     23         private BrokerRouter _brokerRouter;
     24         private string _broker;
     25 
     26         #endregion
     27 
     28         #region 构造函数
     29 
     30         /// <summary>
     31         /// 通过构造函数初始化消息队列的服务器
     32         /// </summary>
     33         /// <param name="broker">消息队列服务器地址,该值不能为空</param>
     34         public KafkaMessageKeepAliveProducer(string broker)
     35         {
     36             if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker))
     37             {
     38                 throw new ArgumentNullException("消息队列服务器的地址不可以为空!");
     39             }
     40 
     41             #region kafka-net实现
     42 
     43             Uri[] brokerUriList = null;
     44 
     45             if (broker.IndexOf(',') >= 0)
     46             {
     47                 string[] brokers = broker.Split(',');
     48                 brokerUriList = new Uri[brokers.Length];
     49 
     50                 for (int i = 0; i < brokers.Length; i++)
     51                 {
     52                     brokerUriList[i] = new Uri(brokers[i]);
     53                 }
     54             }
     55             else
     56             {
     57                 brokerUriList = new Uri[] { new Uri(broker) };
     58             }
     59 
     60             var kafkaOptions = new KafkaOptions(brokerUriList);
     61             _brokerRouter = new BrokerRouter(kafkaOptions);
     62             _producer = new KafkaNet.Producer(_brokerRouter);
     63 
     64             #endregion
     65 
     66             _broker = broker;
     67         }
     68 
     69         #endregion
     70 
     71         #region 实例属性
     72 
     73         /// <summary>
     74         /// 获取消息服务器的地址
     75         /// </summary>
     76         public string Broker
     77         {
     78             get { return _broker; }
     79         }
     80 
     81         #endregion
     82 
     83         #region 发送消息的方法
     84 
     85         /// <summary>
     86         /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
     87         /// </summary>        
     88         /// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>
     89         /// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>
     90         /// <param name="producedAction">当消息生产完成并成功发送到服务器后,可以对成功生产并发送的消息执行代理所封装方法的操作</param>
     91         public void Produce(string topic, string message, Action<MessageResult> producedAction = null)
     92         {
     93             #region 同步实现
     94 
     95             var currentDatetime = DateTime.Now;
     96             var key = currentDatetime.Second.ToString();
     97             var events = new[] { new KafkaNet.Protocol.Message(message, key) };
     98             List<ProduceResponse> result = _producer.SendMessageAsync(topic, events).Result;
     99 
    100             if (producedAction != null && result != null && result.Count > 0)
    101             {
    102                 MessageResult messageResult = new MessageResult { Broker = Broker, GroupID = null, Message = message, Offset = result[0].Offset, Partition = result[0].PartitionId, Topic = result[0].Topic };
    103                 producedAction(messageResult);
    104             }
    105 
    106             #endregion
    107         }
    108 
    109         /// <summary>
    110         /// 将指定的消息内容发送到消息服务器并存放在指定的主题名称里
    111         /// </summary>        
    112         /// <param name="topic">发送消息的主题名称,这个主题就是对消息的分类,不同的主题存放不同的消息,该参数不能为空,空值会抛出异常</param>
    113         /// <param name="message">需要发送的消息内容,该参数不能为空,空值会抛出异常</param>        
    114         public void Produce(string topic, string message)
    115         {
    116             Produce(topic, message, null);
    117         }
    118 
    119         #endregion
    120 
    121         #region 实现消息队列资源的释放
    122 
    123         /// <summary>
    124         /// 析构函数释放资源
    125         /// </summary>
    126         ~KafkaMessageKeepAliveProducer()
    127         {
    128             Dispose(false);
    129         }
    130 
    131         /// <summary>
    132         /// 释放托管资源
    133         /// </summary>
    134         protected override void DisposeManagedResources()
    135         {
    136             if (_producer != null)
    137             {
    138                 _producer.Dispose();
    139             }
    140             if (_brokerRouter != null)
    141             {
    142                 _brokerRouter.Dispose();
    143             }
    144         }
    145 
    146         /// <summary>
    147         /// 释放非托管资源
    148         /// </summary>
    149         protected override void DisposeUnmanagedResource(){}
    150 
    151         #endregion
    152     }
    153 }


        好了,今天就写到这里了,每天进步一点点,努力坚持。不忘初心,继续努力吧,欢迎大家前来讨论。

  • 相关阅读:
    什么是操作系统
    去除按钮点击的边框
    unsupported time zone specified undefined
    w3c JS测试
    视频播放器
    document.write
    HTML4到HTML5
    MPU6050
    NRF24L01模块配置
    4-Four-Seeing hands
  • 原文地址:https://www.cnblogs.com/PatrickLiu/p/9413626.html
Copyright © 2011-2022 走看看