zoukankan      html  css  js  c++  java
  • grpc源码分析之域名解析

    环境:

      win7_x64,VS2015、grpc_1.3.1

    场景:

      在客户端中使用grpc连接服务器,在多次输入非法的地址后,再次输入正确的地址连出现连接超时的现象。侯捷先生说过“源码面前,了无秘密”,所以开始分析grpc源码

    使用GRPC进行连接的例子:

    ///< 创建通道
    std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel("127.0.0.1:8080", grpc::InsecureChannelCredentials());
    ///< 3秒超时
    gpr_timespec tm_out{3, 0, GPR_TIMESPAN};
    ///< 等待连接
    bool connected = channel->WaitForConnected<gpr_timespec>(tm_out);
    if(connected) {
        std::cout << "connect success!" << std::endl;
    }
    else {
        std::cout << "connect fail!" << std::endl;
    }

    分析GRPC域名解析过程:

    一、创建通道

    1.1 创建通道证书

      有非安全的InsecureChannelCredentials,还有一个安全的SecureChannelCredentials,这里我们使用非安全的通道

    grpc::InsecureChannelCredentials()

    1.2 创建通道,在grpc_channel_create_with_builder(channel.c)函数中

    grpc_channel *grpc_channel_create_with_builder(
        grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder,
        grpc_channel_stack_type channel_stack_type) {
      
      ......
      channel->target = target;
      channel->is_client = grpc_channel_stack_type_is_client(channel_stack_type);
      ......
      
      grpc_compression_options_init(&channel->compression_options);
      for (size_t i = 0; i < args->num_args; i++) {
        if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) {
          if (args->args[i].type != GRPC_ARG_STRING) {
            gpr_log(GPR_ERROR, "%s ignored: it must be a string",
                    GRPC_ARG_DEFAULT_AUTHORITY);
          } else {
            channel->default_authority = grpc_mdelem_from_slices(
                exec_ctx, GRPC_MDSTR_AUTHORITY,
                grpc_slice_intern(
                    grpc_slice_from_static_string(args->args[i].value.string)));
          }
        }
        ......
      }
    
    done:
      grpc_channel_args_destroy(exec_ctx, args);
      return channel;
    }

      1.2.1 设置通道连接的目标(即服务器地址)

      1.2.2 设置通道类型为客户端通道(GRPC_CLIENT_CHANNEL),有6种通道类型,有一些是负载均衡使用的,在channel_stack_type.h文件中定义。

      1.2.3 设置默认的权限

    二、域名解析

    2.1 开始解析(dns_resolver.c)

    static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx,
                                           dns_resolver *r) {
      GRPC_RESOLVER_REF(&r->base, "dns-resolving");
      GPR_ASSERT(!r->resolving);
      r->resolving = true;
      r->addresses = NULL;
      grpc_resolve_address(
          exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties,
          grpc_closure_create(dns_on_resolved_locked, r,
                              grpc_combiner_scheduler(r->base.combiner, false)),
          &r->addresses);
    }

      grpc_resolve_address就是一个函数指针,在不同的平台下,指向不同的函数;在windows平台下则指向resolve_address_impl函数(resolve_address_windows.c)。

      dns_on_resolved_locked函数用于处理dns解析的结果,如果解析失败,则会在一段时间后再次发送一个解析请求。

    2.2 设置解析名称并在另一线程进行请求

    static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
                                     const char *default_port,
                                     grpc_pollset_set *interested_parties,
                                     grpc_closure *on_done,
                                     grpc_resolved_addresses **addresses) {
      request *r = gpr_malloc(sizeof(request));
      grpc_closure_init(&r->request_closure, do_request_thread, r,
                        grpc_executor_scheduler);
      r->name = gpr_strdup(name);
      r->default_port = gpr_strdup(default_port);
      r->on_done = on_done;
      r->addresses = addresses;
      grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
    }

      解析过程是通过do_request_thread函数完成的,这里进行打包,在另一个地方进行调用

    static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
                              grpc_error *error) {
      gpr_mu_lock(&g_executor.mu);
      if (g_executor.shutting_down == 0) {
        grpc_closure_list_append(&g_executor.closures, closure, error);
        maybe_spawn_locked();
      }
      gpr_mu_unlock(&g_executor.mu);
    }

      添加解析操作到全局变量g_executor.closures中,然后调用maybe_spawn_locked函数。  

    static void maybe_spawn_locked() {
      if (grpc_closure_list_empty(g_executor.closures) == 1) {
        return;
      }
      if (g_executor.shutting_down == 1) {
        return;
      }
    
      if (g_executor.busy != 0) {
        /* Thread still working. New work will be picked up by already running
         * thread. Not spawning anything. */
        return;
      } else if (g_executor.pending_join != 0) {
        /* Pickup the remains of the previous incarnations of the thread. */
        gpr_thd_join(g_executor.tid);
        g_executor.pending_join = 0;
      }
    
      /* All previous instances of the thread should have been joined at this point.
       * Spawn time! */
      g_executor.busy = 1;
      GPR_ASSERT(gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL,
                             &g_executor.options));
      g_executor.pending_join = 1;
    }

      如果队列g_executor.closures为空或者是已经关闭,直接返回。

      如果g_executor.busy !=0 表示解析线程已经被创建,直接返回,解析线程会依次从队列中取出待解析的名称,进行解析。

      如果上面都没有执行,则调用gpr_thd_new函数创建解析线程。

    2.3 在线程函数中,依次从队列中取出待解析的对象调用do_request_thread函数

    static void closure_exec_thread_func(void *ignored) {
      grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
      while (1) {
        gpr_mu_lock(&g_executor.mu);
        if (g_executor.shutting_down != 0) {
          gpr_mu_unlock(&g_executor.mu);
          break;
        }
        if (grpc_closure_list_empty(g_executor.closures)) {
          /* no more work, time to die */
          GPR_ASSERT(g_executor.busy == 1);
          g_executor.busy = 0;
          gpr_mu_unlock(&g_executor.mu);
          break;
        } else {
          grpc_closure *c = g_executor.closures.head;
          grpc_closure_list_init(&g_executor.closures);
          gpr_mu_unlock(&g_executor.mu);
          while (c != NULL) {
            grpc_closure *next = c->next_data.next;
            grpc_error *error = c->error_data.error;
    #ifndef NDEBUG
            c->scheduled = false;
    #endif
            c->cb(&exec_ctx, c->cb_arg, error);
            GRPC_ERROR_UNREF(error);
            c = next;
          }
          grpc_exec_ctx_flush(&exec_ctx);
        }
      }
      grpc_exec_ctx_finish(&exec_ctx);
    }

      cb函数是在打包时设置的,其实就是do_request_thread函数,详看2.2

    2.4 do_request_thread函数直接调用blocking_resolve_address_impl函数进行处理

    static grpc_error *blocking_resolve_address_impl(    const char *name, const char *default_port,    grpc_resolved_addresses **addresses) {

    struct addrinfo hints; struct addrinfo *result = NULL, *resp; char *host; char *port; int s; size_t i; grpc_error *error = GRPC_ERROR_NONE; /* parse name, splitting it into host and port parts */ gpr_split_host_port(name, &host, &port); ......if (port == NULL) { ...... port = gpr_strdup(default_port); } /* Call getaddrinfo */ memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */ hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ GRPC_SCHEDULING_START_BLOCKING_REGION; s = getaddrinfo(host, port, &hints, &result);
    ......
    }

      首先,从名称中获取主机名和端口

        如果名称中没有端口,则使用默认的端口代替

      然后,调用getaddrinfo函数获取IP地址

    2.5 dns解析结果的处理dns_on_resolved_locked

    static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                       grpc_error *error) {
      if (r->addresses != NULL) {
        grpc_lb_addresses *addresses = grpc_lb_addresses_create(
            r->addresses->naddrs, NULL /* user_data_vtable */);
        for (size_t i = 0; i < r->addresses->naddrs; ++i) {
          grpc_lb_addresses_set_address(
              addresses, i, &r->addresses->addrs[i].addr,
              r->addresses->addrs[i].len, false /* is_balancer */,
              NULL /* balancer_name */, NULL /* user_data */);
        }
      } else {
        grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r,
                          grpc_combiner_scheduler(r->base.combiner, false));
        grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now);
      }
    }

      如果r->addresses == NULL,则解析失败,插入dns_on_retry_timer_locked函数在下个时间后调用。

    static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg,
                                          grpc_error *error) {
      dns_resolver *r = arg;
    
      r->have_retry_timer = false;
      if (error == GRPC_ERROR_NONE) {
        if (!r->resolving) {
          dns_start_resolving_locked(exec_ctx, r);
        }
      }
    
      GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer");
    }

      dns_on_retry_timer_locked调用dns_start_resolving_locked函数重新开始解析请求2.1,解析失败的请求并不会被直接删除,而是进行多次尝试。

    分析连接超时的过程:

      1. 设置非法(不存在)的名称

      2. 调用getaddrinfo函数进行解析

        因为名称不存在,但是域名解析服务器可能认为名称是"存在"的,只是没有找到而已,所以需要更多的时间来进行查找。

      3. getaddrinfo函数是阻塞的,在函数返回之前其他的名称只是简单添加到了队列之中,并没有立即进行解析。

      4. 多次输入非法的名称之后,非法的解析请求被没有被直接删除,而是多长尝试重新请求,最后导致队列过长,即使设置了合法的名称,但是还没有对它进行解析,结果就是3秒超时。

    解决方案:

      1. 只允许输入合法IP地址,这样不会进行名称解析或解析立即返回,可以利用正则表达式对输入IP进行合法性校验。

      2. 域名解析交给其他的第三方库完成,将获取的合法IP设置给GRPC使用

  • 相关阅读:
    SQL Server解惑——查询条件IN中能否使用变量
    依赖注入高级玩法——注入接口服务的多个实现类
    最纯净的硬件检测工具箱
    批处理体会hosts文件
    针对m3u8视频的ts文件解密
    HLS协议之m3u8和ts流格式详解
    您的主机不满足在启用Hyper-V或Device/Credential Guard的情况下运行VMwareWorkstation的最低要求
    FFmpeg的安装和使用
    如何下载 blob 地址的视频资源
    Win10系统创建WiFi热点的两种方法
  • 原文地址:https://www.cnblogs.com/dongc/p/6914107.html
Copyright © 2011-2022 走看看