zoukankan      html  css  js  c++  java
  • Kafka基础教程(四):.net core集成使用Kafka消息队列

      .net core使用Kafka可以像上一篇介绍的封装那样使用(Kafka基础教程(三):C#使用Kafka消息队列),但是我还是觉得再做一层封装比较好,同时还能使用它做一个日志收集的功能。

      因为代码比较多,所有就直接放到码云(Gitee)上去了,地址:https://gitee.com/shanfeng1000/dotnetcore-demo/tree/master/Kafka(为什么不是github,因为github太慢了-_-!!)

      感兴趣的可以克隆下来再按照自己的需求修改,这里简单介绍一下使用的Demo(Demo基于.net core3.1的版本,其他版本可能需要自行测试)

      生产者(AspNetCore.WebApi.Producer)

      首选需要在ConfigureServices中添加相关依赖项:  

        public void ConfigureServices(IServiceCollection services)
        {
            var hosts = new string[] { "192.168.209.133:9092", "192.168.209.134:9092", "192.168.209.135:9092" };
    
            #region 日志记录
    
            services.AddLogging(builder =>
            {
                builder.SetMinimumLevel(LogLevel.Trace);
            });
            services.AddKafkaLogger(options =>
            {
                options.BootstrapServers = hosts;
                options.Category = "Home";
                options.InitializeCount = 10;
                options.Key = "log";
                options.MinLevel = LogLevel.Trace;
                options.Topic = "topic.logger";
                options.ApplicationName = "AspNetCore.WebApi.Producer";
            });
    
            #endregion
    
            #region Kafka
    
            services.AddKafkaProducer(options =>
            {
                options.BootstrapServers = hosts;
                options.InitializeCount = 3;
                options.Key = "kafka";
                options.Topic = "topic.kafka";
            });
    
            #endregion
    
            ......
        }

      AddKafkaLogger是添加日志的相关依赖服务配置,之后使用.net core的ILogger对象记录消息时就可以直接将消息发布到Kafka了。

      AddKafkaProducer是添加Kafka发布者的相关配置,可以指定一个名称,使用时使用IKafkaProducerFactory接口注入即可,比如在Home控制器中使用:  

        [ApiController]
        [Route("[controller]")]
        public class HomeController : ControllerBase
        {
            IKafkaProducerFactory kafkaProducerFactory;
            ILoggerFactory loggerFactory;
    
            public HomeController(IKafkaProducerFactory kafkaProducerFactory, ILoggerFactory loggerFactory)
            {
                this.kafkaProducerFactory = kafkaProducerFactory;
                this.loggerFactory = loggerFactory;
            }
    
            /// <summary>
            /// 发布消息
            /// </summary>
            /// <param name="message">消息</param>
            /// <returns>success</returns>
            [HttpGet("Kafka")]
            public string Kafka(string message)
            {
                message = message ?? "";
                var producer = kafkaProducerFactory.Create();
                producer.Publish(message);
    
                return "success";
            }
            /// <summary>
            /// 日志
            /// </summary>
            /// <param name="message">消息</param>
            /// <returns>success</returns>
            [HttpGet("Logger")]
            public string Logger(string message)
            {
                var logger1 = loggerFactory.CreateLogger("logger");
                logger1.LogTrace($"logger1(LogTrace):{message}");
                logger1.LogDebug($"logger1(LogDebug):{message}");
                logger1.LogInformation($"logger1(LogInformation):{message}");
                logger1.LogWarning($"logger1(LogWarning):{message}");
                logger1.LogError($"logger1(LogError):{message}");
                logger1.LogCritical($"logger1(LogCritical):{message}");
    
                var logger2 = loggerFactory.CreateLogger("123456");
                logger2.LogTrace($"logger2(LogTrace):{message}");
                logger2.LogDebug($"logger2(LogDebug):{message}");
                logger2.LogInformation($"logger2(LogInformation):{message}");
                logger2.LogWarning($"logger2(LogWarning):{message}");
                logger2.LogError($"logger2(LogError):{message}");
                logger2.LogCritical($"logger2(LogCritical):{message}");
    
                return "success";
            }
        }

      消费者(AspNetCore.WebApi.Consumer)

       首选需要在ConfigureServices中添加相关依赖项: 

        public void ConfigureServices(IServiceCollection services)
        {
            var hosts = new string[] { "192.168.209.133:9092", "192.168.209.134:9092", "192.168.209.135:9092" };
    
            #region 日志记录
    
            services.AddKafkaConsumer(options =>
            {
                options.BootstrapServers = hosts;
                options.EnableAutoCommit = true;//自动提交
                options.GroupId = "group.1";
                options.Subscribers = KafkaSubscriber.From("topic.logger");
    
            }).AddListener(result =>
            {
                Console.WriteLine("Message From topic.logger:" + result.Message);
            });
    
            #endregion
    
            #region Kafka
    
            services.AddKafkaConsumer(options =>
            {
                options.BootstrapServers = hosts;
                options.EnableAutoCommit = false;
                options.GroupId = "group.2";
                options.Subscribers = KafkaSubscriber.From("topic.kafka");
    
            }).AddListener(result =>//直接在lambda表达式中完成消费逻辑
            {
                Console.WriteLine("Message From topic.kafka:" + result.Message);
                result.Commit();
            }).AddListener<KafkaConsumerListener>();//实现IKafkaConsumerListener接口完成消费逻辑
    
            #endregion
    
            ......
        }

      无论是日志的消息消费还是自定义的消息消费,都是先使用AddKafkaConsumer方法声明Kafka消费者的配置,然后使用AddListener方法添加消息消费的处理程序,AddListener有几个委托,可以接受一个lambda表达式,可以使用一个实现了IKafkaConsumerListener接口的类,就比如上面的KafkaConsumerListener类:  

        public class KafkaConsumerListener : IKafkaConsumerListener
        {
            public Task ConsumeAsync(RecieveResult recieveResult)
            {
                Console.WriteLine("KafkaConsumerListener:" + recieveResult.Message);
                recieveResult.Commit();
                return Task.CompletedTask;
            }
        }
  • 相关阅读:
    7.$a = 'abcdef'; 请取出$a的值并打印出第一个字母
    8.PHP可以和sql server/oracle等数据库连接吗?
    6.能够使HTML和PHP分离开使用的模板
    4.用PHP打印出前一天的时间格式是2006-5-10 22:21:21
    5.echo(),print(),print_r()的区别
    3.数据库中的事务是什么?
    spring中配置quartz调用两次及项目日志log4j不能每天生成日志解决方法
    tomcat7性能调优与配置(以windows版为例)
    eclipse中maven下载不了私服上面的第三方包问题
    birt4.6部署到tomcat及启动服务报错解决方法
  • 原文地址:https://www.cnblogs.com/shanfeng1000/p/13035726.html
Copyright © 2011-2022 走看看