zoukankan      html  css  js  c++  java
  • ansible 2.5--2.7API的Callback回调源码剖析

    重写回调方法:

    class ResultCallback(CallbackBase): #继承callbackbase,并重写其3个方法
        """A sample callback plugin used for performing an action as results come in
    
        If you want to collect all results into a single object for processing at
        the end of the execution, look into utilizing the ``json`` callback plugin
        or writing your own custom callback plugin
        """
        def v2_runner_on_ok(self, result, **kwargs): #定义成功
            """Print a json representation of the result
    
            This method could store the result in an instance attribute for retrieval later
            """
            # self.host_ok[result._host.get_name()] = result
            host = result._host
            currt_tiem = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            logs = json.dumps({currt_tiem: {host.name: result._result['msg']}}, indent=4)
            with open('/opt/aaaa.json', 'a') as f:
                f.write(logs+'
    ')
            self.result_log = result._result['msg']
    
        def v2_runner_on_unreachable(self, result, **kwargs):
            """Print a json representation of the result
    
            This method could store the result in an instance attribute for retrieval later
            """
            # self.host_ok[result._host.get_name()] = result
            host = result._host
            currt_tiem = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            self.result_log = result._result['msg']
            logs = json.dumps({currt_tiem: {host.name: self.result_log}}, indent=4)
            with open('/opt/aaaa.json', 'a') as f:
                f.write(logs + '
    ')
    
    
        def v2_runner_on_failed(self, result, **kwargs):
            """Print a json representation of the result
    
            This method could store the result in an instance attribute for retrieval later
            """
            host = result._host
            currt_tiem = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            self.result_log = result._result['msg']
            logs = json.dumps({currt_tiem: {host.name: self.result_log}}, indent=4)
            with open('/opt/aaaa.json', 'a') as f:
                f.write(logs + '
    ')

    ansible api调用主程序

     1 class Run_comm():
     2     def __init__(self,server_ip):
     3         Options = namedtuple('Options', ['connection', 'module_path', 'forks', 'become', 'become_method', 'become_user', 'check', 'diff'])
     4         self.options = Options(connection='smart', module_path=['/usr/local/lib/python2.7/site-packages/ansible/modules'], 
     5             forks=10, become=None, become_method='su', become_user='root', check=False, diff=False)
     6 
     7         self.loader = DataLoader()
     8         self.passwords = dict(vault_pass='vagrant')
     9 
    10         self.results_callback = ResultCallback() #把回调类实例化
    11 
    12         self.inventory = InventoryManager(loader=self.loader, sources='{},'.format(server_ip))
    13 
    14         self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory)
    15         # self.extra_vars ={'ansible_ssh_user': 'root','ansible_ssh_pass': 'aslan-game@2017~johnly'}
    16         # self.extra_vars = {'ansible_ssh_user': 'root', 'ansible_ssh_pass': 'vagrant'}
    17         # self.extra_vars = {'ansible_ssh_user': 'root', 'ansible_ssh_pass': 'ysdon2016'}
    18         self.extra_vars = {'ansible_ssh_user': ssh_dict['user'], 'ansible_ssh_pass': ssh_dict['password']}
    19         self.variable_manager.extra_vars = self.extra_vars
    20         self.tqm = None
    21 
    22 
    23     def run_command(self,command):
    24         self.play_source = dict(
    25             name="Ansible Play",
    26             hosts='all',
    27             gather_facts='yes',
    28             tasks=[
    29                 dict(action=dict(module='shell', args=command), register='shell_out'),
    30                 dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}')))
    31             ]
    32         )
    33         self.play = Play().load(self.play_source, variable_manager=self.variable_manager, loader=self.loader)
    34         try:
    35 
    36             self.tqm = TaskQueueManager(
    37                       inventory=self.inventory,
    38                       variable_manager=self.variable_manager,
    39                       loader=self.loader,
    40                       options=self.options,
    41                       passwords=self.passwords,
    42                       stdout_callback=self.results_callback,  #把实例化的回调类传入
    43                   )
    44             result = self.tqm.run(self.play)
    45         finally:
    46 
    47             if self.tqm is not None:
    48                 self.tqm.cleanup()
    49         return self.results_callback.result_log
    TaskQueueManager中调用过程:在run中加载所有的回调类,然后把play事件设置进回调类中,并执行v2_playbook_on_play_start回调方法,表示要开始执行play操作
      1 class TaskQueueManager:
      2 
      3     '''
      4     This class handles the multiprocessing requirements of Ansible by
      5     creating a pool of worker forks, a result handler fork, and a
      6     manager object with shared datastructures/queues for coordinating
      7     work between all processes.
      8 
      9     The queue manager is responsible for loading the play strategy plugin,
     10     which dispatches the Play's tasks to hosts.
     11     '''
     12 
     13     RUN_OK = 0
     14     RUN_ERROR = 1
     15     RUN_FAILED_HOSTS = 2
     16     RUN_UNREACHABLE_HOSTS = 4
     17     RUN_FAILED_BREAK_PLAY = 8
     18     RUN_UNKNOWN_ERROR = 255
     19 
     20     def __init__(self, inventory, variable_manager, loader, options, passwords, stdout_callback=None, run_additional_callbacks=True, run_tree=False):
     21 
     22         self._inventory = inventory
     23         self._variable_manager = variable_manager
     24         self._loader = loader
     25         self._options = options
     26         self._stats = AggregateStats()
     27         self.passwords = passwords
     28         self._stdout_callback = stdout_callback #赋值
     29         self._run_additional_callbacks = run_additional_callbacks
     30         self._run_tree = run_tree
     31 
     32         self._callbacks_loaded = False
     33         self._callback_plugins = []
     34         self._start_at_done = False
     35 
     36         # make sure any module paths (if specified) are added to the module_loader
     37         if options.module_path:
     38             for path in options.module_path:
     39                 if path:
     40                     module_loader.add_directory(path)
     41 
     42         # a special flag to help us exit cleanly
     43         self._terminated = False
     44 
     45         # this dictionary is used to keep track of notified handlers
     46         self._notified_handlers = dict()
     47         self._listening_handlers = dict()
     48 
     49         # dictionaries to keep track of failed/unreachable hosts
     50         self._failed_hosts = dict()
     51         self._unreachable_hosts = dict()
     52 
     53         try:
     54             self._final_q = multiprocessing.Queue()
     55         except OSError as e:
     56             raise AnsibleError("Unable to use multiprocessing, this is normally caused by lack of access to /dev/shm: %s" % to_native(e))
     57 
     58         # A temporary file (opened pre-fork) used by connection
     59         # plugins for inter-process locking.
     60         self._connection_lockfile = tempfile.TemporaryFile()
     61     def run(self, play):
     62         '''
     63         Iterates over the roles/tasks in a play, using the given (or default)
     64         strategy for queueing tasks. The default is the linear strategy, which
     65         operates like classic Ansible by keeping all hosts in lock-step with
     66         a given task (meaning no hosts move on to the next task until all hosts
     67         are done with the current task).
     68         '''
     69 
     70         if not self._callbacks_loaded:
     71             self.load_callbacks() #在run中调用:加载回调类
     72 
     73         all_vars = self._variable_manager.get_vars(play=play)
     74         warn_if_reserved(all_vars)
     75         templar = Templar(loader=self._loader, variables=all_vars)
     76 
     77         new_play = play.copy()
     78         new_play.post_validate(templar)
     79         new_play.handlers = new_play.compile_roles_handlers() + new_play.handlers
     80 
     81         self.hostvars = HostVars(
     82             inventory=self._inventory,
     83             variable_manager=self._variable_manager,
     84             loader=self._loader,
     85         )
     86 
     87         play_context = PlayContext(new_play, self._options, self.passwords, self._connection_lockfile.fileno())
     88         for callback_plugin in self._callback_plugins: #循环所有回调类
     89             if hasattr(callback_plugin, 'set_play_context'):
     90                 callback_plugin.set_play_context(play_context) #把要回调的play操作设置进回调类里面
     91 
     92         self.send_callback('v2_playbook_on_play_start', new_play) #执行v2_playbook_on_play_start回调方法
     93  
     94         # initialize the shared dictionary containing the notified handlers
     95         self._initialize_notified_handlers(new_play)
     96 
     97         # build the iterator
     98         iterator = PlayIterator(
     99             inventory=self._inventory,
    100             play=new_play,
    101             play_context=play_context,
    102             variable_manager=self._variable_manager,
    103             all_vars=all_vars,
    104             start_at_done=self._start_at_done,
    105         )
    106 
    107         # adjust to # of workers to configured forks or size of batch, whatever is lower
    108         self._initialize_processes(min(self._options.forks, iterator.batch_size))
    109 
    110         # load the specified strategy (or the default linear one)
    111         strategy = strategy_loader.get(new_play.strategy, self)
    112         if strategy is None:
    113             raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds)
    114 
    115         # Because the TQM may survive multiple play runs, we start by marking
    116         # any hosts as failed in the iterator here which may have been marked
    117         # as failed in previous runs. Then we clear the internal list of failed
    118         # hosts so we know what failed this round.
    119         for host_name in self._failed_hosts.keys():
    120             host = self._inventory.get_host(host_name)
    121             iterator.mark_host_failed(host)
    122 
    123         self.clear_failed_hosts()
    124 
    125         # during initialization, the PlayContext will clear the start_at_task
    126         # field to signal that a matching task was found, so check that here
    127         # and remember it so we don't try to skip tasks on future plays
    128         if getattr(self._options, 'start_at_task', None) is not None and play_context.start_at_task is None:
    129             self._start_at_done = True
    130 
    131         # and run the play using the strategy and cleanup on way out
    132         play_return = strategy.run(iterator, play_context)
    133 
    134         # now re-save the hosts that failed from the iterator to our internal list
    135         for host_name in iterator.get_failed_hosts():
    136             self._failed_hosts[host_name] = True
    137 
    138         strategy.cleanup()
    139         self._cleanup_processes()
    140         return play_return
    141     def load_callbacks(self):
    142         '''
    143         Loads all available callbacks, with the exception of those which
    144         utilize the CALLBACK_TYPE option. When CALLBACK_TYPE is set to 'stdout',
    145         only one such callback plugin will be loaded.
    146         '''
    147 
    148         if self._callbacks_loaded:
    149             return
    150 
    151         stdout_callback_loaded = False
    152         if self._stdout_callback is None:
    153             self._stdout_callback = C.DEFAULT_STDOUT_CALLBACK
    154 
    155         if isinstance(self._stdout_callback, CallbackBase): #判断传入的回调类是不是CallBack的子类
    156             stdout_callback_loaded = True
    157         elif isinstance(self._stdout_callback, string_types):
    158             if self._stdout_callback not in callback_loader:
    159                 raise AnsibleError("Invalid callback for stdout specified: %s" % self._stdout_callback)
    160             else:
    161                 self._stdout_callback = callback_loader.get(self._stdout_callback)
    162                 try:
    163                     self._stdout_callback.set_options()  #设置
    164                 except AttributeError:
    165                     display.deprecated("%s stdout callback, does not support setting 'options', it will work for now, "
    166                                        " but this will be required in the future and should be updated,"
    167                                        " see the 2.4 porting guide for details." % self._stdout_callback._load_name, version="2.9")
    168                 stdout_callback_loaded = True
    169         else:
    170             raise AnsibleError("callback must be an instance of CallbackBase or the name of a callback plugin")
    171 
    172         for callback_plugin in callback_loader.all(class_only=True):  #循环所有回调类
    173             callback_type = getattr(callback_plugin, 'CALLBACK_TYPE', '')
    174             callback_needs_whitelist = getattr(callback_plugin, 'CALLBACK_NEEDS_WHITELIST', False)
    175             (callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path))
    176             if callback_type == 'stdout':
    177                 # we only allow one callback of type 'stdout' to be loaded,
    178                 if callback_name != self._stdout_callback or stdout_callback_loaded:
    179                     continue
    180                 stdout_callback_loaded = True
    181             elif callback_name == 'tree' and self._run_tree:
    182                 # special case for ansible cli option
    183                 pass
    184             elif not self._run_additional_callbacks or (callback_needs_whitelist and (
    185                     C.DEFAULT_CALLBACK_WHITELIST is None or callback_name not in C.DEFAULT_CALLBACK_WHITELIST)):
    186                 # 2.x plugins shipped with ansible should require whitelisting, older or non shipped should load automatically
    187                 continue
    188 
    189             callback_obj = callback_plugin()
    190             try:
    191                 callback_obj.set_options()
    192             except AttributeError:
    193                     display.deprecated("%s callback, does not support setting 'options', it will work for now, "
    194                                        " but this will be required in the future and should be updated, "
    195                                        " see the 2.4 porting guide for details." % callback_obj._load_name, version="2.9")
    196             self._callback_plugins.append(callback_obj)   #把合适的回调类添加进列表
    197 
    198         self._callbacks_loaded = True
     下面这个方法是执行回调方法
     1     def send_callback(self, method_name, *args, **kwargs):
     2         for callback_plugin in [self._stdout_callback] + self._callback_plugins:
     3             # a plugin that set self.disabled to True will not be called
     4             # see osx_say.py example for such a plugin
     5             if getattr(callback_plugin, 'disabled', False):
     6                 continue
     7 
     8             # try to find v2 method, fallback to v1 method, ignore callback if no method found
     9             methods = []
    10             for possible in [method_name, 'v2_on_any']:
    11                 gotit = getattr(callback_plugin, possible, None)
    12                 if gotit is None:
    13                     gotit = getattr(callback_plugin, possible.replace('v2_', ''), None) #在这里获取要回调的方法的对象
    14                 if gotit is not None:
    15                     methods.append(gotit) #添加进列表
    16 
    17             # send clean copies
    18             new_args = []
    19             for arg in args:
    20                 # FIXME: add play/task cleaners
    21                 if isinstance(arg, TaskResult):
    22                     new_args.append(arg.clean_copy())
    23                 # elif isinstance(arg, Play):
    24                 # elif isinstance(arg, Task):
    25                 else:
    26                     new_args.append(arg)
    27 
    28             for method in methods:
    29                 try:
    30                     method(*new_args, **kwargs) #执行回调方法
    31                 except Exception as e:
    32                     # TODO: add config toggle to make this fatal or not?
    33                     display.warning(u"Failure using method (%s) in callback plugin (%s): %s" % (to_text(method_name), to_text(callback_plugin), to_text(e)))
    34                     from traceback import format_tb
    35                     from sys import exc_info
    36                     display.vvv('Callback Exception: 
    ' + ' '.join(format_tb(exc_info()[2])))
     
  • 相关阅读:
    t
    溢出
    https://stackoverflow.com/questions/45591594/fetch-does-not-send-headers
    显示在用户屏幕上的缩略图
    cache buffer
    LZW
    t
    指针悬挂
    使用Linux服务器来通过网络安装和激活Windows 7 —— 一些基本原理
    在C++中如何动态分配一个二维数组
  • 原文地址:https://www.cnblogs.com/arrow-kejin/p/10109351.html
Copyright © 2011-2022 走看看