zoukankan      html  css  js  c++  java
  • Kafka.net使用编程入门(四)

    新建一个cmd窗口,zkServer命令启动zookeeper
    打开另一个cmd窗口,输入:

    cd D:\Work\software\Apache\kafka2.11\bin\windows
    
    kafka-server-start  D:\Work\software\Apache\kafka2.11\config\server.properties
    
    删除主题:E:\Work\SoftWare\kafka2.11\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --delete --topic TestSiso --zookeeper localhost:2181
    

    kafka 删除topic 提示marked for deletion
    并没有真正删除,如果要真正删除

    在每一台机器中的kafka_2.10/config/server.properties 文件加入 delete.topic.enable=true

    最后所有机器重新启动kafka

    启动kafka成功后,就可以运行项目了

    引用了kafka-net.dll

    Program.cs

     /// <summary>
        ///     Console demo: one KafkaHelper publishes a timestamped test message every two seconds
        ///     while a second KafkaHelper, built from the same "Test" configuration section,
        ///     subscribes and prints every message it receives.
        /// </summary>
        internal class Program
        {
            // Serializes "set colour + write line". The publisher loop and the subscriber
            // callback run on different threads; without this gate one thread can change
            // the foreground colour between the other thread's assignment and its WriteLine.
            private static readonly object ConsoleGate = new object();

            /// <summary>
            ///     Starts the publisher loop and the subscription, then blocks until Enter is pressed.
            /// </summary>
            /// <param name="args">Command-line arguments (not used).</param>
            private static void Main(string[] args)
            {
                string header = "kafka测试";

                Console.Title = header;
                Console.WriteLine(header);

                var pub = new KafkaHelper("Test", true);

                var sub = new KafkaHelper("Test", false);

                // Publisher: runs until the process exits.
                Task.Run(() =>
                {
                    while (true)
                    {
                        string msg = string.Format("{0}这是一条测试消息", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
                        pub.Pub(new List<string> {msg});

                        WriteColored(ConsoleColor.Red, "发送消息:" + msg);
                        Thread.Sleep(2000);
                    }
                });

                // Subscriber: every received payload is echoed in green.
                Task.Run(() => sub.Sub(msg => WriteColored(ConsoleColor.Green, string.Format("收到消息:{0}", msg))));

                Console.ReadLine();
            }

            /// <summary>
            ///     Writes one line to the console in the given colour as a single atomic step.
            /// </summary>
            /// <param name="color">Foreground colour to switch to before writing.</param>
            /// <param name="text">Line to write.</param>
            private static void WriteColored(ConsoleColor color, string text)
            {
                lock (ConsoleGate)
                {
                    Console.ForegroundColor = color;
                    Console.WriteLine(text);
                }
            }
        }
    

    KafkaHelper.cs代码:

    
        /// <summary>
        ///     Facade over the producer and consumer helpers. An instance works in exactly one
        ///     mode, fixed at construction: producer (<see cref="Pub" />) or consumer
        ///     (<see cref="Sub" /> / <see cref="UnSub" />). Broker and topic come from the
        ///     configuration section named in the constructor.
        /// </summary>
        public sealed class KafkaHelper : IDisposable
        {
            private readonly BrokerHelper _brokerHelper;
            private readonly KafkaConfig _config;

            // Exactly one of the two helpers is created; the other stays null.
            private readonly ConsumerHelper _consumerHelper;
            private readonly bool _isProducer;
            private readonly ProduceHelper _produceHelper;

            /// <summary>
            ///     Creates a helper bound to one configuration section.
            /// </summary>
            /// <param name="sectionName">Name of the configuration section to load.</param>
            /// <param name="isProducer">
            ///     <c>true</c> to create a producer-mode helper, <c>false</c> for consumer mode.
            /// </param>
            public KafkaHelper(string sectionName, bool isProducer = true)
            {
                _isProducer = isProducer;
                _config = KafkaConfig.GetConfig(sectionName);
                _brokerHelper = new BrokerHelper(_config.Broker);
                if (isProducer)
                    _produceHelper = new ProduceHelper(_brokerHelper);
                else
                    _consumerHelper = new ConsumerHelper(_brokerHelper);
            }

            /// <summary>
            ///     Whether this instance was created in producer mode.
            /// </summary>
            public bool IsProducer
            {
                get { return _isProducer; }
            }

            /// <summary>
            ///     Releases the helper owned by this instance: stops the subscription in consumer
            ///     mode, disposes the producer in producer mode.
            /// </summary>
            public void Dispose()
            {
                if (_consumerHelper != null)
                    _consumerHelper.UnSub();
                if (_produceHelper != null)
                    _produceHelper.Dispose();
            }

            /// <summary>
            ///     Sends messages to the configured topic.
            /// </summary>
            /// <param name="datas">Message payloads to send.</param>
            /// <param name="acks">Acknowledgement level forwarded to the producer.</param>
            /// <param name="timeout">Optional timeout forwarded to the producer.</param>
            /// <exception cref="InvalidOperationException">The helper was created in consumer mode.</exception>
            public void Pub(List<string> datas, short acks = 1, TimeSpan? timeout = default(TimeSpan?))
            {
                // Without this guard a consumer-mode instance fails with a bare NullReferenceException.
                if (!_isProducer)
                    throw new InvalidOperationException("Pub is only available on a KafkaHelper created with isProducer = true.");

                _produceHelper.Pub(_config.Topic, datas, acks, timeout, MessageCodec.CodecNone);
            }

            /// <summary>
            ///     Subscribes to the configured topic.
            /// </summary>
            /// <param name="onMsg">Callback invoked with each received message decoded as text.</param>
            /// <exception cref="InvalidOperationException">The helper was created in producer mode.</exception>
            public void Sub(Action<string> onMsg)
            {
                EnsureConsumerMode("Sub");
                _consumerHelper.Sub(_config.Topic, onMsg);
            }

            /// <summary>
            ///     Cancels the subscription started by <see cref="Sub" />.
            /// </summary>
            /// <exception cref="InvalidOperationException">The helper was created in producer mode.</exception>
            public void UnSub()
            {
                EnsureConsumerMode("UnSub");
                _consumerHelper.UnSub();
            }

            // Rejects consumer-only operations on a producer-mode instance with a descriptive error.
            private void EnsureConsumerMode(string operation)
            {
                if (_isProducer)
                    throw new InvalidOperationException(operation + " is only available on a KafkaHelper created with isProducer = false.");
            }
        }
    

    KafkaConfig.cs代码:

     /// <summary>
        ///     Configuration section holding the settings for one Kafka endpoint: the broker
        ///     address and the topic. Static factory methods load it from the application
        ///     configuration, from a file, or from an already opened Configuration.
        /// </summary>
        public class KafkaConfig : ConfigurationSection
        {
            private const string BrokerKey = "broker";
            private const string TopicKey = "topic";
            private const string DefaultSectionName = "kafkaConfig";

            /// <summary>
            ///     Name of the section this instance was requested under.
            ///     Filled in by the name-based GetConfig overloads.
            /// </summary>
            public string SectionName { get; set; }

            /// <summary>
            ///     Broker address (required attribute "broker").
            /// </summary>
            [ConfigurationProperty(BrokerKey, IsRequired = true)]
            public string Broker
            {
                get { return (string) this[BrokerKey]; }
                set { this[BrokerKey] = value; }
            }

            /// <summary>
            ///     Topic name (required attribute "topic").
            /// </summary>
            [ConfigurationProperty(TopicKey, IsRequired = true)]
            public string Topic
            {
                get { return (string) this[TopicKey]; }
                set { this[TopicKey] = value; }
            }

            #region Factory methods that build a KafkaConfig from configuration

            /// <summary>
            ///     Loads the default section, "kafkaConfig", from the application configuration.
            /// </summary>
            /// <returns>The section, or <c>null</c> when it is not present.</returns>
            public static KafkaConfig GetConfig()
            {
                object raw = ConfigurationManager.GetSection(DefaultSectionName);
                return (KafkaConfig) raw;
            }

            /// <summary>
            ///     Loads the named section from the application configuration, falling back to
            ///     the default section when the named one is absent.
            /// </summary>
            /// <param name="sectionName">Section to look up.</param>
            /// <returns>The resolved section with <see cref="SectionName" /> set to <paramref name="sectionName" />.</returns>
            /// <exception cref="ConfigurationErrorsException">Neither the named nor the default section exists.</exception>
            public static KafkaConfig GetConfig(string sectionName)
            {
                // A section identical to the default one may be omitted, hence the fallback.
                KafkaConfig resolved = (KafkaConfig) ConfigurationManager.GetSection(sectionName) ?? GetConfig();
                if (resolved != null)
                {
                    resolved.SectionName = sectionName;
                    return resolved;
                }

                throw MissingSection(sectionName);
            }

            /// <summary>
            ///     Loads the named section from the configuration file at the given path.
            /// </summary>
            /// <param name="fileName">Path of the configuration file to open.</param>
            /// <param name="sectionName">Section to look up.</param>
            /// <returns>The section with <see cref="SectionName" /> set.</returns>
            public static KafkaConfig GetConfig(string fileName, string sectionName)
            {
                var fileMap = new ConfigurationFileMap(fileName);
                Configuration mapped = ConfigurationManager.OpenMappedMachineConfiguration(fileMap);
                return GetConfig(mapped, sectionName);
            }

            /// <summary>
            ///     Loads the named section from an already opened Configuration.
            /// </summary>
            /// <param name="config">Configuration to read from.</param>
            /// <param name="sectionName">Section to look up.</param>
            /// <returns>The section with <see cref="SectionName" /> set.</returns>
            /// <exception cref="ConfigurationErrorsException">
            ///     <paramref name="config" /> is <c>null</c>, or the section does not exist in it.
            /// </exception>
            public static KafkaConfig GetConfig(Configuration config, string sectionName)
            {
                if (config == null)
                    throw new ConfigurationErrorsException("传入的配置不能为空");

                var found = (KafkaConfig) config.GetSection(sectionName);
                if (found == null)
                    throw MissingSection(sectionName);

                found.SectionName = sectionName;
                return found;
            }

            // Builds the error raised when a requested section cannot be found.
            private static ConfigurationErrorsException MissingSection(string sectionName)
            {
                return new ConfigurationErrorsException("kafkacofng节点 " + sectionName + " 未配置.");
            }

            #endregion
        }
    

    BrokerHelper.cs代码:

     /// <summary>
        ///     Creates kafka-net BrokerRouter instances for the broker address(es) supplied at
        ///     construction. The address may be a single "host:port" or a comma-separated list
        ///     of them.
        /// </summary>
        internal class BrokerHelper
        {
            private static readonly char[] BrokerSeparators = {','};

            private readonly string _broker;

            /// <summary>
            ///     Stores the broker address string used by <see cref="GetBroker" />.
            /// </summary>
            /// <param name="broker">One "host:port" endpoint, or several separated by commas.</param>
            /// <exception cref="ArgumentException"><paramref name="broker" /> is null, empty or whitespace.</exception>
            public BrokerHelper(string broker)
            {
                // Fail here with a clear message rather than later with a UriFormatException for "http://".
                if (string.IsNullOrWhiteSpace(broker))
                    throw new ArgumentException("A broker address in the form host:port is required.", "broker");

                _broker = broker;
            }

            /// <summary>
            ///     Builds a new router for the configured broker(s). Every call returns a new instance.
            /// </summary>
            /// <returns>A BrokerRouter configured with one URI per broker endpoint.</returns>
            public BrokerRouter GetBroker()
            {
                string[] endpoints = _broker.Split(BrokerSeparators, StringSplitOptions.RemoveEmptyEntries);

                // Each endpoint is turned into a URI by prefixing a scheme to "host:port".
                Uri[] servers = Array.ConvertAll(endpoints,
                    endpoint => new Uri(string.Format("http://{0}", endpoint.Trim())));

                var options = new KafkaOptions(servers);
                return new BrokerRouter(options);
            }
        }
    

    ConsumerHelper.cs代码:

     /// <summary>
        ///     Consumer-side helper: subscribes to a topic on a background task and hands each
        ///     message, decoded as UTF-8 text, to a callback until <see cref="UnSub" /> is called.
        /// </summary>
        internal class ConsumerHelper
        {
            private readonly BrokerHelper _brokerHelper;

            // Guards _cancellation against concurrent Sub / UnSub calls.
            private readonly object _gate = new object();

            // Cancellation source of the active subscription; null when nothing is subscribed.
            private System.Threading.CancellationTokenSource _cancellation;

            /// <summary>
            ///     Creates a helper that obtains its router from the given broker helper.
            /// </summary>
            /// <param name="brokerHelper">Source of the BrokerRouter used for each subscription.</param>
            public ConsumerHelper(BrokerHelper brokerHelper)
            {
                _brokerHelper = brokerHelper;
            }

            /// <summary>
            ///     Starts consuming <paramref name="topic" /> on a background task and returns
            ///     immediately. A subscription that is already running is cancelled and replaced.
            /// </summary>
            /// <param name="topic">Topic to consume.</param>
            /// <param name="onMsg">Callback invoked, one message at a time, with the UTF-8 decoded payload.</param>
            /// <exception cref="ArgumentNullException"><paramref name="onMsg" /> is <c>null</c>.</exception>
            public void Sub(string topic, Action<string> onMsg)
            {
                if (onMsg == null)
                    throw new ArgumentNullException("onMsg");

                var options = new ConsumerOptions(topic, _brokerHelper.GetBroker());
                var consumer = new Consumer(options);
                var cancellation = new System.Threading.CancellationTokenSource();

                lock (_gate)
                {
                    if (_cancellation != null)
                        _cancellation.Cancel();
                    _cancellation = cancellation;
                }

                var token = cancellation.Token;

                Task.Run(() =>
                {
                    try
                    {
                        // Consume yields a blocking sequence that does not end by itself, so a
                        // flag checked between iterations would never be re-read; the token is
                        // what makes the enumeration stop. Messages are handled sequentially:
                        // Parallel.ForEach buffers items from a blocking source and would
                        // delay and reorder deliveries.
                        foreach (Message msg in consumer.Consume(token))
                            onMsg(Encoding.UTF8.GetString(msg.Value));
                    }
                    catch (OperationCanceledException)
                    {
                        // Expected: UnSub (or a replacing Sub) cancelled this subscription.
                    }
                    finally
                    {
                        consumer.Dispose();
                    }
                });
            }

            /// <summary>
            ///     Cancels the active subscription, if any. Safe to call repeatedly.
            /// </summary>
            public void UnSub()
            {
                lock (_gate)
                {
                    if (_cancellation == null)
                        return;

                    _cancellation.Cancel();
                    _cancellation = null;
                }
            }
        }
    

    ProduceHelper.cs代码:

    /// <summary>
        ///     Producer-side helper: owns one kafka-net Producer and sends batches of text
        ///     messages through it.
        /// </summary>
        internal class ProduceHelper : IDisposable
        {
            private readonly BrokerHelper _brokerHelper;
            private readonly Producer _producer;

            /// <summary>
            ///     Creates the underlying producer from a router supplied by the broker helper.
            /// </summary>
            /// <param name="brokerHelper">Source of the BrokerRouter the producer is built on.</param>
            public ProduceHelper(BrokerHelper brokerHelper)
            {
                _brokerHelper = brokerHelper;

                _producer = new Producer(_brokerHelper.GetBroker());
            }

            /// <summary>
            ///     Disposes the underlying producer.
            /// </summary>
            public void Dispose()
            {
                if (_producer != null)
                    _producer.Dispose();
            }

            /// <summary>
            ///     Queues the given payloads for sending to <paramref name="topic" /> and returns
            ///     without waiting for the send to complete. A failed send is reported through
            ///     System.Diagnostics.Trace.
            /// </summary>
            /// <param name="topic">Destination topic.</param>
            /// <param name="datas">Message payloads; an empty list is a no-op.</param>
            /// <param name="acks">Acknowledgement level passed to the producer.</param>
            /// <param name="timeout">Optional timeout passed to the producer.</param>
            /// <param name="codec">Message codec passed to the producer.</param>
            /// <exception cref="ArgumentNullException"><paramref name="datas" /> is <c>null</c>.</exception>
            public void Pub(string topic, List<string> datas, short acks = 1, TimeSpan? timeout = default(TimeSpan?), MessageCodec codec = MessageCodec.CodecNone)
            {
                if (datas == null)
                    throw new ArgumentNullException("datas");

                // Nothing to send; skip building and dispatching an empty batch.
                if (datas.Count == 0)
                    return;

                var msgs = datas.Select(item => new Message(item)).ToList();

                // The send stays fire-and-forget so callers are not blocked, but a fault must
                // not disappear silently: observe the exception and report it.
                _producer.SendMessageAsync(topic, msgs, acks, timeout, codec)
                    .ContinueWith(
                        sendTask =>
                        {
                            Exception error = sendTask.Exception.GetBaseException();
                            System.Diagnostics.Trace.TraceError("Kafka send to topic '{0}' failed: {1}", topic, error);
                        },
                        System.Threading.Tasks.TaskContinuationOptions.OnlyOnFaulted);
            }
        }
    

    App.config

    <?xml version="1.0" encoding="utf-8"?>
    
    <!-- Application configuration for the Kafka demo. -->
    <configuration>
      <configSections>
        <!-- Registers the custom "Test" section handled by KafkaConfig.
             NOTE(review): "xxxxx.sssss" looks like a placeholder; presumably it must be replaced
             with the real namespace and assembly name that contain KafkaConfig. -->
        <section name="Test" type="xxxxx.sssss.KafkaConfig, xxxxx.sssss" />
      </configSections>
      <!-- Section loaded by KafkaHelper("Test", ...): "broker" and "topic" map to
           KafkaConfig.Broker and KafkaConfig.Topic; both attributes are required. -->
      <Test broker="127.0.0.1:9092" topic="TestSiso" />
      <startup>
        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
      </startup>
    </configuration>
    

    运行结果如图:

    这里写图片描述

  • 相关阅读:
    26 Oracle数据库——分页
    25 PLSQL图形化操作
    24 数据库练习——简单练习
    23 SQL语言——视图 VIEW
    22 SQL语言——索引 index
    21 SQL语言——序列
    20 表结构的增删改
    19 Oracle外键约束
    18 SQL语言——约束
    17 SQL语言——子查询与关键字in
  • 原文地址:https://www.cnblogs.com/Wulex/p/6953353.html
Copyright © 2011-2022 走看看