zoukankan      html  css  js  c++  java
  • 通过heat创建stack的代码流程分析heat stack-create

    Heat-api发送RPC请求
    Heat/api/openstack/v1/stacks.py
        @util.policy_enforce                                                                                                                                                                               
        def create(self, req, body):                                                                                                                                                                       
            """Create a new stack."""                                                                                                                                                                      
            data = InstantiationData(body)                                                                                                                                                                 
                                                                                                                                                                                                           
            args = self.prepare_args(data)                                                                                                                                                                 
            result = self.rpc_client.create_stack(                                                                                                                                                         
                req.context,                                                                                                                                                                               
                data.stack_name(),                                                                                                                                                                         
                data.template(),                                                                                                                                                                           
                data.environment(),                                                                                                                                                                        
                data.files(),                                                                                                                                                                              
                args,                                                                                                                                                                                      
                environment_files=data.environment_files())                                                                                                                                                
                                                                                                                                                                                                           
            formatted_stack = stacks_view.format_stack(                                                                                                                                                    
                req,                                                                                                                                                                                       
                {rpc_api.STACK_ID: result}                                                                                                                                                                 
            )                                                                                                                                                                                              
            return {'stack': formatted_stack}
    
    Heat/rpc/client.py
        def create_stack(self, ctxt, stack_name, template, params, files,                                                                                                                                  
                         args, environment_files=None):                                                                                                                                                    
            """Creates a new stack using the template provided.                                                                                                                                            
                                                                                                                                                                                                           
            Note that at this stage the template has already been fetched from the                                                                                                                         
            heat-api process if using a template-url.                                                                                                                                                      
                                                                                                                                                                                                           
            :param ctxt: RPC context.                                                                                                                                                                      
            :param stack_name: Name of the stack you want to create.                                                                                                                                       
            :param template: Template of stack you want to create.                                                                                                                                         
            :param params: Stack Input Params/Environment                                                                                                                                                  
            :param files: files referenced from the environment.                                                                                                                                           
            :param args: Request parameters/args passed from API                                                                                                                                           
            :param environment_files: optional ordered list of environment file                                                                                                                            
                   names included in the files dict                                                                                                                                                        
            :type  environment_files: list or None                                                                                                                                                         
            """                                                                                                                                                                                            
            return self._create_stack(ctxt, stack_name, template, params, files,                                                                                                                           
                                      args, environment_files=environment_files) 
    
        def _create_stack(self, ctxt, stack_name, template, params, files,                                                                                                                                 
                          args, environment_files=None,                                                                                                                                                    
                          owner_id=None, nested_depth=0, user_creds_id=None,                                                                                                                               
                          stack_user_project_id=None, parent_resource_name=None):                                                                                                                          
            """Internal interface for engine-to-engine communication via RPC.                                                                                                                              
                                                                                                                                                                                                           
            Allows some additional options which should not be exposed to users via                                                                                                                        
            the API:                                                                                                                                                                                       
                                                                                                                                                                                                           
            :param owner_id: parent stack ID for nested stacks                                                                                                                                             
            :param nested_depth: nested depth for nested stacks                                                                                                                                            
            :param user_creds_id: user_creds record for nested stack
            :param stack_user_project_id: stack user project for nested stack
            :param parent_resource_name: the parent resource name
            """
            return self.call(
                ctxt, self.make_msg('create_stack', stack_name=stack_name,
                                    template=template,
                                    params=params, files=files,
                                    environment_files=environment_files,
                                    args=args, owner_id=owner_id,
                                    nested_depth=nested_depth,
                                    user_creds_id=user_creds_id,
                                    stack_user_project_id=stack_user_project_id,
                                    parent_resource_name=parent_resource_name),
                version='1.23')
    
    最终RPC请求由heat-engine接收,真正有来创建stack的操作是由stack.create()来完成的。
    Heat/engine/service.py
        def create_stack(self, cnxt, stack_name, template, params, files,
                         args, environment_files=None,
                         owner_id=None, nested_depth=0, user_creds_id=None,
                         stack_user_project_id=None, parent_resource_name=None):
            """Create a new stack using the template provided.
    
            Note that at this stage the template has already been fetched from the
            heat-api process if using a template-url.
    
            :param cnxt: RPC context.
            :param stack_name: Name of the stack you want to create.
            :param template: Template of stack you want to create.
            :param params: Stack Input Params
            :param files: Files referenced from the template
            :param args: Request parameters/args passed from API
            :param environment_files: optional ordered list of environment file
                   names included in the files dict
            :type  environment_files: list or None
            :param owner_id: parent stack ID for nested stacks, only expected when
                             called from another heat-engine (not a user option)
            :param nested_depth: the nested depth for nested stacks, only expected
                             when called from another heat-engine
            :param user_creds_id: the parent user_creds record for nested stacks
            :param stack_user_project_id: the parent stack_user_project_id for
                             nested stacks
            :param parent_resource_name: the parent resource name
            """
            LOG.info(_LI('Creating stack %s'), stack_name)
    
    	      # 创建一个新的porject,之所以会新建一个独立的project,
    	   #而不是直接用发起创建stack操作的project
    	   #应该是从权限控制角度考虑,通过新的project只能访问相关software_config资源.
    	   #通过分析stack.create_stack_user_project_id,其实际上是就是调用了keystone 
    	   #client的相关接口创建了一个新project.
            def _create_stack_user(stack):
                if not stack.stack_user_project_id:
                    try:
                        stack.create_stack_user_project_id()
                    except exception.AuthorizationFailure as ex:
                        stack.state_set(stack.action, stack.FAILED,
                                        six.text_type(ex))
    
            def _stack_create(stack):
                # Create/Adopt a stack, and create the periodic task if successful
                if stack.adopt_stack_data:
                    stack.adopt()
                elif stack.status != stack.FAILED:
                                        # 真正做事情的地方,些处调用的是heat/engine/stack.py中的create函数。
                    stack.create() 
    
                if (stack.action in (stack.CREATE, stack.ADOPT)
                        and stack.status == stack.COMPLETE):
                    if self.stack_watch:
                        # Schedule a periodic watcher task for this stack
                        self.stack_watch.start_watch_task(stack.id, cnxt)
                else:
                    LOG.info(_LI("Stack create failed, status %s"), stack.status)
    
            convergence = cfg.CONF.convergence_engine
    
    	       # 对heat 模板进行验证。
            stack = self._parse_template_and_validate_stack(
                cnxt, stack_name, template, params, files, environment_files,
                args, owner_id, nested_depth, user_creds_id,
                stack_user_project_id, convergence, parent_resource_name)
    
            self.resource_enforcer.enforce_stack(stack)
    	      #把stack的信息写入数据库。
            stack_id = stack.store()
            if cfg.CONF.reauthentication_auth_method == 'trusts':
                stack = parser.Stack.load(
                    cnxt, stack_id=stack_id, use_stored_context=True)
    	       # 创建一个新的user.
            _create_stack_user(stack)
            if convergence:
                action = stack.CREATE
                if stack.adopt_stack_data:
                    action = stack.ADOPT
                stack.thread_group_mgr = self.thread_group_mgr
                stack.converge_stack(template=stack.t, action=action)
            else:
                self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
                                                      _stack_create, stack)
    
            return dict(stack.identifier())
    
    接下来看一下stack.create() heat/engine/stack.py
    注意其中的TaskRunner这个类及其回调函数self.stack_task。
    在heat中,TaskRunner被广泛使用,用创建诸如create, update, suspend等的操作。
    TaskRunner的构造参数中定义了,这个Task将执行的函数,对stack所涉及的资源执行的action,即创建。
    最后调度这个Task运行,运行的过程是执行self.stack_task函数
        @profiler.trace('Stack.create', hide_args=False)
        @reset_state_on_error      
        def create(self):
            """Create the stack and all of the resources."""
            def rollback():
                if not self.disable_rollback and self.state == (self.CREATE, 
                                                                self.FAILED):                   
                    self.delete(action=self.ROLLBACK)
    
            self._store_resources()
    
    	      # 因为我们是调用heat stack-create,所以这里是self.stack_task;如果我们调用的是
    	      # heat stack-update,那么此处应该是self.update_task.
    	     # 对于stack的其它操作,如suspend,resume,snapshot等是self.stack_task.具体的可以
    	     #看一下heat/engine/stack.py中的代码。
            creator = scheduler.TaskRunner( 
                self.stack_task, action=self.CREATE,
                reverse=False, post_func=rollback,
                error_wait_time=cfg.CONF.error_wait_time)
    
    	       #此处将调用TaskRunner中的__call__()方法。在此__call__()方法中,self.stack_task
    	       # 将会被执行。
            creator(timeout=self.timeout_secs())
    
    TaskRunner这个类定义在了heat/engine/scheduler.py中。
    Heat engine中比较重要的几个文件就是scheduler.py, service.py, stack.py, resource,py, stack_lock.py, update.py
    	Scheduler.py中的两个类比较重要:
    		TaskRunner: 对stack的任何操作, 最开始都是以TaskRunner来封装的。
    		DependencyTaskGroup: 一个Task中可能会有许多步骤(step)来完,实现task的循环执行主要依靠这个类
    			与wrappertask这个装饰器及配合yield来共同完成的。
    	Service.py中主要定义了三个类:
    		EngineService:
    		EngineListenerService:
    		ThreadGroupManager: 实现把对stack的操作放到子线程中执行。
    		
    这个stack_task有三个重要的点:
    scheduler.wrappertask进行了装饰,意味这task需要处理subtask
    scheduler.DependencyTaskGroup,构造该对象,即拓扑结构的图,该图上的每个节点都是一个资源,即资源上执行的task,即stack_task的子任务
    yield action_task(),这是yield关键字的使用,有yield之后,函数就成为一个对象了,那么直接调用stack_task不会执行该函数,而是返回一个迭代器对象
    
    下面来看stack_task(), 同时注意所用的装饰器。
       @scheduler.wrappertask
        def stack_task(self, action, reverse=False, post_func=None,
                       error_wait_time=None,
                       aggregate_exceptions=False, pre_completion_func=None):
            """A task to perform an action on the stack.
    
            All of the resources are traversed in forward or reverse dependency
            order.
    
            :param action: action that should be executed with stack resources
            :param reverse: define if action on the resources need to be executed
             in reverse order (resources - first and then res dependencies )
            :param post_func: function that need to be executed after
            action complete on the stack
            :param error_wait_time: time to wait before cancelling all execution
            threads when an error occurred
            :param aggregate_exceptions: define if exceptions should be aggregated
            :param pre_completion_func: function that need to be executed right
            before action completion. Uses stack ,action, status and reason as
            input parameters
            """
            LOG.debug("Jeffrey: stack.stack_task")
            try:
                lifecycle_plugin_utils.do_pre_ops(self.context, self,
                                                  None, action)
            except Exception as e:
                self.state_set(action, self.FAILED, e.args[0] if e.args else
                               'Failed stack pre-ops: %s' % six.text_type(e))
                if callable(post_func):
                    post_func()
                return
            self.state_set(action, self.IN_PROGRESS,
                           'Stack %s started' % action)
    
            stack_status = self.COMPLETE
            reason = 'Stack %s completed successfully' % action
    
            action_method = action.lower()
            # If a local _$action_kwargs function exists, call it to get the
            # action specific argument list, otherwise an empty arg list
            handle_kwargs = getattr(self,
                                    '_%s_kwargs' % action_method,
                                    lambda x: {})
    
            @functools.wraps(getattr(resource.Resource, action_method))
            def resource_action(r):
                # Find e.g resource.create and call it
                handle = getattr(r, action_method)
                LOG.debug("Jeffrey: stack.stack_task.resource_action: handle=%s" % handle)
    		     #此处实际上就是调用了资源对应的handler函数,比如handle_create()
    		    # 这些handle_Xxaction()函数在定义资源的时候已经实现,具体实例可以参看:
    		    # heat/engine/resources/openstack/nova/server.py
                return handle(**handle_kwargs(r))
    
            action_task = scheduler.DependencyTaskGroup(
                self.dependencies,
                resource_action,
                reverse,
                error_wait_time=error_wait_time,
                aggregate_exceptions=aggregate_exceptions)
    
            try:
                LOG.debug("Jeffrey: stack.stack_task.resource_action: action_task 1")
                               # 调用了DependcyTaskGroup的__call__(),在__call__中又调用了回调函数
    	                # resource_action
                yield action_task()
                LOG.debug("Jeffrey: stack.stack_task.resource_action: action_task 1")
    
    继续看scheduler.py中的__call__()
        def __call__(self):
            """Return a co-routine which runs the task group."""
            LOG.debug("Jeffrey: DependencyTaskGroup.__call__ : self._ready()=%s" % self._ready())
            raised_exceptions = [] 
            while any(six.itervalues(self._runners)):
                try:
                    for k, r in self._ready():      
                        LOG.debug("Jeffrey: DependencyTaskGroup.__call__.r=%s" % r)
                        r.start()  
    
    上面的r.start调用的是scheduler.py中的start():
    这个函数主要是执行传递进来的Task,result = self._task(*self._args, **self._kwargs)不一定是去执行这个函数,就因为
    yield关键字的存在导致其成为一个Generator对象了,所以下 一步有个类型判断,对于GeneratorType意味着需要一步步执行,
    调用step函数。若没有yield关键字,一切正常如以往。step函数则调用与yield配合的next函数,最后在run_to_completion函数中,
    循环的调用next函数,直至yield抛出StopIteration异常.注意,此处的self._task实际上是在heat/engine/stack.py中定义的stack_task()
    
        def start(self, timeout=None):
            """Initialise the task and run its first step.
    
            If a timeout is specified, any attempt to step the task after that
            number of seconds has elapsed will result in a Timeout being
            raised inside the task.
            """
            assert self._runner is None, "Task already started"
            assert not self._done, "Task already cancelled"
            LOG.debug("Jeffrey: scheduler.TaskRunner.start")
    
            LOG.debug('%s starting' % six.text_type(self))
    
            if timeout is not None:
                self._timeout = Timeout(self, timeout)
    
            result = self._task(*self._args, **self._kwargs)
            if isinstance(result, types.GeneratorType):
                self._runner = result
                self.step()
            else:
                self._runner = False
                self._done = True
                LOG.debug('%s done (not resumable)' % six.text_type(self))
    
        def step(self):
            """Run another step of the task.
    
            Return True if the task is complete; False otherwise.
            """
            LOG.debug("Jeffrey: scheduler.TaskRunner.step: self.done()=%s" % self.done())
            if not self.done():
                assert self._runner is not None, "Task not started"
    
                if self._timeout is not None and self._timeout.expired():
                    LOG.info(_LI('%s timed out'), six.text_type(self))
                    self._done = True
    
                    self._timeout.trigger(self._runner)
                else:
                    LOG.debug('%s running' % six.text_type(self))
    
                    try:
                        LOG.debug("Jeffrey: scheduler.TaskRunner.step: next(self._runner)=%s" % next(self._runner))
                        next(self._runner)
                    except StopIteration:
                        self._done = True
                        LOG.debug('%s complete' % six.text_type(self))
    
            return self._done
    
        def run_to_completion(self, wait_time=1):
            """Run the task to completion.
    
            The task will sleep for `wait_time` seconds between steps. To avoid
            sleeping, pass `None` for `wait_time`.
            """
            while not self.step():
                self._sleep(wait_time)
    
    
    


  • 相关阅读:
    「UVA12293」 Box Game
    「CF803C」 Maximal GCD
    「CF525D」Arthur and Walls
    「CF442C」 Artem and Array
    LeetCode lcci 16.03 交点
    LeetCode 1305 两棵二叉搜索树中的所有元素
    LeetCode 1040 移动石子直到连续 II
    LeetCode 664 奇怪的打印机
    iOS UIPageViewController系统方法崩溃修复
    LeetCode 334 递增的三元子序列
  • 原文地址:https://www.cnblogs.com/double12gzh/p/10166118.html
Copyright © 2011-2022 走看看