zoukankan      html  css  js  c++  java
  • 记录core中GRPC长连接导致负载均衡不均衡问题 二,解决长连接问题

    题外话:

    1.这几天收到蔚来的面试邀请,但是自己没做准备,并且远程面试,还在上班时间,再加上老东家对我还不错.没想着换工作,导致在自己工位上做算法题不想被人看见,然后非常紧张.估计over了.不过没事,接下来知道哪里不足补哪里继续我的grpc源码解析

    2.上期的博客,记录了grpc源码及创建grpc的过程,其实说到底就是围绕GrpcChannel,通过httpclient做长连接处理这次来分析下,具体的实现规律

    3.直接上github地址:https://github.com/BestHYC/GRPCHelper

    4.大家还是得多做题,不然面试都过不去,都不会看你代码。

    一:查看创建HttpClient的源码

            public HttpClient CreateClient(string name)
            {
                if (name == null)
                {
                    throw new ArgumentNullException(nameof(name));
                }
     
                HttpMessageHandler handler =  _activeHandlers.GetOrAdd(name, _entryFactory).Value;
                var client = new HttpClient(handler, disposeHandler: false);
     
                HttpClientFactoryOptions options = _optionsMonitor.Get(name);
                for (int i = 0; i < options.HttpClientActions.Count; i++)
                {
                    options.HttpClientActions[i](client);
                }
     
                return client;
            }
    View Code

    可以看见,在处理HttpName的时候,通过GetOrAdd来提供HttpClient,那么可以得到一个事实就是,其实AddHttpClient(name),只不过用来标识,其实底层没做特别处理,即使我仅仅AddHttpClient(),在创建HttpClient的时候使用CreateClient("AA"),另外一个地方同样使用CreateClient("AA"),这两个HttpClient在未dispose情况下,还是会共用一个句柄

    二:查看Grpc中HttpClinet使用场景

    也是查看DefaultGrpcClientFactory创建中var httpClient = _httpClientFactory.CreateClient(name);而由前面源码可知,name可以当成同一个Grpc客户端名称。那么得到,
    同一个GrpcClient共用同一个HttpClient,不同的客户端还是会产生2个链接,我们来抓包测试下

                StringBuilder sb = new StringBuilder();
                for (Int32 i = 0; i < 2; i++)
                {
                    var result = client.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result;
                    sb.Append(result.Message);
                    sb.Append("client1的执行结果");
                    var result1 = client1.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result;
                    sb.AppendLine(result1.Message);
                }
                return sb.ToString();

    如果按照正常逻辑是公用同一个端口号。但是查看可以发现,client两次复用一个端口,Client1两次也是复用一个端口,但是这两个客户端不公用同一个端口号
    可以证明我们结合上面代码的逻辑是正确的。
    在反证明一次,如果共用一个HttpClient那么端口号相同。那么采用原始创建GrpcChannel方式

            [HttpGet("DoubleSamePortByChannel")]
            public String DoubleSamePortByChannel()
            {
                StringBuilder sb = new StringBuilder();
                var channel = GrpcChannel.ForAddress("");
                for (Int32 i = 0; i < 100; i++)
                {
                    var client = new Greeter.GreeterClient(channel);
                    var result = client.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result;
                    sb.Append(result.Message);
                    sb.Append("client1的执行结果");
                    var client1 = new Greeter1.Greeter1Client(channel);
                    var result1 = client1.SayHelloAsync(new HelloRequest() { Name = i.ToString() }).ResponseAsync.Result;
                    sb.AppendLine(result1.Message);
                }
                return sb.ToString();
            }

    因为共用一个Channel,所以HttpClient是公用的。抓包可以看到,他们复用同一个端口号。
    结论:如果共用同一个HttpClient,那么复用同一个端口号,如果使用不同的HttpClient,那么即使是基于Http2.0也是不同的端口号

    三:改动源码,解决长连接问题

    改动前需要确定几个目的:
    1.避免每次AddGrpcClient()注入,随时注入随时启用
    2.每次客户端能够复用连接,那么就复用。
    3.当请求量比较大的时候,每个端口最多保证10次调用,然后启用新的HttpClient,使用新的http端口号
    4.保证可以调用多个站点集合,但是由于正常情况下,大部分站点都是相同的,这里就不做拓展,拓展开来其实都一致。

    3.1.解决GrpcClient的注入问题。

    难点:1.注入当前站点。2.解决创建Client的注入问题。

    3.1.1:注入当前站点,采用最原始的方式,直接GrpcClientFactoryOptions的CurrentValue,而不是通过Option的Get获取通过名称的配置,
    缺点是共同使用而不是单点配置,当然完全可以改,这里就不做拓展了。

              public static IHttpClientBuilder AddMyGrpcClient<TClient>(this IServiceCollection services, String url)
                where TClient : class
            {
                if (services == null)
                {
                    throw new ArgumentNullException(nameof(services));
                }
                services.Configure<GrpcClientFactoryOptions>(options => options.Address = new Uri(url));
                var name = TypeNameHelper.GetTypeDisplayName(typeof(TClient), fullName: false);
                return services.AddGrpcClientCore<TClient>(name);
            }

    3.1.2:注入当前GrpcClient,由于看源码得到,所有的Client都是通过DefaultClientActivator<T>创建,修改代码

    private void AddClient(Type type)
            {
                if (type == null) return;
                Func<ObjectFactory> result = () => ActivatorUtilities.CreateFactory(type, new Type[] { typeof(CallInvoker), });
                if (_createActivator.ContainsKey(type))
                {
                    _createActivator[type] = result;
                }
                else
                {
                    _createActivator.Add(type, result);
                }
            }
            private Object m_lock = new Object();
            public TClient CreateClient<TClient>(CallInvoker callInvoker)
            {
                if (!_createActivator.ContainsKey(typeof(TClient)))
                {
                    lock (m_lock)
                    {
                        if (!_createActivator.ContainsKey(typeof(TClient)))
                        {
                            AddClient(typeof(TClient));
                        }
                    }
                }
                return (TClient)Activator(typeof(TClient))(_services, new object[] { callInvoker });
            }

    增加AddClient,这样在创建新的客户端时候去判断是否存在,不存在就新增,而不是原来只新增注入的Client。
    3.1.3.限制请求数量,这种通过注入的方式,留给大家自己扩展吧,因为我发现一个基于GRPCChanel的原始版本

    4.不修改注入方式,而是采用直连方式

    4.1.创建Client,基于表达式实现

    private static Dictionary<String, Func<GrpcChannel, Object>> m_expression = new Dictionary<String, Func<GrpcChannel, Object>>();
            private T GetFunc<T>(GrpcChannel channel)
            {
                String name = typeof(T).FullName;
                if (m_expression.ContainsKey(name)) return (T)m_expression[name].Invoke(channel);
                var argumentType = new[] { typeof(GrpcChannel) };
                var constructor = typeof(T).GetConstructor(
                    BindingFlags.Instance | BindingFlags.Public,
                    null,
                    argumentType,
                    null);
                var param = Expression.Parameter(typeof(GrpcChannel), "channel");
                var constructorCallExpression = Expression.New(constructor, param);
                var constructorCallingLambda = Expression
                  .Lambda<Func<GrpcChannel, Object>>(constructorCallExpression, param).Compile();
                m_expression.Add(name, constructorCallingLambda);
                return (T)constructorCallingLambda(channel);
            }

    4.2.创建GrpcChannel的代码实现,并且每次请求只允许10次

         public T GetHttpClient<T>()
            {
                lock (m_lock)
                {
                    HttpClient client = null;
                    foreach (var item in m_httpclients)
                    {
                        if (item.Value < 10)
                        {
                            m_currentname = item.Key;
                            break;
                        }
                    }
                    if (String.IsNullOrWhiteSpace(m_currentname))
                    {
                        String guid = Guid.NewGuid().ToString();
                        m_currentname = guid;
                        m_httpclients.Add(guid, 0);
                    }
                    m_httpclients[m_currentname] += 1;
                    client = m_httpclientfactory.CreateClient(m_currentname);
                    GrpcChannelOptions options = new GrpcChannelOptions()
                    {
                        HttpClient = client
                    };
                    var channel = GrpcChannel.ForAddress("http://localhost:6001", options);
                    var client1 = GetFunc<T>(channel);
                    return client1;
                }
            }
            public void Dispose()
            {
                lock (m_lock)
                {
                    if (m_currentname == null) return;
                    if (m_httpclients.TryGetValue(m_currentname, out Int32 num))
                    {
                        if (num <= 0) return;
                        m_httpclients[m_currentname] = num - 1;
                    }
                }
            }
    View Code

    五:测试是否成功

    [HttpGet("GrpcHelper")]
    public String GetInfotest([FromServices] GrpcHelper grpcHelper)
    {
    StringBuilder sb = new StringBuilder();
    Int32 a = 0;
    for (Int32 i = 0; i < 100; i++)
    {
    Task.Run(() =>
    {
    
    using (var factory = grpcHelper.CreateClientFactory())
    {
    var client = factory.GetHttpClient<Greeter.GreeterClient>();
    var result = client.SayHelloAsync(new GrpcService1.HelloRequest() { Name = "hongyichao " + Environment.MachineName }).ResponseAsync.Result.Message;
    sb.Append(result);
    }
    
    }).ContinueWith(t => Interlocked.Increment(ref a));
    }
    while (Volatile.Read(ref a) < 100)
    {
    Thread.Sleep(100);
    }
    return JsonConvert.SerializeObject(sb);
    }

    最终100个连接使用了3个端口号就可以解决。这样既解决了只复用单个端口号,又解决了单链接无法复用端口号问题。解决


    最终在吐槽下自己,昨天面试渣成啥样了。下一篇开始研究Rabbitmq了。另外,大家得注意,现在都流行代码测试。多做做题。

    不然即使像我这种老司机,也在很简单很简单的题目上遭遇滑铁卢。但是也不能一味着写算法,也多多看源码,毕竟这是我们的工作

    分割线----------------------

    最后吐槽下:写完之后,公司上层决定不采用我的实现方式,因为问我能不能保证完全可以,我说得先上去测试下,才能下结论。然后测试机会都没,直接全部将grpc换成api的形式了。哎,之前的努力都付诸东流。

  • 相关阅读:
    Proj THUDBFuzz Paper Reading: 北京大学软件分析课程2019,熊英飞, 03 数据流分析II
    Proj THUDBFuzz Paper Reading: 北京大学软件分析课程2019,熊英飞, 02 数据流分析I
    Proj THUDBFuzz Paper Reading: 北京大学软件分析课程2019,熊英飞, 01 intro
    Proj THUDBFuzz Paper Reading:Static Analysis-Based Approaches for Secure Software Development, 2018
    线段树-sum/max/min/区间更新
    HDU1166-ZKW树
    HDU1754-ZKW线段树
    x86/x64/x86_64/i386/ia32/ia64/amd/amd64 辨析
    Intel pin 2.14/CentOS 6 X86-64/安装
    centos6-honeyd安装&配置
  • 原文地址:https://www.cnblogs.com/yichaohong/p/14446694.html
Copyright © 2011-2022 走看看