zoukankan      html  css  js  c++  java
  • asp.net core 和consul

    consul集群搭建

    Consul是HashiCorp公司推出的使用go语言开发的开源工具,用于实现分布式系统的服务发现与配置,内置了服务注册与发现框架、分布一致性协议实现、健康检查、Key/Value存储、多数据中心方案,使用起来较为简单。使用docker命令创建注册中心比较麻烦,并且不好维护,这里使用docker-compose来实现。registrator保证了,如果服务已停止,则从注册中心中移除。docker-compose.yaml如下

    version: "3.0"
     
    services:
        # consul server,对外暴露的ui接口为8500,只有在2台consul服务器的情况下集群才起作用
        consulserver:
            image: progrium/consul:latest
            hostname: consulserver
            ports:
                - "8300"
                - "8400"
                - "8500:8500"
                - "53"
            command: -server -ui-dir /ui -data-dir /tmp/consul --bootstrap-expect=3
     
        # consul server1在consul server服务起来后,加入集群中
        consulserver1:
            image: progrium/consul:latest
            hostname: consulserver1
            depends_on:
                - "consulserver"
            ports:
                - "8300"
                - "8400"
                - "8500"
                - "53"
            command: -server -data-dir /tmp/consul -join consulserver
     
        # consul server2在consul server服务起来后,加入集群中
        consulserver2:
            image: progrium/consul:latest
            hostname: consulserver2
            depends_on:
                - "consulserver"
            ports:
                - "8300"
                - "8400"
                - "8500"
                - "53"
            command: -server -data-dir /tmp/consul -join consulserver
        registrator:
            image: gliderlabs/registrator:master
            hostname: registrator
            depends_on:
                - "consulserver"
            volumes:
                - "/var/run/docker.sock:/tmp/docker.sock"
            command: -internal consul://consulserver:8500

    然后运行docker-compose up -d 

    ASP.NET

    注册服务

    创建一个ServiceA(asp.net core 2.2) 项目,需要安装Consul,Consul包中提供了一个IConsulClient类,我们可以通过它来调用Consul进行服务的注册,以及发现等。我们需要在服务启动的时候,将自身的地址等信息注册到Consul中,并在服务关闭的时候从Consul撤销。这种行为就非常适合使用 IHostedService 来实现。这里要注意的是,我们需要保证_serviceId对于同一个实例的唯一,避免重复性的注册。关闭时撤销服务:ConsulHostedService.cs

    namespace ServiceA
    {
        using Consul;
        using Microsoft.AspNetCore.Hosting.Server;
        using Microsoft.AspNetCore.Hosting.Server.Features;
        using Microsoft.Extensions.Hosting;
        using Microsoft.Extensions.Logging;
        using System;
        using System.Linq;
        using System.Net;
        using System.Threading;
        using System.Threading.Tasks;
     
        public class ConsulHostedService : IHostedService
        {
            private readonly IConsulClient _consulClient;
            private readonly ILogger _logger;
            private readonly IServer _server;
     
            public ConsulHostedService(IConsulClient consulClient, ILogger<ConsulHostedService> logger, IServer server)
            {
                _consulClient = consulClient;
                _logger = logger;
                _server = server;
            }
     
            private CancellationTokenSource _cts;
            private string _serviceId;
     
            public async Task StartAsync(CancellationToken cancellationToken)
            {
                // Create a linked token so we can trigger cancellation outside of this token's cancellation
                _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
     
                var features = _server.Features;
                var address = features.Get<IServerAddressesFeature>().Addresses.First();
                var uri = new Uri(address);
     
                _serviceId = "Service-v1-" + Dns.GetHostName() + "-" + uri.Authority;
     
                var registration = new AgentServiceRegistration()
                {
                    ID = _serviceId,
                    Name = "Service",
                    Address = uri.Host,
                    Port = uri.Port,
                    Tags = new[] { "api" },
                    Check = new AgentServiceCheck()
                    {
                        // HTTP = $"{uri.Scheme}://{uri.Host}:{uri.Port}/api/Health/Status",
                        HTTP = $"{uri.Scheme}://{uri.Host}:{uri.Port}/healthz",
                        Timeout = TimeSpan.FromSeconds(2),
                        Interval = TimeSpan.FromSeconds(10)
                    }
                };
     
                _logger.LogInformation("Registering in Consul");
     
                // 首先移除服务,避免重复注册
                await _consulClient.Agent.ServiceDeregister(registration.ID, _cts.Token);
                await _consulClient.Agent.ServiceRegister(registration, _cts.Token);
            }
     
            public async Task StopAsync(CancellationToken cancellationToken)
            {
                _cts.Cancel();
                _logger.LogInformation("Deregistering from Consul");
                try
                {
                    await _consulClient.Agent.ServiceDeregister(_serviceId, cancellationToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, $"Deregisteration failed");
                }
            }
        }
    }

    StartupConfigureServices方法中来配置IConsulClient到ASP.NET Core的依赖注入系统中,healthz地址,我使用了ASP.NET Core 2.2中自带的健康检查,它需要在Startup中添加如下配置

    namespace ServiceA
    {
        using System;
        using Consul;
        using Microsoft.AspNetCore.Builder;
        using Microsoft.AspNetCore.Mvc;
        using Microsoft.Extensions.Configuration;
        using Microsoft.Extensions.DependencyInjection;
        using Microsoft.Extensions.Hosting;
        public class Startup
        {
            public Startup(IConfiguration configuration)
            {
                Configuration = configuration;
            }
     
            public IConfiguration Configuration { get; }
            public void ConfigureServices(IServiceCollection services)
            {
                //配置IConsulClient到ASP.NET Core的依赖注入系统中
                string consulAddress = "http://192.168.100.5:8500";
                services.AddSingleton<IConsulClient, ConsulClient>(p => new ConsulClient(consulConfig =>
                {
                    consulConfig.Address = new Uri(consulAddress);
                }));
                services.AddSingleton<IHostedService, ConsulHostedService>();
     
                services.AddHealthChecks();//自带的健康检查
     
                services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
            }
            public void Configure(IApplicationBuilder app, IHostingEnvironment env)
            {
                if (env.IsDevelopment())
                {
                    app.UseDeveloperExceptionPage();
                }
               app.UseHealthChecks("/healthz");
                app.UseMvc();
            }
        }
    }

    当然也可以自己写一个HealthController:

    using Microsoft.AspNetCore.Mvc;
     
    namespace ServiceA.Controllers
    {
        [Route("api/[controller]")]
        [Produces("application/json")]
        [ApiController]
        public class HealthController : Controller
        {
            [HttpGet("status")]
            public IActionResult Status() => Ok();
        }
    }
    using Microsoft.AspNetCore.Mvc;
     
    namespace ServiceA.Controllers
    {
        [Route("api/[controller]")]
        [ApiController]
        public class ValuesController : ControllerBase
        {
            // GET api/values
            [HttpGet]
            public ActionResult<string> Get()
            {
                return "value1AAA";
            }
        }
    }

    可以在Program.cs指定端口:

    namespace ServiceA
    {
        using Microsoft.AspNetCore;
        using Microsoft.AspNetCore.Hosting;
        public class Program
        {
            public static void Main(string[] args)
            {
                CreateWebHostBuilder(args).Build().Run();
            }
     
            public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
                WebHost.CreateDefaultBuilder(args).UseUrls("http://192.168.100.2:6002")
                    .UseStartup<Startup>();
        }
    }

    这里简要说明一下我的环境, 代码在win10物理机上,consul集群是win10虚拟机上ubuntu18的docker 环境,所以指定ip便于docker里面访问,还有就是win10的防火墙要关闭。

    把新建ServiceB和ServiceA一样 只是修改一个端口然后用 dotnet run 运行如下:

    把ServiceB关闭后


    发现服务

    现在来看看服务消费者如何从Consul来获取可用的服务列表。

    我们创建一个ConsoleApp,做为服务的调用端,添加ConsulNuget包,然后,我们创建一个ConsulClient实例,直接调用consuleClient.Health.Service就可以获取到可用的服务列表了,然后使用HttpClient就可以发起对服务的调用。

    但我们需要思考一个问题,我们什么时候从Consul获取服务呢?最为简单的便是在每次调用服务时,都先从Consul来获取一下服务列表,这样做的好处是我们得到的服务列表是最新的,能及时获取到新注册的服务以及过滤掉挂掉的服务。但是这样每次请求都增加了一次对Consul的调用,对性能有稍微的损耗,不过我们可以在每个调用端的机器上都部署一个Consul Agent,这样对性能的影响就微乎其微了。另外一种方式,可以在调用端做服务列表的本地缓存,并定时与Consul同步。其实现也非常简单,通过一个Timer来定时从Consul拉取最新的服务列表,创建一个ConsulServiceProvider.cs类,实现如下:

    namespace ConsoleApp
    {
        using System;
        using System.Collections.Generic;
        using System.Threading;
        using System.Threading.Tasks;
        using Consul;
        public interface IServiceDiscoveryProvider
        {
            Task<List<string>> GetServicesAsync();
        }
        public class ConsulServiceProvider : IServiceDiscoveryProvider
        {
            private string consulAddres;
            public ConsulServiceProvider(string url) {
                consulAddres = url;
            }
            public async Task<List<string>> GetServicesAsync()
            {
                var consuleClient = new ConsulClient(consulConfig =>
                {
                    consulConfig.Address = new Uri(consulAddres);
                });
     
                var queryResult = await consuleClient.Health.Service("Service", string.Empty, true);
     
                while (queryResult.Response.Length == 0)
                {
                    Console.WriteLine("No services found, wait 1s....");
                    await Task.Delay(1000);
                    queryResult = await consuleClient.Health.Service("Service", string.Empty, true);
                }
     
                var result = new List<string>();
                foreach (var serviceEntry in queryResult.Response)
                {
                    result.Add(serviceEntry.Service.Address + ":" + serviceEntry.Service.Port);
                }
                return result;
            }
        }
     
        public class PollingConsulServiceProvider : IServiceDiscoveryProvider
        {
            private List<string> _services = new List<string>();
            private bool _polling;
            private string consulAddres;
            public PollingConsulServiceProvider(string url)
            {
                consulAddres = url;
                var _timer = new Timer(async _ =>
                {
                    if (_polling)
                    {
                        return;
                    }
     
                    _polling = true;
                    await Poll();
                    _polling = false;
     
                }, null, 0, 1000);
            }
         
            public async Task<List<string>> GetServicesAsync()
            {
                if (_services.Count == 0) await Poll();
                return _services;
            }
     
            private async Task Poll()
            {
                _services = await new ConsulServiceProvider(consulAddres).GetServicesAsync();
            }
        }
    }

    负载均衡

    如何将不同的用户的流量分发到不同的服务器上面呢,早期的方法是使用DNS做负载,通过给客户端解析不同的IP地址,让客户端的流量直接到达各个服务器。但是这种方法有一个很大的缺点就是延时性问题,在做出调度策略改变以后,由于DNS各级节点的缓存并不会及时的在客户端生效,而且DNS负载的调度策略比较简单,无法满足业务需求,因此就出现了负载均衡器。

    常见的负载均衡算法有如下几种:

    • 随机算法:每次从服务列表中随机选取一个服务器。

    • 轮询及加权轮询:按顺序依次调用服务列表中的服务器,也可以指定一个加权值,来增加某个服务器的调用次数。

    • 最小连接:记录每个服务器的连接数,每次选取连接数最少的服务器。

    • 哈希算法:分为普通哈希与一致性哈希等。

    • IP地址散列:通过调用端Ip地址的散列,将来自同一调用端的分组统一转发到相同服务器的算法。

    • URL散列:通过管理调用端请求URL信息的散列,将发送至相同URL的请求转发至同一服务器的算法。

    本文中简单模拟前两种来介绍一下。

    随机均衡是最为简单粗暴的方式,我们只需根据服务器数量生成一个随机数即可

    最简单的轮询实现 使用lock控制并发,每次请求,移动一下服务索引。

    RandomLoadBalancer.cs

    namespace ConsoleApp
    {
        using System;
        using System.Threading.Tasks;
        public interface ILoadBalancer
        {
            Task<string> GetServiceAsync();
        }
        public class RandomLoadBalancer : ILoadBalancer
        {
            private readonly IServiceDiscoveryProvider _sdProvider;
     
            public RandomLoadBalancer(IServiceDiscoveryProvider sdProvider)
            {
                _sdProvider = sdProvider;
            }
     
            private Random _random = new Random();
     
            public async Task<string> GetServiceAsync()
            {
                var services = await _sdProvider.GetServicesAsync();
                return services[_random.Next(services.Count)];
            }
        }
        public class RoundRobinLoadBalancer : ILoadBalancer
        {
            private readonly IServiceDiscoveryProvider _sdProvider;
     
            public RoundRobinLoadBalancer(IServiceDiscoveryProvider sdProvider)
            {
                _sdProvider = sdProvider;
            }
     
            private readonly object _lock = new object();
            private int _index = 0;
     
            public async Task<string> GetServiceAsync()
            {
                var services = await _sdProvider.GetServicesAsync();
                lock (_lock)
                {
                    if (_index >= services.Count)
                    {
                        _index = 0;
                    }
                    return services[_index++];
                }
            }
        }
    }

    便可以直接使用HttpClient来完成服务的调用了

     
    namespace ConsoleApp
    {
        using System;
        using System.Net.Http;
        using System.Threading.Tasks;
        class Program
        {
            static void  Main(string[] args)
            {
                TestConsul().ConfigureAwait(false);
                Console.ReadKey();
            }
            static async Task TestConsul() {
                string url = "http://192.168.100.5:8500";
                ILoadBalancer balancer = new RoundRobinLoadBalancer(new PollingConsulServiceProvider(url));
                var client = new HttpClient();
     
                Console.WriteLine("Request by RoundRobinLoadBalancer....");
                for (int i = 0; i < 10; i++)
                {
                    var service = await balancer.GetServiceAsync();
     
                    Console.WriteLine(DateTime.Now.ToString() + "-RoundRobin:" +
                        await client.GetStringAsync("http://" + service + "/api/values") + " --> " + "Request from " + service);
                }
     
                Console.WriteLine("Request by RandomLoadBalancer....");
                balancer = new RandomLoadBalancer(new PollingConsulServiceProvider(url));
                for (int i = 0; i < 10; i++)
                {
                    var service = await balancer.GetServiceAsync();
     
                    Console.WriteLine(DateTime.Now.ToString() + "-Random:" +
                        await client.GetStringAsync("http://" + service + "/api/values") + " --> " + "Request from " + service);
                }
            }
        }
    }


    代码下载

    参考:

    consul+docker实现服务注册

    RainingNight/AspNetCoreSample

    微服务(入门二):netcore通过consul注册服务

  • 相关阅读:
    学习笔记
    聊聊字节序
    SPDK发送和接收连接请求的处理
    企业设备维护——不仅仅是解决问题
    怎样快速找到某一行代码的git提交记录
    生产环境中利用软链接避免"rm -rf /"的方法
    程序员五年小结
    Django Model 数据库增删改查
    python中字符串列表字典常用方法
    python编辑配置
  • 原文地址:https://www.cnblogs.com/majiang/p/11379877.html
Copyright © 2011-2022 走看看