zoukankan      html  css  js  c++  java
  • 深入解读Service Mesh的数据面Envoy

    此文已由作者刘超授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。

    但是对于数据面的关键组件Envoy没有详细解读,这篇文章补上。

    一、Envoy的工作模式

     

    Envoy的工作模式如图所示,横向是管理平面。

    Envoy会暴露admin的API,可以通过API查看Envoy中的路由或者集群的配置。

    例如通过curl http://127.0.0.1:15000/routes可以查看路由的配置,结果如下图,请记住路由的配置层级,后面在代码中会看到熟悉的数据结构。

     

    routes下面有virtual_hosts,里面有带prefix的route,里面是route entry,里面是weight cluster,对于不同的cluster不同的路由权重,再里面是cluster的列表,有cluster的名称和权重。

    再如通过curl http://127.0.0.1:15000/clusters可以得到集群也即cluster的配置,这里面是真正cluster的信息。

     

    在另外一面,Envoy会调用envoy API去pilot里面获取路由和集群的配置,envoy API的详情可以查看https://www.envoyproxy.io/docs/envoy/v1.8.0/api-v2/http_routes/http_routes中的API文档,可以看到route的配置详情,也是按照上面的层级组织的。

    当Envoy从pilot获取到路由和集群信息之后,会保存在内存中,当有个客户端要连接后端的时候,客户端处于Downstream,后端处于Upstream,当数据从Downstream流向Upstream的时候,会通过Filter,根据路由和集群的配置,选择后端的应用建立连接,将请求转发出去。

    接下来我们来看Envoy是如何实现这些的。

    二、Envoy的关键数据结构的创建

     

    Envoy的启动会在main函数中创建Envoy::MainCommon,在它的构造函数中,会调用父类MainCommonBase的构造函数,创建Server::InstanceImpl,接下来调用InstanceImpl::initialize(...)

    接下来就进入关键的初始化阶段。

    加载初始化配置,里面配置了Listener Discover Service, Router Discover Service, Cluster Discover Service等。

    InstanceUtil::loadBootstrapConfig(bootstrap_, options);

    创建AdminImpl,从而可以接受请求接口

    admin_.reset(new AdminImpl(initial_config.admin().accessLogPath(),
    initial_config.admin().profilePath(), *this));

    创建ListenerManagerImpl,用于管理监听,因为Downstream要访问Upstream的时候,envoy会进行监听,Downstream会连接监听的端口。

    listener_manager_.reset(
    new ListenerManagerImpl(*this, listener_component_factory_, worker_factory_, time_system_));

    在ListenerManagerImpl的构造函数中,创建了很多的worker,Envoy采用libevent监听socket的事件,当有一个新的连接来的时候,会将任务分配给某个worker进行处理,从而实现异步的处理。

    ListenerManagerImpl::ListenerManagerImpl(...) {
     for (uint32_t i = 0; i < server.options().concurrency(); i++) {
       workers_.emplace_back(worker_factory.createWorker());
     }
    }

    createWorker会创建WorkerImpl,初始化WorkerImpl需要两个重要的参数。

    一个是allocateDispatcher创建出来的DispatcherImpl,用来封装libevent的事件分发的。

    一个是ConnectionHandlerImpl,用来管理一个连接的。

    我们接着看初始化过程,创建ProdClusterManagerFactory,用于创建Cluster Manager,管理上游的集群。

    cluster_manager_factory_.reset(new Upstream::ProdClusterManagerFactory(...);

    在ProdClusterManagerFactory会创建ClusterManagerImpl,在ClusterManagerImpl的构造函数中,如果配置了CDS,就需要订阅Cluster Discover Service。

    if (bootstrap.dynamic_resources().has_cds_config()) {
       cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), eds_config_, *this);
       init_helper_.setCds(cds_api_.get());
     }

    会调用ProdClusterManagerFactory::createCds,里面会创建CdsApiImpl。在CdsApiImpl的构造函数中,会创建CdsSubscription,订阅CDS,当集群的配置发生变化的时候,会调用CdsApiImpl::onConfigUpdate(...)

    接下来在InstanceImpl::initialize的初始化函数中,如果配置了LDS,就需要订阅Listener Discover Service。

    if (bootstrap_.dynamic_resources().has_lds_config()) {
    listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config());
     }

    ListenerManagerImpl的createLdsApi会调用ProdListenerComponentFactory的createLdsApi函数,会创建LdsApiImpl。在LdsApiImpl的构造函数中,会创建LdsSubscription,订阅LDS,当Listener的配置改变的时候,会调用LdsApiImpl::onConfigUpdate(...)

    三、Envoy的启动

     

    MainCommonBase::run() 会调用InstanceImpl::run(),会调用InstanceImpl::startWorkers(),会调用ListenerManagerImpl::startWorkers。

    startWorkers会将Listener添加到所有的worker中,然后启动worker的线程。

    void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
       workers_started_ = true;
       for (const auto& worker : workers_) {
           for (const auto& listener : active_listeners_) {
               addListenerToWorker(*worker, *listener);
           }
           worker->start(guard_dog);
       }
    }

    对于每一个WorkerImpl,会调用ConnectionHandlerImpl的addListener函数,会创建ActiveListener对象,ActiveListener很重要,他的函数会在libevent收到事件的时候被调用。

    在ActiveListener的构造函数中,会调用相同Worker的DispatcherImpl的createListener,创建Network::ListenerImpl,注意这个类的namespace,因为对于Envoy来讲,LDS里面有个Listener的概念,但是Socket也有Listener的概念,在Network这个namespace下面的,是对socket和libevent的封装。

    在Network::ListenerImpl的构造函数中,当收到事件的时候,会调用注册的listenCallback函数。

    if (bind_to_port) {
       listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));

    在listenCallback函数中,会调用onAccept函数,这个函数是ConnectionHandlerImpl::ActiveListener::onAccept函数。

    listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address), listener->hand_off_restored_destination_connections_);

    四、Envoy从Pilot中获取Listener

     

    当Listener的配置有变化的时候,会调用LdsApiImpl::onConfigUpdate(...)。

    会调用ListenerManagerImpl的addOrUpdateListener(...)函数,在这里面,会创建一个ListenerImpl,

    这里的listener是LDS定义的Listener,而非网络的Listener了。

    在ListenerImpl的构造函数中,首先会创建ListenerFilter,可以对监听的socket进行定制化的配置,例如为了实现use_original_dst,就需要加入一个ListenerFilter。

    为了创建ListenerFilter,先要调用ProdListenerComponentFactory::createListenerFilterFactoryList_来创建工厂。

    创建完了ListenerFilter之后,为了对于进来的网络包进行处理,会创建NetworkFilter,正是这些NetworkFilter实现了对网络包的解析,并根据网络包的内容,实现路由和负载均衡策略。

    为了创建NetworkFilter,先要调用ProdListenerComponentFactory::createNetworkFilterFactoryList_来创建工厂,在这个函数里面,会遍历所有的NamedNetworkFilterConfigFactory,也即起了名字的filter都过一遍,都有哪些呢?class NetworkFilterNameValues里面有定义,这里面最重要的是:

    // HTTP connection manager filter
    const std::string HttpConnectionManager = "envoy.http_connection_manager";

    我们这里重点看HTTP协议的转发,我们重点看这个名字对应的HttpConnectionManagerFilterConfigFactory,并调用他的createFilterFactory函数,返回一个Network::FilterFactoryCb类型的callback函数。

    五、Envoy订阅RDS

     

    HttpConnectionManagerFilterConfigFactory的createFilterFactory函数最终调用createFilterFactoryFromProtoTyped函数中,会创建

    Router::RouteConfigProviderManagerImpl。

    std::shared_ptr<Router::RouteConfigProviderManager> 
       route_config_provider_manager
           = context.singletonManager().getTyped
               <Router::RouteConfigProviderManager>(
               SINGLETON_MANAGER_REGISTERED_NAME(route_config_provider_manager), [&context] {
               return std::make_shared<Router::RouteConfigProviderManagerImpl>(context.admin());
    });

    创建完毕Router::RouteConfigProviderManagerImpl之后,会创建HttpConnectionManagerConfig,将Router::RouteConfigProviderManagerImpl作为成员变量传入。

    std::shared_ptr<HttpConnectionManagerConfig> filter_config(new HttpConnectionManagerConfig(proto_config, context, *date_provider, *route_config_provider_manager));

    在HttpConnectionManagerConfig的构造函数中,调用Router::RouteConfigProviderUtil::create。

    route_config_provider_ = Router::RouteConfigProviderUtil::create(config, context_, stats_prefix_,route_config_provider_manager_);

    Router::RouteConfigProviderUtil::create会调用RouteConfigProviderManagerImpl::createRdsRouteConfigProvider,在这个函数里面,会创建RdsRouteConfigSubscription订阅RDS,然后创建RdsRouteConfigProviderImpl。

    当Router的配置发生变化的时候,会调用RdsRouteConfigProviderImpl::onConfigUpdate(),在这个函数里面,会从pilot获取Route的配置,生成ConfigImpl对象。

    当我们仔细分析ConfigImpl的时候,我们发现,这个数据结构和第一节Envoy中的路由配置是一样的。

    在ConfigImpl里面有RouteMatcher用于匹配路由,在RouteMatcher里面有VirtualHostImpl,在VirtualHostImpl里面有RouteEntryImplBase,其中一种RouteEntry是WeightedClusterEntry,在WeightedClusterEntry里面有cluster_name_和cluster_weight_。

    到此RouteConfigProviderManagerImpl如何订阅RDS告一段落,我们回到HttpConnectionManagerFilterConfigFactory的createFilterFactoryFromProtoTyped中,在这个函数的最后一部分,将返回Network::FilterFactoryCb,是一个callback函数,作为createFilterFactoryFromProtoTyped的返回值。

    在这个callback函数中,会创建Http::ConnectionManagerImpl,将HttpConnectionManagerConfig作为成员变量传入,Http::ConnectionManagerImpl是一个Network::ReadFilter。在Envoy里面,有Network::WriteFilter和Network::ReadFilter,其中从Downstream发送到Upstream的使用Network::ReadFilter对网络包进行处理,反方向的是使用Network::WriteFilter。

    callback函数创建ConnectionManagerImpl之后调用filter_manager.addReadFilter,将ConnectionManagerImpl放到ReadFilter列表中。当然这个callback函数现在是不调用的。

    createFilterFactoryFromProtoTyped的返回值callback函数会返回到ProdListenerComponentFactory::createNetworkFilterFactoryList_函数,这个函数返回一个callback函数列表,其中一个就是上面生成的这个函数。

    再返回就回到了ListenerImpl的构造函数,他会调用addFilterChain,将ProdListenerComponentFactory::createNetworkFilterFactoryList_返回的callback函数列表加入到链里面。

    六、当一个新的连接建立的时候

     

    通过前面的分析,我们制定当一个新的连接建立的时候,会触发libevent的事件,最终调用ConnectionHandlerImpl::ActiveListener::onAccept函数。

    在这个函数中,会调用使用上面createListenerFilterFactoryList_创建的ListenerFilter的工厂创建ListenerFilter,然后使用这些Filter进行处理。

    // Create and run the filters
    config_.filterChainFactory().createListenerFilterChain(*active_socket);
    active_socket->continueFilterChain(true);

    ConnectionHandlerImpl::ActiveSocket::continueFilterChain函数调用ConnectionHandlerImpl::ActiveListener::newConnection,创建一个新的连接。

    在ConnectionHandlerImpl::ActiveListener::newConnection函数中,先是会调用DispatcherImpl::createServerConnection,创建Network::ConnectionImpl,在Network::ConnectionImpl的构造函数里面:

    file_event_ = dispatcher_.createFileEvent(
    fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);

    对于一个新的socket连接,对于操作系统来讲是一个文件,也即有个文件描述符,当一个socket可读的时候,会触发一个事件,当一个socket可写的时候,可以触发另一个事件,发生事件后,调用onFileEvent。

    在ConnectionHandlerImpl::ActiveListener::newConnection函数中,接下来就应该为这个连接创建NetworkFilter了。

    config_.filterChainFactory().createNetworkFilterChain(
    *new_connection, filter_chain->networkFilterFactories());

    上面这段代码,会调用ListenerImpl::createNetworkFilterChain。

    里面调用Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);

    bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager, const std::vector<Network::FilterFactoryCb>& factories) 
    {
       for (const Network::FilterFactoryCb& factory : factories) {
           factory(filter_manager);
       }
       return filter_manager.initializeReadFilters();
    }

    可以看出这里会调用上面生成的callback函数,将ConnectionManagerImpl放到ReadFilter列表中。

    七、当有新的数据到来的时候

     

    当Downstream发送数据到Envoy的时候,socket就处于可读的状态,因而ConnectionImpl::onFileEvent函数会被调用,当事件是Event::FileReadyType::Read的时候,调用ConnectionImpl::onReadReady()。

    在ConnectionImpl::onReadReady()函数里面,socket将数据读入缓存。

    IoResult result = transport_socket_->doRead(read_buffer_);

    然后调用ConnectionImpl::onRead,里面调用filter_manager_.onRead()使用NetworkFilter对于数据进行处理。

    FilterManagerImpl::onRead() 会调用FilterManagerImpl::onContinueReading有下面的逻辑。

    for (; entry != upstream_filters_.end(); entry++) {
       BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer();
       if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
           FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
           if (status == FilterStatus::StopIteration) {
               return;
           }
       }
    }

    对于每一个Filter,都调用onData函数,咱们上面解析过,其中HTTP对应的ReadFilter是ConnectionManagerImpl,因而调用ConnectionManagerImpl::onData函数。

    八、对数据进行解析

    ConnectionManagerImpl::onData函数中,首先创建数据解析器。

    codec_ = config_.createCodec(read_callbacks_->connection(), data, *this);

    调用HttpConnectionManagerConfig::createCodec,对于HTTP1,创建Http::Http1::ServerConnectionImpl。

    然后对数据进行解析。

    codec_->dispatch(data);

    调用ConnectionImpl::dispatch,里面调用ConnectionImpl::dispatchSlice,在这里面对HTTP进行解析。

    http_parser_execute(&parser_, &settings_, slice, len);

     

       

    解析的参数是settings_,可以看出这里面解析url,然后解析header,结束后调用onHeadersCompleteBase()函数,然后解析正文。

    由于路由是根据HTTP头里面的信息来的,因而我们重点看ConnectionImpl::onHeadersCompleteBase(),里面会调用ServerConnectionImpl::onHeadersComplete函数。

    active_request_->request_decoder_->decodeHeaders(std::move(headers), false);

    ServerConnectionImpl::onHeadersComplete里面调用ConnectionManagerImpl::ActiveStream::decodeHeaders这到了路由策略的重点。

    这里面调用了两个函数,一个是refreshCachedRoute()刷新路由配置,并查找到匹配的路由项Route Entry。

    另一个是ConnectionManagerImpl::ActiveStream::decodeHeaders的另一个实现。

    decodeHeaders(nullptr, *request_headers_, end_stream);

    在这里会通过Route Entry找到后端的集群,并建立连接。

    九、路由匹配

    我们先来解析refreshCachedRoute()

    void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() {
       Router::RouteConstSharedPtr route = snapped_route_config_->route(*request_headers_, stream_id_);
       request_info_.route_entry_ = route ? route->routeEntry() : nullptr;
    cached_route_ = std::move(route);
    }

    snapped_route_config_是什么呢?snapped_route_config_的初始化如下。

    snapped_route_config_(connection_manager.config_.routeConfigProvider().config())

    routeConfigProvider()返回的就是RdsRouteConfigProviderImpl,其config函数返回的就是ConfigImpl,也就是上面我们描述过的层级的数据结构。

     

    ConfigImpl的route函数如下

    RouteConstSharedPtr route(const Http::HeaderMap& headers, uint64_t random_value) const override {
       return route_matcher_->route(headers, random_value);
    }

    RouteMatcher的route函数,根据headers.Host()查找virtualhost,然后调用 VirtualHostImpl::getRouteFromEntries。

    RouteConstSharedPtr RouteMatcher::route(const Http::HeaderMap& headers, uint64_t random_value) const {
       const VirtualHostImpl* virtual_host = findVirtualHost(headers);
       if (virtual_host) {
           return virtual_host->getRouteFromEntries(headers, random_value);
       } else {
           return nullptr;
       }
    }

    VirtualHostImpl::getRouteFromEntries函数里面有下面的循环

    for (const RouteEntryImplBaseConstSharedPtr& route : routes_) {
       RouteConstSharedPtr route_entry = route->matches(headers, random_value);
       if (nullptr != route_entry) {
           return route_entry;
       }
    }

    对于每一个RouteEntry,看是否匹配。例如常用的有PrefixRouteEntryImpl::matches,看前缀是否一致。

    RouteConstSharedPtr PrefixRouteEntryImpl::matches(const Http::HeaderMap& headers,uint64_t random_value) const {
       if (RouteEntryImplBase::matchRoute(headers, random_value) &&
    StringUtil::startsWith(headers.Path()->value().c_str(), prefix_, case_sensitive_)) {
           return clusterEntry(headers, random_value);
       }
       return nullptr;
    }

    得到匹配的Route Entry后,调用RouteEntryImplBase::clusterEntry获取一个Cluster的名字。对于WeightedCluster,会调用下面的函数。

    WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value,true);

    在这个函数里面,会根据权重选择一个WeightedClusterEntry返回。

    这个时候,我们得到了后面Cluster,也即集群的名称,接下来需要得到集群的具体的IP地址并建立连接。

    十、查找集群并建立连接

    ConnectionManagerImpl::ActiveStream::decodeHeaders的另一个实现会调用Route.cc里面的Filter::decodeHeaders。

    在Filter::decodeHeaders函数中有以下的实现。

    // A route entry matches for the request.
    route_entry_ = route_->routeEntry();
    Upstream::ThreadLocalCluster* cluster = config_.cm_.get(route_entry_->clusterName());

    通过上一节的查找,WeightedClusterEntry里面有cluster的名称,接下来就是从cluster manager里面根据cluster名称查找到cluster的信息。

    cluster manager就是我们最初创建的ClusterManagerImpl

    ThreadLocalCluster* ClusterManagerImpl::get(const std::string& cluster) {
       ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped<ThreadLocalClusterManagerImpl>();
       auto entry = cluster_manager.thread_local_clusters_.find(cluster);
       if (entry != cluster_manager.thread_local_clusters_.end()) {
           return entry->second.get();
       } else {
       return nullptr;
       }
    }

    返回ClusterInfoImpl,是cluster的信息。

    cluster_ = cluster->info();

    找到Cluster后,开始建立连接。

    // Fetch a connection pool for the upstream cluster.
    Http::ConnectionPool::Instance* conn_pool = getConnPool();

    route.cc的Filter::getConnPool()里面调用以下函数。

    config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), route_entry_->priority(), protocol, this);

    ClusterManagerImpl::httpConnPoolForCluster调用ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool函数。

    在这个函数里面,HostConstSharedPtr host = lb_->chooseHost(context);通过负载均衡,在一个cluster里面选择一个后端的机器建立连接。

     

    网易云容器服务为用户提供了无服务器容器,让企业能够快速部署业务,轻松运维服务。容器服务支持弹性伸缩、垂直扩容、灰度升级、服务发现、服务编排、错误恢复及性能监测等功能。点击可免费试用

       

    最后。

    刘老师参加GIAC年度新人评选,马了这么多字,能帮忙投个票吗?请点击原文连接

    刘超 网易云技术架构部总监

    长期致力于云计算开源技术的分享,布道和落地,将网易内部最佳实践服务客户与行业。

    技术分享:出版《Lucene应用开发解密》,极客时间专栏《趣谈网络协议》,个人公众号《刘超的通俗云计算》文章Kubernetes及微服务系列18篇,Mesos系列30篇,KVM系列25篇,Openvswitch系列31篇,OpenStack系列24篇,Hadoop系列10篇。公众号文章《终于有人把云计算,大数据,人工智能讲明白了》累积10万+

    大会布道:InfoQ架构师峰会明星讲师,作为邀请讲师在QCon,LC3,SACC,GIAC,CEUC,SoftCon,NJSD等超过10场大型技术峰会分享网易的最佳实践

    行业落地:将网易的容器和微服务产品在银行,证券,物流,视频监控,智能制造等多个行业落地。

    网易云免费体验馆,0成本体验20+款云产品! 

    更多网易技术、产品、运营经验分享请点击

    相关文章:
    【推荐】 web调试利器_fiddler
    【推荐】 HBase – 存储文件HFile结构解析
    【推荐】 有运气摇号来不及挑选?网易有数帮你科学选房

  • 相关阅读:
    MVC模式-----struts2框架(2)
    MVC模式-----struts2框架
    html的<h>标签
    jsp脚本元素
    LeetCode "Paint House"
    LeetCode "Longest Substring with At Most Two Distinct Characters"
    LeetCode "Graph Valid Tree"
    LeetCode "Shortest Word Distance"
    LeetCode "Verify Preorder Sequence in Binary Search Tree"
    LeetCode "Binary Tree Upside Down"
  • 原文地址:https://www.cnblogs.com/163yun/p/9842220.html
Copyright © 2011-2022 走看看