zoukankan      html  css  js  c++  java
  • asp.net core mcroservices 架构之 分布式日志(三):集成kafka

      一 kafka介绍                               

               kafka是基于zookeeper的一个分布式流平台,既然是流,那么大家都能猜到它的存储结构基本上就是线性的了。硬盘大家都知道读写非常的慢,那是因为在随机情况下,线性下,硬盘的读写非常快。kafka官方文档,一直拿传统的消息队列来和kafka对比,这样大家会触类旁通更快了解kafka的特性。最熟悉的消息队列框架有ActiveMQRabbitMQ.熟悉消息队列的,最熟悉的特性就是队列和发布订阅功能,因为这是大家最常用的,kafka实现了一些特有的机制,去规避传统的消息队列的一些瓶颈,比如并发,rabbitMQ在多个处理程序下,并不能保证执行顺序,还是必须自己去处理独占,而kafka使用consumer group的方式,实现了可以多个处理程序处理一个topic下的记录。如图:

    image

    每个分区的记录保证能被每个组接受,这样可以并发去处理一个topic的记录,而且扩展组,则可以随意根据应用需求去扩展你的应用程序,但是每个组的消费者不能超过分区的数量。

    kafka Distribution 提供了容错的功能,每一个partition都有一个服务器叫leader,还有零个或者一个以上的服务器叫follower,当这些follower都在同步数据的时候,leader扛起所有的写和读,当leader挂掉,follower会随机选取一个服务器当leader,当然必须有几个follower同步时 in-sync的。还有kafka虽然的那个记录具有原子性,但是并不支持事务。

    因为这一篇并不是专门讲解kafka,所以点到为止。

          扩展服务 开发                          

         以前讲过,netcore的一个很重要的特性就是支持依赖注入,在这里一切皆服务。那么如果需要kafka作为日志服务的终端,就首先需要kafka服务,下面咱们就开发一个kafka服务。

    首先,服务就是需要构建,这是netcore开发服务的第一步,我们首先建立一个IKafkaBuilder.cs接口类,如下:

     

    homusing Microsoft.Extensions.DependencyInjection;
    
    namespace Walt.Freamwork.Service
    {
        public interface IKafkaBuilder
        {
             /// <summary>
            /// Gets the <see cref="IServiceCollection"/> where Logging services are configured.
            /// </summary>
            IServiceCollection Services { get; }
        }
    }

    再实现它,KafkaBuilder.cs

    using Microsoft.Extensions.DependencyInjection;
    
    namespace Walt.Freamwork.Service
    {
        public class KafkaBuilder : IKafkaBuilder
        {
            public IServiceCollection Services {get;}
    
            public KafkaBuilder(IServiceCollection services)
            {
                Services=services;
            }
        }
    }

    再利用扩展方法为serviceCollection类加上扩展方法:

     using System;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.DependencyInjection.Extensions;
    using Walt.Framework.Service.Kafka;
    
    namespace Walt.Framework.Service
    {
      
      
        public static class ServiceCollectionExtensions
        {
            /// <summary>
            /// Adds logging services to the specified <see cref="IServiceCollection" />.
            /// </summary>
            /// <param name="services">The <see cref="IServiceCollection" /> to add services to.</param>
            /// <returns>The <see cref="IServiceCollection"/> so that additional calls can be chained.</returns>
            public static IServiceCollection AddKafka(this IServiceCollection services)
            {
                return AddKafka(services, builder => { });
            }
     
            public static IServiceCollection AddKafka(this IServiceCollection services
            , Action<IKafkaBuilder> configure)
            {
                if (services == null)
                {
                    throw new ArgumentNullException(nameof(services));
                }
    
                services.AddOptions(); 
                configure(new KafkaBuilder(services));
                services.TryAddSingleton<IKafkaService,KafkaService>();  //kafka的服务类
                return services;
            }
        }
    }
    KafkaService的实现:
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Microsoft.Extensions.Options;
    
    namespace  Walt.Framework.Service.Kafka
    {
        public class KafkaService : IKafkaService
        {
    
            private KafkaOptions _kafkaOptions;
            private Producer _producer;
            public KafkaService(IOptionsMonitor<KafkaOptions>  kafkaOptions)
            {
                _kafkaOptions=kafkaOptions.CurrentValue; 
                kafkaOptions.OnChange((kafkaOpt,s)=>{
                    _kafkaOptions=kafkaOpt; 
                        System.Diagnostics.Debug
                        .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s);
                        
                });
                 _producer=new Producer(_kafkaOptions.Properties);
            }
    
            private byte[] ConvertToByte(string str)
            {
                return System.Text.Encoding.Default.GetBytes(str);
            }
     
            public  async Task<Message> Producer(string topic,string key,string value)
            {  
                if(string.IsNullOrEmpty(topic)
                ||string.IsNullOrEmpty(value))
                {
                    throw new ArgumentNullException("topic或者value不能为null.");
                }
          
               var task=  await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value)); 
               return task;
            }
     
        }
    }

    那么咱们是不是忘记什么了,看上面的代码,是不是那个配置类KafkaOptions 还没有说明

    image这个位置添加kafka的配置类KafkaConfigurationOptions

    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.Options;
    using Walt.Freamwork.Service;
    
    namespace Walt.Freamwork.Configuration
    {
        public class KafkaConfigurationOptions : IConfigureOptions<KafkaOptions>
        {
    
            private readonly IConfiguration _configuration;
    
    
            public KafkaConfigurationOptions(IConfiguration configuration)
            {
               _configuration=configuration;
            }
    
    
            public void Configure(KafkaOptions options)
            {
                    //这里仅仅自定义一些你自己的代码,使用上面configuration配置中的配置节,处理程序没法自动绑定的
                      一些事情。
            }
        }
    }

    然后,将配置类添加进服务:

    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.DependencyInjection.Extensions;
    using Microsoft.Extensions.Options;
    using Walt.Framework.Service;
    
    namespace Walt.Framework.Configuration
    {
        public static class KafkaConfigurationExtensioncs
        {
              public static IKafkaBuilder AddConfiguration(this IKafkaBuilder builder
              ,IConfiguration configuration)
              {
                   
                    InitService( builder,configuration); 
                    return builder;
              }
    
    
              public static void InitService(IKafkaBuilder builder,IConfiguration configuration)
              {
                builder.Services.TryAddSingleton<IConfigureOptions<KafkaOptions>>(
                      new KafkaConfigurationOptions(configuration));  //配置类和配置内容
    
                builder.Services.TryAddSingleton
                (ServiceDescriptor.Singleton<IOptionsChangeTokenSource<KafkaOptions>>(
                      new ConfigurationChangeTokenSource<KafkaOptions>(configuration)) );//这个是观察类,如果更改,会激发onchange方法
    
                builder.Services
                .TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<KafkaOptions>>
                (new ConfigureFromConfigurationOptions<KafkaOptions>(configuration))); //这个是option类,没这个,配置无法将类绑定
                
                 builder.Services.AddSingleton(new KafkaConfiguration(configuration));
              }
        }
    } 

    ok,推送nuget,业务部分调用。

          kafka服务调用                          

    在project中引用然后restore:

    image

    引入命名空间:

    image

    调用:

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Threading.Tasks;
    using Microsoft.AspNetCore;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Logging; 
    using Newtonsoft.Json;
    using Walt.Framework.Log;
    using Walt.Framework.Configuration;
    using Walt.Framework.Service;
    
    namespace Walt.TestMcroServoces.Webapi
    {
        public class Program
        { 
            public static void Main(string[] args)
            { 
                 
                var host = new WebHostBuilder()
                .ConfigureAppConfiguration((hostingContext, configContext) =>{
                     var en=hostingContext.HostingEnvironment;
                     if(en.IsDevelopment())
                     {
                         configContext.AddJsonFile($"appsettings.{en.EnvironmentName}.json");
                     }
                     else
                     {
                         configContext.AddJsonFile("appsettings.json");
                     }
                       configContext.AddCommandLine(args)
                 .AddEnvironmentVariables()
                 .SetBasePath(Directory.GetCurrentDirectory()).Build(); 
                  
                }).ConfigureServices((context,configureServices)=>{
                       configureServices.AddKafka(KafkaBuilder=>{
                        KafkaBuilder.AddConfiguration(context.Configuration.GetSection("KafkaService"));
                       });
                })   //kafka的调用。
                .ConfigureLogging((hostingContext, logging) => {
     
                    logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging"))
                    .AddCustomizationLogger();
    
                }).UseKestrel(KestrelServerOption=>{
                    KestrelServerOption.ListenAnyIP(801);
                })
                .UseStartup<Startup>().Build(); 
                host.Run(); 
                Console.ReadKey();
            }
        }
    
    }

    然后提交git,让jenkins构建docker发布运行:

    jenkin是是非常牛的一款构建工具,不仅仅根据插件可以扩展不同环境,还支持分布式构建.

     

    image

    这是我们用jenikins构建的的:

    image

    让它跑起来:

    image

    调用看看:

    image

    这个方法是输出Properties数组的,这个配置结构只是演示,后面的结构要变,因为要放kafka的配置,比如连接服务ip等,

    改动也很简单,在配置好configuration和service后,改动这个类KafkaOptions和配置文件中kafka节点中的json结构就行。

    image

      四 集成kafka                         

    kafka的接口不多,看看都有那些:

    https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Producer.html

    image

    ConsumerProducer是咱们发布消息和消费消息的两个主类,代码在上文已经实现的service

    客户端代码:

    使用my-replicated-topic-morepart这儿topic,还是希望多分区,因为后面consumer使用分布式计算读取。

    image

    consumer先在客户端监听:

    image

    product端的调用代码:

    image

    执行这个接口后,再看consumer接收到的消息:

    image

    最后一步,将咱们kafka日志部分替换为真实的kafka环境,看结果:

    image

    那么最后的配置是这样的:

    {
      "Logging": {
        "LogLevel": {
          "Default": "Debug",
          "System": "Debug",
          "Microsoft": "Debug"
        },
        "KafkaLog":{
          "Prix":"这是我的自定义日志提供程序"
        }
      },
      "KafkaService":{
        "Properties":{
          "bootstrap.servers":"192.168.249.106:9092"
        }
      }
    }

    log使用这个kafka服务就很简单了,在前面文章中实现的log扩展类中,直接构造函数注入这个kafkaService,就可以以使用了。

  • 相关阅读:
    typeof返回的结果必定是字符串
    coe文件格式
    求余算法的FPGA实现
    dBm
    信噪比
    增益
    总谐波失真THD
    基波与谐波
    Tco时候在干嘛?
    AXI4-Slave自定义IP设计
  • 原文地址:https://www.cnblogs.com/ck0074451665/p/10211725.html
Copyright © 2011-2022 走看看