zoukankan      html  css  js  c++  java
  • Cloud Foundry中DEA与warden通信完毕应用port监听

            在Cloud Foundry v2版本号中,DEA为一个用户应用执行的控制模块,而应用的真正执行都是依附于warden。

    更详细的来说,是DEA接收到Cloud Controller的请求;DEA发送请求给warden server;warden server创建warden container并将用户应用droplet等环境配置好;DEA发送应用启动请求至warden serve;最后warden container执行启动脚本启动应用。


            本文主要详细描写叙述,DEA怎样与warden交互,以保证终于用户的应用能够成功绑定某一个port,实现用户应用对外提供服务。


            DEA在运行启动一个应用的时候,主要做到下面这些部分:promise_droplet, promise_container, 当中这两个部分并发完毕;promise_extract_droplet, promise_exec_hook_script(“before_start”), promise_start等。代码例如以下:

            [
              promise_droplet,
              promise_container
            ].each(&:run).each(&:resolve)
    
            [
              promise_extract_droplet,
              promise_exec_hook_script('before_start'),
              promise_start
            ].each(&:resolve)


    promise_droplet:

            在这一个环节,DEA主要做的工作是将droplet下载本机。通过droplet_uri,当中主要的路径在/config/dea.yml中,为base_dir: /tmp/dea_ng, 因此终于DEA下载到的droplet存放于DEA组件所在的宿主机上。

    promise_container:

            该环节的工作主要完毕创建一个warden container,随后能够为应用的执行提供一个合适的环境。promise_container的源代码实现例如以下:

    def promise_container
          Promise.new do |p|
            bind_mounts = [{'src_path' => droplet.droplet_dirname, 'dst_path' => droplet.droplet_dirname}]
            with_network = true
            container.create_container(
              bind_mounts: bind_mounts + config['bind_mounts'],
              limit_cpu: config['instance']['cpu_limit_shares'],
              byte: disk_limit_in_bytes,
              inode: config.instance_disk_inode_limit,
              limit_memory: memory_limit_in_bytes,
              setup_network: with_network)
            attributes['warden_handle'] = container.handle
            promise_setup_def create_container(params)
        [:bind_mounts, :limit_cpu, :byte, :inode, :limit_memory, :setup_network].each do |param|
          raise ArgumentError, "expecting #{param.to_s} parameter to create container" if params[param].nil?
        end
    
        with_em do
          new_container_with_bind_mounts(params[:bind_mounts])
          limit_cpu(params[:limit_cpu])
          limit_disk(byte: params[:byte], inode: params[:inode])
          limit_memory(params[:limit_memory])
          setup_network if params[:setup_network]
        end
      endenvironment.resolve
            p.deliver
          end
        end


            能够看到传入的參数主要有:
           bind_mounts:完毕宿主机文件文件夹的路径mount到container内部。
           limit_cpu:用于限制container的CPU资源分配;
           byte:磁盘限额;
           innode:磁盘的innode的限制
           limit_memory:内存限额;
           setup_network:网络的配置项。

           当中setup_network一直设置为true。


           在container.create_container的方法实现中,有下面的方法,例如以下:

    def create_container(params)
        [:bind_mounts, :limit_cpu, :byte, :inode, :limit_memory, :setup_network].each do |param|
          raise ArgumentError, "expecting #{param.to_s} parameter to createdef create_container(params)
        [:bind_mounts, :limit_cpu, :byte, :inode, :limit_memory, :setup_network].each do |param|
          raise ArgumentError, "expecting #{param.to_s} parameter to create container" if params[param].nil?
        end
    
        with_em do
          new_container_with_bind_mounts(params[:bind_mounts])
          limit_cpu(params[:limit_cpu])
          limit_disk(byte: params[:byte], inode: params[:inode])
          limit_memory(params[:limit_memory])
          setup_network if params[:setup_network]
        end
      end container" if params[param].nil?
        end
    
        with_em do
          new_container_with_bind_mounts(params[:bind_mounts])
          limit_cpu(params[:limit_cpu])
          limit_disk(byte: params[:byte], inode: params[:inode])
          limit_memory(params[:limit_memory])
          setup_network if params[:setup_network]
        end
      end

           主要关注一下setup_network方法。例如以下:

    def setup_network
        request = ::Warden::Protocol::NetInRequest.new(handle: handle)
        response = call(:app, request)
        network_ports['host_port'] = response.host_port
        network_ports['container_port'] = response.container_port
    
        request = ::Warden::Protocol::NetInRequest.new(handle: handle)
        response = call(:app, request)
        network_ports['console_host_port'] = response.host_port
        network_ports['console_container_port'] = response.container_port
      end

           从代码中能够看到,在setup_network中。主要完毕了两次NetIn操作。对于NetIn操作,须要说明的是。完毕的工作是将host主机上的端口映射到container内部的端口。

    换言之,将host_ip:port1映射到container_ip:port2。也就是说假设container在container_ip上监听的是端口port2,则host机器外部的请求訪问host机器,而且端口为port1的时候。host的内核网络栈。会将请求转发给container的port2端口,当中使用的协议为DNAT协议。


           因此。在以上的代码中实现了两次NetIn操作,也就是说将container的两个port映射到了host宿主机,第一个port用于container内应用的正常占用port。第二个port是用来为应用的console功能做服务。尽管container也分配了第二个port,可是在而后的应用启动等中,该console_port都没有使用过,可见Cloud Foundry在这里仅仅是预留了接口,可是没有真正利用起来。


           以上主要描写叙述了NetIn的功能,下面进入NetIn操作的源代码实现。NetIn的源代码实现,主要为warden server的部分。当中。是由DEA进程通过warden.sock和warden server建立通信,随后DEA进程发送NetIn请求给warden server,warden server终于处理该请求。

           如今进入warden范畴,研究warden怎样接收请求,并实现port的映射。
    在warden/lib/warden/server.rb中,大部分代码都是为了完毕warden server的执行。在run!方法中,能够看到warden server另外还启动了一个unix domain server,代码例如以下:

    server = ::EM.start_unix_domain_server(unix_domain_path, ClientConnection)

           也就是说,warden server会通过整个unix domain server接收从DEA进程发送来的关于warden container的一系列请求,在ClientConnection类中定义,关于这部分请求怎样处理的方法。


           当unix domain server中ClientConnection类通过receive_data(data)方法来实现接收请求。代码例如以下:

          def receive_data(data)
            @buffer << data
            @buffer.each_request do |request|
              begin
                receive_request(request)
              rescue => e
                close_connection_after_writing
                logger.warn("Disconnected client after error")
                logger.log_exception(e)
              end
            end
          end

            从代码中能够看到,当buffer中有请求的时候,通过receive_request(request)方法来进一步提取请求。再进入receive_request(request)方法中。能够看到通过process(request)来处理请求。

            接着进入真正请求处理的部分,也就时process(request)的实现:

    def process(request)
            case request
            when Protocol::PingRequest
              response = request.create_response
              send_response(response)
    
            when Protocol::ListRequest
              response = request.create_response
              response.handles = Server.container_klass.registry.keys.map(&:to_s)
              send_response(response)
    
            when Protocol::EchoRequest
              response = request.create_response
              response.message = request.message
              send_response(response)
    
            when Protocol::CreateRequest
              container = Server.container_klass.new
              container.register_connection(self)
              response = container.dispatch(request)
              send_response(response)
    
            else
              if request.respond_to?(:handle)
                container = find_container(request.handle)
                process_container_request(request, container)
              else
                raise WardenError.new("Unknown request: #{request.class.name.split("::").last}")
              end
            end
          rescue WardenError => e
            send_error(e)
          rescue => e
            logger.log_exception(e)
            send_error(e)
          end

            可见。在warden server中。请求类型能够简单分为5种:PingRequest, ListRequest, EchoRequest, CreateRequest和其它请求,像NetIn请求则属于其它请求中的一种,程序运行进入case语句块的else分支,也就是:

    	if request.respond_to?(:handle)
                container = find_container(request.handle)
                process_container_request(request, container)
              else
                raise WardenError.new("Unknown request: #{request.class.name.split("::").last}")
              end
    

            

            代码清晰可见,warden server首先通过handle找到详细是给哪一个warden container发送请求,然后调用process_container_request(request, container)方法。进入process_container_request(request, container)方法能够看到:增加请求类型不为StopRequest以及StreamRequest,则进入case语句块的else分支。运行代码:

     	response = container.dispatch(request)
             send_response(response)

           能够看到。是调用了container.dispatch(request)方法才返回了response。

           下面进入warden/lib/warden/container/base.rb文件里,该文件的dispatch方法主要实现了warden server接收到请求并预处理之后,怎样分发运行详细的请求,代码例如以下:

    def dispatch(request, &blk)
            klass_name = request.class.name.split("::").last
            klass_name = klass_name.gsub(/Request$/, "")
            klass_name = klass_name.gsub(/(.)([A-Z])/) { |m| "#{m[0]}_#{m[1]}" }
            klass_name = klass_name.downcase
    
            response = request.create_response
    
            t1 = Time.now
    
            before_method = "before_%s" % klass_name
            hook(before_method, request, response)
            emit(before_method.to_sym)
    
            around_method = "around_%s" % klass_name
            hook(around_method, request, response) do
              do_method = "do_%s" % klass_name
              send(do_method, request, response, &blk)
            end
    
            after_method = "after_%s" % klass_name
            emit(after_method.to_sym)
            hook(after_method, request, response)
    
            t2 = Time.ndef dispatch(request, &blk)
            klass_name = request.class.name.split("::").last
            klass_name = klass_name.gsub(/Request$/, "")
            klass_name = klass_name.gsub(/(.)([A-Z])/) { |m| "#{m[0]}_#{m[1]}" }
            klass_name = klass_name.downcase
    
            response = request.create_response
    
            t1 = Time.now
    
            before_method = "before_%s" % klass_name
            hook(before_method, request, response)
            emit(before_method.to_sym)
    
            around_method = "around_%s" % klass_name
            hook(around_method, request, response) do
              do_method = "do_%s" % klass_name
              send(do_method, request, response, &blk)
            end
    
            after_method = "after_%s" % klass_name
            emit(after_method.to_sym)
            hook(after_method, request, response)
    
            t2 = Time.now
    
            logger.info("%s (took %.6f)" % [klass_name, t2 - t1],
                        :request => request.to_hash,
                        :response => response.to_hash)
    
            response
          endow
    
            logger.info("%s (took %.6f)" % [klass_name, t2 - t1],
                        :request => request.to_hash,
                        :response => response.to_hash)
    
            response
          end

           首先提取出请求的类型名,假设是NetIn请求的话,提取出来的请求的类型名称为net_in,随后构建出方法明do_method。也就是do_net_in。接着就通过send(do_method, request, response, &blk)方法,将參数发送给do_net_in方法。

           如今就是进入warden/lib/warden/container/features/net.rb文件里, do_net_in方法实现例如以下:
     

    def do_net_in(request, response)
              if request.host_port.nil?
                host_port = self.class.port_pool.acquire
    
                # Use same port on the container side as the host side if unspecified
                container_port = request.container_port || host_port
    
                # Port may be re-used after this container has been destroyed
                @resources["ports"] << host_port
                @acquired["ports"] << host_port
              else
                host_port = request.host_port
                container_port = request.container_port || host_port
              end
    
              _net_in(host_port, container_port)
    
              @resources["net_in"] ||= []
              @resources["net_in"] << [host_port, container_port]
    
              response.host_port      = host_port
              response.container_port = container_port
            rescue WardenError
              self.class.port_pool.release(host_port) unless request.host_port
              raise
            end


           可见,假设请求port没有指定的话,那么就使用代码host_port = self.class.port_pool.acquire来获取port号,默认情况下,containerport号与hostport号保持一致,有了这两个port号之后,运行代码_net_in(host_port, container_port)。 真正实现port映射,例如以下:

    def _net_in(host_port, container_port)
              sh File.join(container_path, "net.sh"), "in", :env => {
                "HOST_PORT"      => host_port,
                "CONTAINER_PORT" => container_port,
              }
            end

           能够清晰的看到是。使用了容器内部的net.sh脚本来实现port映射。如今进入warden/root/linux/skeleton/net.sh脚本。进入參数为in的运行部分:

    "in")
        if [ -z "${HOST_PORT:-}" ]; then
          echo "Please specify HOST_PORT..." 1>&2
          exit 1
        fi
        if [ -z "${CONTAINER_PORT:-}" ]; then
          echo "Please specify CONTAINER_PORT..." 1>&2
          exit 1
        fi
        iptables -t nat -A ${nat_instance_chain} 
          --protocol tcp 
          --destination "${external_ip}" 
          --destination-port "${HOST_PORT}" 
          --jump DNAT 
          --to-destination "${network_container_ip}:${CONTAINER_PORT}"
        ;;

            可见,该脚本这部分的功能是在host主机创建一条DNAT规则。使得host主机上全部HOST_PORTport上的网络请求。都转发至network_container_ip:CONTAINER_PORT上。也就是完毕了目标地址IP转变。

    promise_extract_droplet:

            该环节主要完毕的是让container执行脚本。使得container容器将位于host主机的droplet文件,解压至container内部,代码内容例如以下:

    def promise_extract_droplet
          Promise.new do |p|
            script = "cd /home/vcap/ && tar zxf #{droplet.droplet_path}"
            container.run_script(:app, script)
            p.deliver
          end
        end

    promise_exec_hook_script('before_start'):

            这部分主要完毕的功能是让容器执行名为before_start的脚本。在老的版本号中,该部分的设置默觉得空。


    promise_start:

            这部分主要完毕的是应用的启动。

    当中创建容器的时候。返回的port号,会被DEA保存。终于。当DEA启动应用的时候,因为DEA会将port号作为參数传递给应用程序的启动脚本。因此当应用启动时会自己主动去监听已经为它设置的port,即完毕port监听。

    关于作者:

    孙宏亮,DAOCLOUD软件project师。两年来在云计算方面主要研究PaaS领域的相关知识与技术。

    坚信轻量级虚拟化容器的技术,会给PaaS领域带来深度影响,甚至决定未来PaaS技术的走向。


    转载请注明出处。

    本文很多其它出于我本人的理解。肯定在一些地方存在不足和错误。

    希望本文可以对接触wardenport映射的人有些帮助,假设你对这方面感兴趣,并有更好的想法和建议。也请联系我。

    我的邮箱:allen.sun@daocloud.io
    新浪微博:@莲子弗如清

  • 相关阅读:
    Codeforces 1045C Hyperspace Highways (看题解) 圆方树
    Codeforces 316E3 线段树 + 斐波那切数列 (看题解)
    Codeforces 803G Periodic RMQ Problem 线段树
    Codeforces 420D Cup Trick 平衡树
    Codeforces 295E Yaroslav and Points 线段树
    Codeforces 196E Opening Portals MST (看题解)
    Codeforces 653F Paper task SA
    Codeforces 542A Place Your Ad Here
    python基础 异常与返回
    mongodb 删除
  • 原文地址:https://www.cnblogs.com/gccbuaa/p/6893731.html
Copyright © 2011-2022 走看看