zoukankan      html  css  js  c++  java
  • 深入解读Service Mesh的数据面Envoy


    此文已由作者刘超授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。

     

    但是对于数据面的关键组件Envoy没有详细解读,这篇文章补上。


    一、Envoy的工作模式

     

    Envoy的工作模式如图所示,横向是管理平面。

    Envoy会暴露admin的API,可以通过API查看Envoy中的路由或者集群的配置。

    例如通过curl http://127.0.0.1:15000/routes可以查看路由的配置,结果如下图,请记住路由的配置层级,后面在代码中会看到熟悉的数据结构。

     



    routes下面有virtual_hosts,里面有带prefix的route,里面是route entry,里面是weight cluster,对于不同的cluster不同的路由权重,再里面是cluster的列表,有cluster的名称和权重。

    再如通过curl http://127.0.0.1:15000/clusters可以得到集群也即cluster的配置,这里面是真正cluster的信息。

     


    在另外一面,Envoy会调用envoy API去pilot里面获取路由和集群的配置,envoy API的详情可以查看https://www.envoyproxy.io/docs/envoy/v1.8.0/api-v2/http_routes/http_routes中的API文档,可以看到route的配置详情,也是按照上面的层级组织的。

    当Envoy从pilot获取到路由和集群信息之后,会保存在内存中,当有个客户端要连接后端的时候,客户端处于Downstream,后端处于Upstream,当数据从Downstream流向Upstream的时候,会通过Filter,根据路由和集群的配置,选择后端的应用建立连接,将请求转发出去。

    接下来我们来看Envoy是如何实现这些的。


    二、Envoy的关键数据结构的创建

     

    Envoy的启动会在main函数中创建Envoy::MainCommon,在它的构造函数中,会调用父类MainCommonBase的构造函数,创建Server::InstanceImpl,接下来调用InstanceImpl::initialize(...)

    接下来就进入关键的初始化阶段。

    加载初始化配置,里面配置了Listener Discover Service, Router Discover Service, Cluster Discover Service等。

    InstanceUtil::loadBootstrapConfig(bootstrap_, options);

    创建AdminImpl,从而可以接受请求接口

    admin_.reset(new AdminImpl(initial_config.admin().accessLogPath(),
    initial_config.admin().profilePath(), *this));

    创建ListenerManagerImpl,用于管理监听,因为Downstream要访问Upstream的时候,envoy会进行监听,Downstream会连接监听的端口。

    listener_manager_.reset(
    new ListenerManagerImpl(*this, listener_component_factory_, worker_factory_, time_system_));

    在ListenerManagerImpl的构造函数中,创建了很多的worker,Envoy采用libevent监听socket的事件,当有一个新的连接来的时候,会将任务分配给某个worker进行处理,从而实现异步的处理。

    ListenerManagerImpl::ListenerManagerImpl(...) {
     for (uint32_t i = 0; i < server.options().concurrency(); i++) {
       workers_.emplace_back(worker_factory.createWorker());
     }
    }

    createWorker会创建WorkerImpl,初始化WorkerImpl需要两个重要的参数。

    一个是allocateDispatcher创建出来的DispatcherImpl,用来封装libevent的事件分发的。

    一个是ConnectionHandlerImpl,用来管理一个连接的。

    我们接着看初始化过程,创建ProdClusterManagerFactory,用于创建Cluster Manager,管理上游的集群。

    cluster_manager_factory_.reset(new Upstream::ProdClusterManagerFactory(...);

    在ProdClusterManagerFactory会创建ClusterManagerImpl,在ClusterManagerImpl的构造函数中,如果配置了CDS,就需要订阅Cluster Discover Service。

    if (bootstrap.dynamic_resources().has_cds_config()) {
       cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), eds_config_, *this);
       init_helper_.setCds(cds_api_.get());
     }

    会调用ProdClusterManagerFactory::createCds,里面会创建CdsApiImpl。在CdsApiImpl的构造函数中,会创建CdsSubscription,订阅CDS,当集群的配置发生变化的时候,会调用CdsApiImpl::onConfigUpdate(...)

    接下来在InstanceImpl::initialize的初始化函数中,如果配置了LDS,就需要订阅Listener Discover Service。

    if (bootstrap_.dynamic_resources().has_lds_config()) {
    listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config());
     }

    ListenerManagerImpl的createLdsApi会调用ProdListenerComponentFactory的createLdsApi函数,会创建LdsApiImpl。在LdsApiImpl的构造函数中,会创建LdsSubscription,订阅LDS,当Listener的配置改变的时候,会调用LdsApiImpl::onConfigUpdate(...)


    三、Envoy的启动

     


    MainCommonBase::run() 会调用InstanceImpl::run(),会调用InstanceImpl::startWorkers(),会调用ListenerManagerImpl::startWorkers。

    startWorkers会将Listener添加到所有的worker中,然后启动worker的线程。

    void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) {
       workers_started_ = true;
       for (const auto& worker : workers_) {
           for (const auto& listener : active_listeners_) {
               addListenerToWorker(*worker, *listener);
           }
           worker->start(guard_dog);
       }
    }

    对于每一个WorkerImpl,会调用ConnectionHandlerImpl的addListener函数,会创建ActiveListener对象,ActiveListener很重要,他的函数会在libevent收到事件的时候被调用。

    在ActiveListener的构造函数中,会调用相同Worker的DispatcherImpl的createListener,创建Network::ListenerImpl,注意这个类的namespace,因为对于Envoy来讲,LDS里面有个Listener的概念,但是Socket也有Listener的概念,在Network这个namespace下面的,是对socket和libevent的封装。

    在Network::ListenerImpl的构造函数中,当收到事件的时候,会调用注册的listenCallback函数。

    if (bind_to_port) {
       listener_.reset(evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.fd()));

    在listenCallback函数中,会调用onAccept函数,这个函数是ConnectionHandlerImpl::ActiveListener::onAccept函数。

    listener->cb_.onAccept(std::make_unique<AcceptedSocketImpl>(fd, local_address, remote_address), listener->hand_off_restored_destination_connections_);

     

    四、Envoy从Pilot中获取Listener

     


    当Listener的配置有变化的时候,会调用LdsApiImpl::onConfigUpdate(...)。


    会调用ListenerManagerImpl的addOrUpdateListener(...)函数,在这里面,会创建一个ListenerImpl,

    这里的listener是LDS定义的Listener,而非网络的Listener了。

    在ListenerImpl的构造函数中,首先会创建ListenerFilter,可以对监听的socket进行定制化的配置,例如为了实现use_original_dst,就需要加入一个ListenerFilter。

    为了创建ListenerFilter,先要调用ProdListenerComponentFactory::createListenerFilterFactoryList_来创建工厂。

    创建完了ListenerFilter之后,为了对于进来的网络包进行处理,会创建NetworkFilter,正是这些NetworkFilter实现了对网络包的解析,并根据网络包的内容,实现路由和负载均衡策略。

    为了创建NetworkFilter,先要调用ProdListenerComponentFactory::createNetworkFilterFactoryList_来创建工厂,在这个函数里面,会遍历所有的NamedNetworkFilterConfigFactory,也即起了名字的filter都过一遍,都有哪些呢?class NetworkFilterNameValues里面有定义,这里面最重要的是:

    // HTTP connection manager filter
    const std::string HttpConnectionManager = "envoy.http_connection_manager";

    我们这里重点看HTTP协议的转发,我们重点看这个名字对应的HttpConnectionManagerFilterConfigFactory,并调用他的createFilterFactory函数,返回一个Network::FilterFactoryCb类型的callback函数。


    五、Envoy订阅RDS

     


    HttpConnectionManagerFilterConfigFactory的createFilterFactory函数最终调用createFilterFactoryFromProtoTyped函数中,会创建

    Router::RouteConfigProviderManagerImpl。

    std::shared_ptr<Router::RouteConfigProviderManager> 
       route_config_provider_manager
           = context.singletonManager().getTyped
               <Router::RouteConfigProviderManager>(
               SINGLETON_MANAGER_REGISTERED_NAME(route_config_provider_manager), [&context] {
               return std::make_shared<Router::RouteConfigProviderManagerImpl>(context.admin());
    });

    创建完毕Router::RouteConfigProviderManagerImpl之后,会创建HttpConnectionManagerConfig,将Router::RouteConfigProviderManagerImpl作为成员变量传入。

    std::shared_ptr<HttpConnectionManagerConfig> filter_config(new HttpConnectionManagerConfig(proto_config, context, *date_provider, *route_config_provider_manager));

    在HttpConnectionManagerConfig的构造函数中,调用Router::RouteConfigProviderUtil::create。

    route_config_provider_ = Router::RouteConfigProviderUtil::create(config, context_, stats_prefix_,route_config_provider_manager_);

    Router::RouteConfigProviderUtil::create会调用RouteConfigProviderManagerImpl::createRdsRouteConfigProvider,在这个函数里面,会创建RdsRouteConfigSubscription订阅RDS,然后创建RdsRouteConfigProviderImpl。

    当Router的配置发生变化的时候,会调用RdsRouteConfigProviderImpl::onConfigUpdate(),在这个函数里面,会从pilot获取Route的配置,生成ConfigImpl对象。

    当我们仔细分析ConfigImpl的时候,我们发现,这个数据结构和第一节Envoy中的路由配置是一样的。

    在ConfigImpl里面有RouteMatcher用于匹配路由,在RouteMatcher里面有VirtualHostImpl,在VirtualHostImpl里面有RouteEntryImplBase,其中一种RouteEntry是WeightedClusterEntry,在WeightedClusterEntry里面有cluster_name_和cluster_weight_。

    到此RouteConfigProviderManagerImpl如何订阅RDS告一段落,我们回到HttpConnectionManagerFilterConfigFactory的createFilterFactoryFromProtoTyped中,在这个函数的最后一部分,将返回Network::FilterFactoryCb,是一个callback函数,作为createFilterFactoryFromProtoTyped的返回值。

    在这个callback函数中,会创建Http::ConnectionManagerImpl,将HttpConnectionManagerConfig作为成员变量传入,Http::ConnectionManagerImpl是一个Network::ReadFilter。在Envoy里面,有Network::WriteFilter和Network::ReadFilter,其中从Downstream发送到Upstream的使用Network::ReadFilter对网络包进行处理,反方向的是使用Network::WriteFilter。

    callback函数创建ConnectionManagerImpl之后调用filter_manager.addReadFilter,将ConnectionManagerImpl放到ReadFilter列表中。当然这个callback函数现在是不调用的。

    createFilterFactoryFromProtoTyped的返回值callback函数会返回到ProdListenerComponentFactory::createNetworkFilterFactoryList_函数,这个函数返回一个callback函数列表,其中一个就是上面生成的这个函数。

    再返回就回到了ListenerImpl的构造函数,他会调用addFilterChain,将ProdListenerComponentFactory::createNetworkFilterFactoryList_返回的callback函数列表加入到链里面。


    六、当一个新的连接建立的时候

     


    通过前面的分析,我们制定当一个新的连接建立的时候,会触发libevent的事件,最终调用ConnectionHandlerImpl::ActiveListener::onAccept函数。

    在这个函数中,会调用使用上面createListenerFilterFactoryList_创建的ListenerFilter的工厂创建ListenerFilter,然后使用这些Filter进行处理。

    // Create and run the filters
    config_.filterChainFactory().createListenerFilterChain(*active_socket);
    active_socket->continueFilterChain(true);

    ConnectionHandlerImpl::ActiveSocket::continueFilterChain函数调用ConnectionHandlerImpl::ActiveListener::newConnection,创建一个新的连接。

    在ConnectionHandlerImpl::ActiveListener::newConnection函数中,先是会调用DispatcherImpl::createServerConnection,创建Network::ConnectionImpl,在Network::ConnectionImpl的构造函数里面:

    file_event_ = dispatcher_.createFileEvent(
    fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write);

    对于一个新的socket连接,对于操作系统来讲是一个文件,也即有个文件描述符,当一个socket可读的时候,会触发一个事件,当一个socket可写的时候,可以触发另一个事件,发生事件后,调用onFileEvent。

    在ConnectionHandlerImpl::ActiveListener::newConnection函数中,接下来就应该为这个连接创建NetworkFilter了。

    config_.filterChainFactory().createNetworkFilterChain(
    *new_connection, filter_chain->networkFilterFactories());

    上面这段代码,会调用ListenerImpl::createNetworkFilterChain。

    里面调用Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);

    bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager, const std::vector<Network::FilterFactoryCb>& factories) 
    {
       for (const Network::FilterFactoryCb& factory : factories) {
           factory(filter_manager);
       }
       return filter_manager.initializeReadFilters();
    }

    可以看出这里会调用上面生成的callback函数,将ConnectionManagerImpl放到ReadFilter列表中。


    七、当有新的数据到来的时候

     


    当Downstream发送数据到Envoy的时候,socket就处于可读的状态,因而ConnectionImpl::onFileEvent函数会被调用,当事件是Event::FileReadyType::Read的时候,调用ConnectionImpl::onReadReady()。

    在ConnectionImpl::onReadReady()函数里面,socket将数据读入缓存。

    IoResult result = transport_socket_->doRead(read_buffer_);

    然后调用ConnectionImpl::onRead,里面调用filter_manager_.onRead()使用NetworkFilter对于数据进行处理。

    FilterManagerImpl::onRead() 会调用FilterManagerImpl::onContinueReading有下面的逻辑。

    for (; entry != upstream_filters_.end(); entry++) {
       BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer();
       if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
           FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
           if (status == FilterStatus::StopIteration) {
               return;
           }
       }
    }

    对于每一个Filter,都调用onData函数,咱们上面解析过,其中HTTP对应的ReadFilter是ConnectionManagerImpl,因而调用ConnectionManagerImpl::onData函数。


    八、对数据进行解析


    ConnectionManagerImpl::onData函数中,首先创建数据解析器。

    codec_ = config_.createCodec(read_callbacks_->connection(), data, *this);

    调用HttpConnectionManagerConfig::createCodec,对于HTTP1,创建Http::Http1::ServerConnectionImpl。

    然后对数据进行解析。

    codec_->dispatch(data);

    调用ConnectionImpl::dispatch,里面调用ConnectionImpl::dispatchSlice,在这里面对HTTP进行解析。

    http_parser_execute(&parser_, &settings_, slice, len);

     

       


    解析的参数是settings_,可以看出这里面解析url,然后解析header,结束后调用onHeadersCompleteBase()函数,然后解析正文。

    由于路由是根据HTTP头里面的信息来的,因而我们重点看ConnectionImpl::onHeadersCompleteBase(),里面会调用ServerConnectionImpl::onHeadersComplete函数。

    active_request_->request_decoder_->decodeHeaders(std::move(headers), false);

    ServerConnectionImpl::onHeadersComplete里面调用ConnectionManagerImpl::ActiveStream::decodeHeaders这到了路由策略的重点。


    这里面调用了两个函数,一个是refreshCachedRoute()刷新路由配置,并查找到匹配的路由项Route Entry。

    另一个是ConnectionManagerImpl::ActiveStream::decodeHeaders的另一个实现。

    decodeHeaders(nullptr, *request_headers_, end_stream);

    在这里会通过Route Entry找到后端的集群,并建立连接。


    九、路由匹配

    我们先来解析refreshCachedRoute()

    void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() {
       Router::RouteConstSharedPtr route = snapped_route_config_->route(*request_headers_, stream_id_);
       request_info_.route_entry_ = route ? route->routeEntry() : nullptr;
    cached_route_ = std::move(route);
    }

    snapped_route_config_是什么呢?snapped_route_config_的初始化如下。

    snapped_route_config_(connection_manager.config_.routeConfigProvider().config())

    routeConfigProvider()返回的就是RdsRouteConfigProviderImpl,其config函数返回的就是ConfigImpl,也就是上面我们描述过的层级的数据结构。

     


    ConfigImpl的route函数如下

    RouteConstSharedPtr route(const Http::HeaderMap& headers, uint64_t random_value) const override {
       return route_matcher_->route(headers, random_value);
    }

    RouteMatcher的route函数,根据headers.Host()查找virtualhost,然后调用 VirtualHostImpl::getRouteFromEntries。

    RouteConstSharedPtr RouteMatcher::route(const Http::HeaderMap& headers, uint64_t random_value) const {
       const VirtualHostImpl* virtual_host = findVirtualHost(headers);
       if (virtual_host) {
           return virtual_host->getRouteFromEntries(headers, random_value);
       } else {
           return nullptr;
       }
    }

    VirtualHostImpl::getRouteFromEntries函数里面有下面的循环

    for (const RouteEntryImplBaseConstSharedPtr& route : routes_) {
       RouteConstSharedPtr route_entry = route->matches(headers, random_value);
       if (nullptr != route_entry) {
           return route_entry;
       }
    }

    对于每一个RouteEntry,看是否匹配。例如常用的有PrefixRouteEntryImpl::matches,看前缀是否一致。

    RouteConstSharedPtr PrefixRouteEntryImpl::matches(const Http::HeaderMap& headers,uint64_t random_value) const {
       if (RouteEntryImplBase::matchRoute(headers, random_value) &&
    StringUtil::startsWith(headers.Path()->value().c_str(), prefix_, case_sensitive_)) {
           return clusterEntry(headers, random_value);
       }
       return nullptr;
    }

    得到匹配的Route Entry后,调用RouteEntryImplBase::clusterEntry获取一个Cluster的名字。对于WeightedCluster,会调用下面的函数。

    WeightedClusterUtil::pickCluster(weighted_clusters_, total_cluster_weight_, random_value,true);

    在这个函数里面,会根据权重选择一个WeightedClusterEntry返回。

    这个时候,我们得到了后面Cluster,也即集群的名称,接下来需要得到集群的具体的IP地址并建立连接。


    十、查找集群并建立连接

    ConnectionManagerImpl::ActiveStream::decodeHeaders的另一个实现会调用Route.cc里面的Filter::decodeHeaders。


    在Filter::decodeHeaders函数中有以下的实现。

    // A route entry matches for the request.
    route_entry_ = route_->routeEntry();
    Upstream::ThreadLocalCluster* cluster = config_.cm_.get(route_entry_->clusterName());

    通过上一节的查找,WeightedClusterEntry里面有cluster的名称,接下来就是从cluster manager里面根据cluster名称查找到cluster的信息。

    cluster manager就是我们最初创建的ClusterManagerImpl

    ThreadLocalCluster* ClusterManagerImpl::get(const std::string& cluster) {
       ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped<ThreadLocalClusterManagerImpl>();
       auto entry = cluster_manager.thread_local_clusters_.find(cluster);
       if (entry != cluster_manager.thread_local_clusters_.end()) {
           return entry->second.get();
       } else {
       return nullptr;
       }
    }

    返回ClusterInfoImpl,是cluster的信息。

    cluster_ = cluster->info();

    找到Cluster后,开始建立连接。

    // Fetch a connection pool for the upstream cluster.
    Http::ConnectionPool::Instance* conn_pool = getConnPool();

    route.cc的Filter::getConnPool()里面调用以下函数。

    config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), route_entry_->priority(), protocol, this);

    ClusterManagerImpl::httpConnPoolForCluster调用ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool函数。

    在这个函数里面,HostConstSharedPtr host = lb_->chooseHost(context);通过负载均衡,在一个cluster里面选择一个后端的机器建立连接。


    网易云容器服务为用户提供了无服务器容器,让企业能够快速部署业务,轻松运维服务。容器服务支持弹性伸缩、垂直扩容、灰度升级、服务发现、服务编排、错误恢复及性能监测等功能。点击可免费试用

       

    最后。

    刘老师参加GIAC年度新人评选,马了这么多字,能帮忙投个票吗?请点击原文连接


    刘超 网易云技术架构部总监

    长期致力于云计算开源技术的分享,布道和落地,将网易内部最佳实践服务客户与行业。

    技术分享:出版《Lucene应用开发解密》,极客时间专栏《趣谈网络协议》,个人公众号《刘超的通俗云计算》文章Kubernetes及微服务系列18篇,Mesos系列30篇,KVM系列25篇,Openvswitch系列31篇,OpenStack系列24篇,Hadoop系列10篇。公众号文章《终于有人把云计算,大数据,人工智能讲明白了》累积10万+

    大会布道:InfoQ架构师峰会明星讲师,作为邀请讲师在QCon,LC3,SACC,GIAC,CEUC,SoftCon,NJSD等超过10场大型技术峰会分享网易的最佳实践

    行业落地:将网易的容器和微服务产品在银行,证券,物流,视频监控,智能制造等多个行业落地。


    网易云免费体验馆,0成本体验20+款云产品! 

    更多网易技术、产品、运营经验分享请点击


     


    相关文章:
    【推荐】 virtualenv简介以及一个比较折腾的scrapy安装方法
    【推荐】 JVM锁实现探究2:synchronized深探
    【推荐】 爬虫开发python工具包介绍 (4)

  • 相关阅读:
    爬虫相关
    进程、线程、协程
    经典排序算法详细介绍
    Pyhton学习-Python与中间件之Memcache(4)
    Python学习-Python操作数据库之MongoDB(2)
    Python学习-Python操作数据库之MySQL(1)
    人工智能安全(一)——初识人工智能
    Windows应急响应和系统加固(12)——SQL Server/MySQL/Oracle日志提取和安全分析
    Windows应急响应和系统加固(11)——Weblogic各类漏洞的日志分析和调查取证
    Windows应急响应和系统加固(10)——Nginx日志分析以及JBoss日志分析
  • 原文地址:https://www.cnblogs.com/zyfd/p/9842223.html
Copyright © 2011-2022 走看看