zoukankan      html  css  js  c++  java
  • C# HttpClient 使用 Consul 发现服务

      试用了Overt.Core.Grpc, 把 GRPC 的使用改造得像 WCF, 性能测试也非常不错, 非常推荐各位使用.
      但已有项目大多是 http 请求, 改造成 GRPC 的话, 工作量比较大, 于是又找到了 Steeltoe.Discovery, 在 Startup 给 HttpClient 添加 DelegatingHandler, 动态改变请求url中的 host 和 port, 将http请求指向consul 发现的服务实例, 这样就实现了服务的动态发现.
      经过性能测试, Steeltoe.Discovery 只有 Overt.Core.Grpc 的20%, 非常难以接受, 于是自己实现了一套基于 consul 的服务发现工具. 嗯, 名字好难取啊, 暂定为 ConsulDiscovery.HttpClient 吧
      功能很简单:

    1. webapi 从json中读取配置信息 ConsulDiscoveryOptions;
    2. 如果自己是一个服务, 则将自己注册到consul中并设置健康检查Url;
    3. ConsulDiscovery.HttpClient 内有一个consul client 定时刷新所有服务的url访问地址.

      比较核心的两个类

    using Consul;
    using Microsoft.Extensions.Options;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    
    namespace ConsulDiscovery.HttpClient
    {
        public class DiscoveryClient : IDisposable
        {
            private readonly ConsulDiscoveryOptions consulDiscoveryOptions;
            private readonly Timer timer;
            private readonly ConsulClient consulClient;
            private readonly string serviceIdInConsul;
    
            public Dictionary<string, List<string>> AllServices { get; private set; } = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
    
    
            public DiscoveryClient(IOptions<ConsulDiscoveryOptions> options)
            {
                consulDiscoveryOptions = options.Value;
                consulClient = new ConsulClient(x => x.Address = new Uri($"http://{consulDiscoveryOptions.ConsulServerSetting.IP}:{consulDiscoveryOptions.ConsulServerSetting.Port}"));
                timer = new Timer(Refresh);
    
                if (consulDiscoveryOptions.ServiceRegisterSetting != null)
                {
                    serviceIdInConsul = Guid.NewGuid().ToString();
                }
            }
    
            public void Start()
            {
                var checkErrorMsg = CheckParams();
                if (checkErrorMsg != null)
                {
                    throw new ArgumentException(checkErrorMsg);
                }
                RegisterToConsul();
                timer.Change(0, consulDiscoveryOptions.ConsulServerSetting.RefreshIntervalInMilliseconds);
            }
    
            public void Stop()
            {
                Dispose();
            }
    
            private string CheckParams()
            {
                if (string.IsNullOrWhiteSpace(consulDiscoveryOptions.ConsulServerSetting.IP))
                {
                    return "Consul服务器地址 ConsulDiscoveryOptions.ConsulServerSetting.IP 不能为空";
                }
    
                if (consulDiscoveryOptions.ServiceRegisterSetting != null)
                {
                    var registerSetting = consulDiscoveryOptions.ServiceRegisterSetting;
                    if (string.IsNullOrWhiteSpace(registerSetting.ServiceName))
                    {
                        return "服务名称 ConsulDiscoveryOptions.ServiceRegisterSetting.ServiceName 不能为空";
                    }
                    if (string.IsNullOrWhiteSpace(registerSetting.ServiceIP))
                    {
                        return "服务地址 ConsulDiscoveryOptions.ServiceRegisterSetting.ServiceIP 不能为空";
                    }
                }
                return null;
            }
    
            private void RegisterToConsul()
            {
                if (string.IsNullOrEmpty(serviceIdInConsul))
                {
                    return;
                }
    
                var registerSetting = consulDiscoveryOptions.ServiceRegisterSetting;
                var httpCheck = new AgentServiceCheck()
                {
                    HTTP = $"{registerSetting.ServiceScheme}{Uri.SchemeDelimiter}{registerSetting.ServiceIP}:{registerSetting.ServicePort}/{registerSetting.HealthCheckRelativeUrl.TrimStart('/')}",
                    Interval = TimeSpan.FromMilliseconds(registerSetting.HealthCheckIntervalInMilliseconds),
                    Timeout = TimeSpan.FromMilliseconds(registerSetting.HealthCheckTimeOutInMilliseconds),
                    DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(10),
                };
                var registration = new AgentServiceRegistration()
                {
                    ID = serviceIdInConsul,
                    Name = registerSetting.ServiceName,
                    Address = registerSetting.ServiceIP,
                    Port = registerSetting.ServicePort,
                    Check = httpCheck,
                    Meta = new Dictionary<string, string>() { ["scheme"] = registerSetting.ServiceScheme },
                };
                consulClient.Agent.ServiceRegister(registration).Wait();
            }
    
            private void DeregisterFromConsul()
            {
                if (string.IsNullOrEmpty(serviceIdInConsul))
                {
                    return;
                }
                try
                {
                    consulClient.Agent.ServiceDeregister(serviceIdInConsul).Wait();
                }
                catch
                { }
            }
    
            private void Refresh(object state)
            {
                Dictionary<string, AgentService>.ValueCollection serversInConsul;
                try
                {
                    serversInConsul = consulClient.Agent.Services().Result.Response.Values;
                }
                catch // (Exception ex)
                {
                    // 如果连接consul出错, 则不更新服务列表. 继续使用以前获取到的服务列表
                    // 但是如果很长时间都不能连接consul, 服务列表里的一些实例已经不可用了, 还一直提供这样旧的列表也不合理, 所以要不要在这里实现 健康检查? 这样的话, 就得把检查地址变成不能设置的
                    return;
                }
    
                // 1. 更新服务列表
                // 2. 如果这个程序提供了服务, 还要检测 服务Id 是否在服务列表里
                var tempServices = new Dictionary<string, HashSet<string>>();
                bool needReregisterToConsul = true;
                foreach (var service in serversInConsul)
                {
                    var serviceName = service.Service;
                    if (!service.Meta.TryGetValue("scheme", out var serviceScheme))
                    {
                        serviceScheme = Uri.UriSchemeHttp;
                    }
                    var serviceHost = $"{serviceScheme}{Uri.SchemeDelimiter}{service.Address}:{service.Port}";
                    if (!tempServices.TryGetValue(serviceName, out var serviceHosts))
                    {
                        serviceHosts = new HashSet<string>();
                        tempServices[serviceName] = serviceHosts;
                    }
                    serviceHosts.Add(serviceHost);
    
                    if (needReregisterToConsul && !string.IsNullOrEmpty(serviceIdInConsul) && serviceIdInConsul == service.ID)
                    {
                        needReregisterToConsul = false;
                    }
                }
    
                if (needReregisterToConsul)
                {
                    RegisterToConsul();
                }
    
                var tempAllServices = new Dictionary<string, List<string>>(StringComparer.OrdinalIgnoreCase);
                foreach (var item in tempServices)
                {
                    tempAllServices[item.Key] = item.Value.ToList();
                }
                AllServices = tempAllServices;
            }
    
    
            public void Dispose()
            {
                DeregisterFromConsul();
                consulClient.Dispose();
                timer.Dispose();
            }
        }
    }
    View Code
    using System;
    using System.Net.Http;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsulDiscovery.HttpClient
    {
        public class DiscoveryHttpMessageHandler : DelegatingHandler
        {
            private static readonly Random random = new Random((int)DateTime.Now.Ticks);
    
            private readonly DiscoveryClient discoveryClient;
    
            public DiscoveryHttpMessageHandler(DiscoveryClient discoveryClient)
            {
                this.discoveryClient = discoveryClient;
            }
    
            protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
            {
                if (discoveryClient.AllServices.TryGetValue(request.RequestUri.Host, out var serviceHosts))
                {
                    if (serviceHosts.Count > 0)
                    {
                        var index = random.Next(serviceHosts.Count);
                        request.RequestUri = new Uri(new Uri(serviceHosts[index]), request.RequestUri.PathAndQuery);
                    }
                }
                return await base.SendAsync(request, cancellationToken).ConfigureAwait(false);
            }
        }
    }
    View Code

      使用方法

      为了简单, 我为新建的WebApi 增加了一个 HelloController, 提供 SayHelloService 服务, 并把自己注册到Consul.

      当我们访问这个WebApi的 /WeatherForecast 时, 其Get()方法会访问 http://SayHelloService/Hello/NetCore, 这就相当于一次远程调用, 只是调用的就是这个WebApi的/Hello/NetCore

      1. appsettings.json 增加

      "ConsulDiscoveryOptions": {
        "ConsulServerSetting": {
          "IP": "127.0.0.1", // 必填
          "Port": 8500, // 必填
          "RefreshIntervalInMilliseconds": 1000
        },
        "ServiceRegisterSetting": {
          "ServiceName": "SayHelloService", // 必填
          "ServiceIP": "127.0.0.1", // 必填
          "ServicePort": 5000, // 必填
          "ServiceScheme": "http", // 只能是http 或者 https, 默认http, 
          "HealthCheckRelativeUrl": "/HealthCheck",
          "HealthCheckIntervalInMilliseconds": 500,
          "HealthCheckTimeOutInMilliseconds": 2000
        }
      }

      2.修改Startup.cs

    using ConsulDiscovery.HttpClient;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using System;
    
    namespace WebApplication1
    {
        public class Startup
        {
            public Startup(IConfiguration configuration)
            {
                Configuration = configuration;
            }
    
            public IConfiguration Configuration { get; }
    
            public void ConfigureServices(IServiceCollection services)
            {
                services.AddControllers();
    
                // 注册 ConsulDiscovery 相关配置
                services.AddConsulDiscovery(Configuration);
                // 配置 SayHelloService 的HttpClient
                services.AddHttpClient("SayHelloService", c =>
                    {
                        c.BaseAddress = new Uri("http://SayHelloService");
                    })
                    .AddHttpMessageHandler<DiscoveryHttpMessageHandler>();
            }
    
            public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime)
            {
                if (env.IsDevelopment())
                {
                    app.UseDeveloperExceptionPage();
                }
    
                app.UseRouting();
    
                app.UseAuthorization();
    
                app.UseEndpoints(endpoints =>
                {
                    endpoints.MapControllers();
                });
    
                // 启动 ConsulDiscovery
                app.StartConsulDiscovery(lifetime);
            }
        }
    }

      3. 添加 HelloController

    using Microsoft.AspNetCore.Mvc;
    
    namespace WebApplication1.Controllers
    {
        [ApiController]
        [Route("[controller]")]
        public class HelloController : ControllerBase
        {
            [HttpGet]
            [Route("{name}")]
            public string Get(string name)
            {
                return $"Hello {name}";
            }
        }
    }

      4. 修改WeatherForecast

    using Microsoft.AspNetCore.Mvc;
    using System.Net.Http;
    using System.Threading.Tasks;
    
    namespace WebApplication1.Controllers
    {
        [ApiController]
        [Route("[controller]")]
        public class WeatherForecastController : ControllerBase
        {
            private readonly IHttpClientFactory httpClientFactory;
    
            public WeatherForecastController(IHttpClientFactory httpClientFactory)
            {
                this.httpClientFactory = httpClientFactory;
            }
    
            [HttpGet]
            public async Task<string> Get()
            {
                var httpClient = httpClientFactory.CreateClient("SayHelloService");
                var result = await httpClient.GetStringAsync("Hello/NetCore");
                return $"WeatherForecast return:           {result}";
            }
        }
    }

      5. 启动consul

    consul agent -dev

      6. 启动 WebApplication1 并访问 http://localhost:5000/weatherforecast

      以上示例可以到 https://github.com/zhouandke/ConsulDiscovery.HttpClient 下载, 请记住一定要 启动consul:    consul agent -dev

      End

  • 相关阅读:
    c# 进程间同步实现
    mysql 中文支持
    堆排序算法详解
    CodeSmith 使用
    东软C#编程规范
    红伞各版key 申请和下载
    sql 添加删除字段
    第一个Hibernate 程序终于测试通过了
    C#下载大文件并实现断点续传
    Ms rdlc 打印
  • 原文地址:https://www.cnblogs.com/zhouandke/p/12957508.html
Copyright © 2011-2022 走看看