zoukankan      html  css  js  c++  java
  • heat 更新stack的代码调用分析heat stack-update


    heat中与nova中有个地方不太相同,在nova中,RPC发送后是由nova-manager来接收,对应的文件在nova/compute/manager.py;在heat中,RPC发送后是由 heat-engine来接收,对应的文件在heat/engine/service.py.
    Heat/api/openstack/v1/stacks.py
        @util.identified_stack
        def update(self, req, identity, body):
            """Update an existing stack with a new template and/or parameters."""
            data = InstantiationData(body)  
    
            args = self.prepare_args(data)  
            self.rpc_client.update_stack(   
                req.context,       
                identity,          
                data.template(),   
                data.environment(),
                data.files(),      
                args,
                environment_files=data.environment_files())
    
            raise exc.HTTPAccepted()
    
    Heat/rpc/client.py
        def update_stack(self, ctxt, stack_identity, template, params,
                         files, args, environment_files=None):
            """Updates an existing stack based on the provided template and params.
    
            Note that at this stage the template has already been fetched from the
            heat-api process if using a template-url.
    
            :param ctxt: RPC context.
            :param stack_name: Name of the stack you want to create.
            :param template: Template of stack you want to create.
            :param params: Stack Input Params/Environment
            :param files: files referenced from the environment.
            :param args: Request parameters/args passed from API
            :param environment_files: optional ordered list of environment file
                   names included in the files dict
            :type  environment_files: list or None
            """
            return self.call(ctxt, 
                             self.make_msg('update_stack',   
                                           stack_identity=stack_identity,  
                                           template=template,              
                                           params=params,                  
                                           files=files,                    
                                           environment_files=environment_files,
                                           args=args),                     
                             version='1.23')  
    Heat/enigne/service.py
    主要看如下代码的调用,从回调可以知道,后面将会执行current_stack.update这个函数,实
    际上他就是stack.update(heat/engine/stack.py)
                th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
                                                           self.engine_id,
                                                           current_stack.update,
                                                           updated_stack,
                                                           event=event)
        @context.request_context
        def update_stack(self, cnxt, stack_identity, template, params,
                         files, args, environment_files=None):
            """Update an existing stack based on the provided template and params.
    
            Note that at this stage the template has already been fetched from the
            heat-api process if using a template-url.
    
            :param cnxt: RPC context.
            :param stack_identity: Name of the stack you want to create.
            :param template: Template of stack you want to create.
            :param params: Stack Input Params
            :param files: Files referenced from the template
            :param args: Request parameters/args passed from API
            :param environment_files: optional ordered list of environment file
                   names included in the files dict
            :type  environment_files: list or None
            """
    		(省略掉部分代码,来看关键部分)
            if current_stack.convergence:   
                current_stack.thread_group_mgr = self.thread_group_mgr
                current_stack.converge_stack(template=tmpl,
                                             new_stack=updated_stack)
            else:
                event = eventlet.event.Event()
                th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
                                                           self.engine_id,
                                                           current_stack.update,
                                                           updated_stack,
                                                           event=event)
    Heat/engine/stack.py
    同调用heat stack-create,这里也是创建了一个TaskRunner的对象,然后调用self.update_task(这个函数是在
    Heat/engine/stack.py中定义,可以回顾一下heat stack-create中的stack_task()这个函数。)
        @profiler.trace('Stack.update', hide_args=False)
        @reset_state_on_error      
        def update(self, newstack, event=None):
            """Update the stack.   
    
            Compare the current stack with newstack,
            and where necessary create/update/delete the resources until
            this stack aligns with newstack.
    
            Note update of existing stack resources depends on update
            being implemented in the underlying resource types
    
            Update will fail if it exceeds the specified timeout. The default is
            60 minutes, set in the constructor
            """
            LOG.debug("Jeffrey: stack.update")
            self.updated_time = oslo_timeutils.utcnow()
            updater = scheduler.TaskRunner(self.update_task, newstack,
                                           event=event)                    
            updater()
    接下来我们看一下self.update_task(),同样我们省略部分代码只看关键代码。
    先是声明一个StackUpdata的对象update_task,然后这个对象被TaskRunner()调用。
    Heat/engine/stack.py
       @scheduler.wrappertask
        def update_task(self, newstack, action=UPDATE, event=None):
            try:
    
                update_task = update.StackUpdate(
                    self, newstack, backup_stack,
                    rollback=action == self.ROLLBACK,
                    error_wait_time=cfg.CONF.error_wait_time)
                updater = scheduler.TaskRunner(update_task)
    
                try:
                    updater.start(timeout=self.timeout_secs())
                    yield
                    while not updater.step():
                        if event is None or not event.ready():
                            yield
                        else:
                            message = event.wait()
                            self._message_parser(message)
    在scheduler.TaskRunner初始化的时候,self.task的值被赋为update.StackUpdate()类型的变量
    
    接下来去heat/engine/scheduler.py中看一下start()方法的实现, 我们可以发现,heat engine通过 start()->step()->run_to_completion()这三个函数实现了循环,然后借助yield 及装饰器wrappertask()实现了迭代。
    
        def start(self, timeout=None):  
            """Initialise the task and run its first step.
    
            If a timeout is specified, any attempt to step the task after that
            number of seconds has elapsed will result in a Timeout being
            raised inside the task.
            """
            assert self._runner is None, "Task already started" 
            assert not self._done, "Task already cancelled"
            LOG.debug("Jeffrey: scheduler.TaskRunner.start")
    
            LOG.debug('%s starting' % six.text_type(self))
    
            if timeout is not None:
                self._timeout = Timeout(self, timeout)
    
            result = self._task(*self._args, **self._kwargs)
            if isinstance(result, types.GeneratorType):
    		   #注意此处的代码,调用step ,把subtask进行分解成一个个的step进行执行。
                self._runner = result           
                self.step()        
            else:
                self._runner = False            
                self._done = True  
                LOG.debug('%s done (not resumable)' % six.text_type(self))
    
        def step(self):            
            """Run another step of the task.
    
            Return True if the task is complete; False otherwise.
            """
            LOG.debug("Jeffrey: scheduler.TaskRunner.step: self.done()=%s" % self.done())
            if not self.done():
                assert self._runner is not None, "Task not started"
    
                if self._timeout is not None and self._timeout.expired():
                    LOG.info(_LI('%s timed out'), six.text_type(self))
                    self._done = True
    
                    self._timeout.trigger(self._runner)
                else:
                    LOG.debug('%s running' % six.text_type(self))
    
                    try:
    				#此处执行self._runner这个对象的__call__方法,也就是update.StackUpdate的__call__()。它定义在:
    				#heat/engine/update.py
                        LOG.debug("Jeffrey: scheduler.TaskRunner.step: next(self._runner)=%s" % next(self._runner))
                        next(self._runner)
                    except StopIteration:
                        self._done = True
                        LOG.debug('%s complete' % six.text_type(self))
    
            return self._done
    
        def run_to_completion(self, wait_time=1):
            """Run the task to completion.
    
            The task will sleep for `wait_time` seconds between steps. To avoid
            sleeping, pass `None` for `wait_time`.
            """
            while not self.step():
                self._sleep(wait_time)
    
    在step中的我们看到了next(self._runner)这样的代码,其执行是的是StackUpdater中的__call__().
    注意cleanup_prev和self.updater这两个DependencyTaskGroup类型的对象。前者的回调函数是
    Self._remove_backup_resource,后者的回调是self._resource_update.
    Heat/engine/update.py
        @scheduler.wrappertask     
        def __call__(self):        
            """Return a co-routine that updates the stack."""
    
            cleanup_prev = scheduler.DependencyTaskGroup(
                self.previous_stack.dependencies,
                self._remove_backup_resource,   
                reverse=True)      
    
            self.updater = scheduler.DependencyTaskGroup(
                self.dependencies(),            
                self._resource_update,          
                error_wait_time=self.error_wait_time)
    
            if not self.rollback:  
                LOG.debug("Jeffrey in stack.StackUpdate: run cleanup_prev()")
                yield cleanup_prev()        
    
            try:
                LOG.debug("Jeffrey in stack.StackUpdate: run updater()")
                yield self.updater()        
            finally:               
                self.previous_stack.reset_dependencies()
    
    找到回调函数后,我们的继续看self._resource_update这个函数,可以发现对于已存在的资源和没有存在的资源的
    更新,heat分别调用的是不同的接口来实现的。
    Heat/engine/update.py
        def _resource_update(self, res):    
            if res.name in self.new_stack and self.new_stack[res.name] is res:
                return self._process_new_resource_update(res)
            else:                  
                return self._process_existing_resource_update(res)
    
    
    后面的就不再接着分析了…………


  • 相关阅读:
    forward和redirect的区别详解
    j2ee部分jar包的作用
    Struts2的工作原理(图解)详解
    struts2的s:iterator 标签 详解
    Struts2 控制标签:<s:if>、<s:elseif>和<s:else>
    Struts2 资源配置文件国际化详解
    ActionContext和ServletActionContext区别以及action访问servlet API的三种方法
    js获取class
    怎么解决eclipse报PermGen space异常的问题
    SQL模糊查找语句详解
  • 原文地址:https://www.cnblogs.com/double12gzh/p/10166117.html
Copyright © 2011-2022 走看看