zoukankan      html  css  js  c++  java
  • Ansible API

    一、环境说明

    当前环境说明:

    • python3.8.6
    • ansible2.10.5 

    ansible版本差异说明:

    • ansible2.0版本前后有较大改变,鉴于当前已经到了2.10版本,不再过多说明历史2.0版本的变动。可参考文章:链接
    • ansible2.4,对于Inventory-->InventoryManager  VariableManager类的使用,导入方式进行了改变。可参考文章:链接
    • ansible2.8,对于初始化有了改变,2.7 使用了Python标准库里的命名元组来初始化选项,而2.8是Ansible自己封装了一个ImmutableDict,之后需要和 context结合使用的。

    tip:  pip3 install ansible的方式安装,ansible.cfg文件会在site-packages/ansible/galaxy/data/container/tests下。

    二、官方代码解析

    2.1、官方示例(Ad-hoc)

      1 #!/usr/bin/env python
      2 
      3 from __future__ import (absolute_import, division, print_function)
      4 __metaclass__ = type
      5 
      6 import json
      7 import shutil
      8 
      9 import ansible.constants as C
     10 from ansible.executor.task_queue_manager import TaskQueueManager
     11 from ansible.module_utils.common.collections import ImmutableDict
     12 from ansible.inventory.manager import InventoryManager
     13 from ansible.parsing.dataloader import DataLoader
     14 from ansible.playbook.play import Play
     15 from ansible.plugins.callback import CallbackBase
     16 from ansible.vars.manager import VariableManager
     17 from ansible import context
     18 
     19 
     20 # Create a callback plugin so we can capture the output
     21 class ResultsCollectorJSONCallback(CallbackBase):
     22     """A sample callback plugin used for performing an action as results come in.
     23 
     24     If you want to collect all results into a single object for processing at
     25     the end of the execution, look into utilizing the ``json`` callback plugin
     26     or writing your own custom callback plugin.
     27     """
     28 
     29     def __init__(self, *args, **kwargs):
     30         super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
     31         self.host_ok = {}
     32         self.host_unreachable = {}
     33         self.host_failed = {}
     34 
     35     def v2_runner_on_unreachable(self, result):
     36         host = result._host
     37         self.host_unreachable[host.get_name()] = result
     38 
     39     def v2_runner_on_ok(self, result, *args, **kwargs):
     40         """Print a json representation of the result.
     41 
     42         Also, store the result in an instance attribute for retrieval later
     43         """
     44         host = result._host
     45         self.host_ok[host.get_name()] = result
     46         print(json.dumps({host.name: result._result}, indent=4))
     47 
     48     def v2_runner_on_failed(self, result, *args, **kwargs):
     49         host = result._host
     50         self.host_failed[host.get_name()] = result
     51 
     52 
     53 def main():
     54     host_list = ['localhost', 'www.example.com', 'www.google.com']
     55     # since the API is constructed for CLI it expects certain options to always be set in the context object
     56     context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,
     57                                     become_method=None, become_user=None, check=False, diff=False)
     58     # required for
     59     # https://github.com/ansible/ansible/blob/devel/lib/ansible/inventory/manager.py#L204
     60     sources = ','.join(host_list)
     61     if len(host_list) == 1:
     62         sources += ','
     63 
     64     # initialize needed objects
     65     loader = DataLoader()  # Takes care of finding and reading yaml, json and ini files
     66     passwords = dict(vault_pass='secret')
     67 
     68     # Instantiate our ResultsCollectorJSONCallback for handling results as they come in. Ansible expects this to be one of its main display outlets
     69     results_callback = ResultsCollectorJSONCallback()
     70 
     71     # create inventory, use path to host config file as source or hosts in a comma separated string
     72     inventory = InventoryManager(loader=loader, sources=sources)
     73 
     74     # variable manager takes care of merging all the different sources to give you a unified view of variables available in each context
     75     variable_manager = VariableManager(loader=loader, inventory=inventory)
     76 
     77     # instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks
     78     # IMPORTANT: This also adds library dirs paths to the module loader
     79     # IMPORTANT: and so it must be initialized before calling `Play.load()`.
     80     tqm = TaskQueueManager(
     81         inventory=inventory,
     82         variable_manager=variable_manager,
     83         loader=loader,
     84         passwords=passwords,
     85         stdout_callback=results_callback,  # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout
     86     )
     87 
     88     # create data structure that represents our play, including tasks, this is basically what our YAML loader does internally.
     89     play_source = dict(
     90         name="Ansible Play",
     91         hosts=host_list,
     92         gather_facts='no',
     93         tasks=[
     94             dict(action=dict(module='shell', args='ls'), register='shell_out'),
     95             dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))),
     96             dict(action=dict(module='command', args=dict(cmd='/usr/bin/uptime'))),
     97         ]
     98     )
     99 
    100     # Create play object, playbook objects use .load instead of init or new methods,
    101     # this will also automatically create the task objects from the info provided in play_source
    102     play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
    103 
    104     # Actually run it
    105     try:
    106         result = tqm.run(play)  # most interesting data for a play is actually sent to the callback's methods
    107     finally:
    108         # we always need to cleanup child procs and the structures we use to communicate with them
    109         tqm.cleanup()
    110         if loader:
    111             loader.cleanup_all_tmp_files()
    112 
    113     # Remove ansible tmpdir
    114     shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
    115 
    116     print("UP ***********")
    117     for host, result in results_callback.host_ok.items():
    118         print('{0} >>> {1}'.format(host, result._result['stdout']))
    119 
    120     print("FAILED *******")
    121     for host, result in results_callback.host_failed.items():
    122         print('{0} >>> {1}'.format(host, result._result['msg']))
    123 
    124     print("DOWN *********")
    125     for host, result in results_callback.host_unreachable.items():
    126         print('{0} >>> {1}'.format(host, result._result['msg']))
    127 
    128 
    129 if __name__ == '__main__':
    130     main()
    View Code

    2.2、拆分说明

    •  导入模块详解
     1 # 初始化ansible的配置选项,比如: 指定远程用户remote_user=None
     2 from ansible.module_utils.common.collections import ImmutableDict
     3 # 上下文管理器,他就是用来接收 ImmutableDict 的示例对象
     4 from ansible import context
     5 # 管理资源库,读取yaml/json/ini格式的文件,主要用于读取inventory文件
     6 from ansible.parsing.dataloader import DataLoader
     7 # 变量管理器,包括主机,组,扩展等变量
     8 from ansible.vars.manager import VariableManager
     9 # 用于创建和管理inventory,导入inventory文件
    10 from ansible.inventory.manager import InventoryManager
    11 # 用于执行 Ad-hoc 的类
    12 from ansible.playbook.play import Play
    13 # ad-hoc ansible底层用到的任务队列
    14 from ansible.executor.task_queue_manager import TaskQueueManager
    15 # 回调基类,用来定义回调事件,比如返回失败成功等信息
    16 from ansible.plugins.callback import CallbackBase
    17 # 执行playbook
    18 from ansible.executor.playbook_executor import PlaybookExecutor
    19 # 操作单个主机,可以给主机添加变量等操作
    20 from ansible.inventory.host import Host
    21 # 操作单个主机组,可以给组添加变量等操作
    22 from ansible.inventory.group import Group
    23 # 用于获取 ansible 产生的临时文档
    24 import ansible.constants as C
    • 回调类
     1 class ResultsCollectorJSONCallback(CallbackBase):
     2     """
     3     回调插件就是一个类。 将执行命令的结果放到一个对象中,一边用json或自定义插件,获取执行结果的。 我们可以改写这个类,以便满足我们查询执行结果格式的需求。
     4     """
     5 
     6     def __init__(self, *args, **kwargs):
     7         super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
     8         self.host_ok = {}
     9         self.host_unreachable = {}
    10         self.host_failed = {}
    11 
    12     def v2_runner_on_unreachable(self, result):
    13         host = result._host
    14         self.host_unreachable[host.get_name()] = result
    15 
    16     def v2_runner_on_ok(self, result, *args, **kwargs):
    17         """
    18         将结果以json形式打印。indent=4 是json格式美化参数,便于阅读
    19         将结果存储在实例属性中,以便稍后检索
    20         """
    21         host = result._host
    22         self.host_ok[host.get_name()] = result
    23         print(json.dumps({host.name: result._result}, indent=4))
    24 
    25     def v2_runner_on_failed(self, result, *args, **kwargs):
    26         host = result._host
    27         self.host_failed[host.get_name()] = result
    • 初始化选项和source
    1 # 构建一些初始化选项,在context对象中被设置
    2 context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,
    3                                 become_method=None, become_user=None, check=False, diff=False)
    4 # source指inventory文件数据,默认指/etc/ansible/hosts,也可读取yaml/json/ini格式的文件,或者主机名以逗号隔开的字符串。
    5 host_list = ['localhost', 'www.example.com', 'www.google.com']
    6 sources = ','.join(host_list)
    7 if len(host_list) == 1:
    8     sources += ','
    •  实例化对象
     1 # 初始化所需的对象集
     2 # 管理资源库,读取yaml/json/ini格式的文件,主要用于读取inventory文件
     3 loader = DataLoader()
     4 # 密码这里是必须使用的一个参数,假如通过公钥信任,也可以给一个空字典:dict()
     5 passwords = dict(vault_pass='secret')
     6 
     7 # 回调对象,用于查询执行结果
     8 results_callback = ResultsCollectorJSONCallback()
     9 
    10 # 资源管理器,创建inventory对象,引入上面的source。
    11 # 也可以:sources='/etc/ansible/hosts'。也可以:sources='host1,host2,...'
    12 inventory = InventoryManager(loader=loader, sources=sources)
    13 
    14 # 变量管理器,管理变量
    15 variable_manager = VariableManager(loader=loader, inventory=inventory)
    •  任务队列管理器
     1 # 实例化任务队列管理器
     2 # tip: 会加载库目录到loader里
     3 # tip: 必须在`Play.load()`之前初始化队列实例
     4 tqm = TaskQueueManager(
     5     inventory=inventory,
     6     variable_manager=variable_manager,
     7     loader=loader,
     8     passwords=passwords,
     9     stdout_callback=results_callback,  # 回调实例
    10 )
    • ansible命令模块和参数
     1 play_source = dict(
     2     name="Ansible Play",  # 名字,只未方便阅读
     3     hosts=host_list,      # 执行主机列表
     4     gather_facts='no',
     5     tasks=[
     6         dict(action=dict(module='shell', args='ls'), register='shell_out'),
     7         dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))),
     8         dict(action=dict(module='command', args=dict(cmd='/usr/bin/uptime'))),
     9     ]
    10 )
    • 执行
     1 # 定义play对象
     2 play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
     3 
     4 # 在队列中执行play对象
     5 try:
     6     result = tqm.run(play)  # 执行的结果返回码,成功是 0
     7 finally:
     8     # 如果 `tqm` 不是 `None`, 需要清理子进程和我们用来与它们通信的结构。
     9     tqm.cleanup()
    10     if loader:
    11         loader.cleanup_all_tmp_files()
    12 
    13 # 最后删除 ansible 产生的临时目录
    14 # 这个临时目录会在 ~/.ansible/tmp/ 目录下
    15 shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
    • 查询结果 
     1 print("UP ***********")
     2 for host, result in results_callback.host_ok.items():
     3     print('{0} >>> {1}'.format(host, result._result['stdout']))
     4 
     5 print("FAILED *******")
     6 for host, result in results_callback.host_failed.items():
     7     print('{0} >>> {1}'.format(host, result._result['msg']))
     8 
     9 print("DOWN *********")
    10 for host, result in results_callback.host_unreachable.items():
    11     print('{0} >>> {1}'.format(host, result._result['msg']))

    2.3、调用playbook

     1 from ansible.executor.playbook_executor import PlaybookExecutor
     2 
     3 playbook = PlaybookExecutor(playbooks=['/root/test.yml'],  # tip:这是列表
     4                  inventory=inventory,
     5                  variable_manager=variable_manager,
     6                  loader=loader,
     7                  passwords=passwords)
     8 
     9 # 使用回调函数
    10 playbook._tqm._stdout_callback = results_callback
    11 
    12 result = playbook.run()
    13 
    14 for host, result in results_callback.host_ok.items():
    15     print("主机{}, 执行结果{}".format(host, result._result['result']['stdout'])

    三、二次开发

      1 # -*- coding:utf-8 -*-
      2 import json
      3 import shutil
      4 from ansible.module_utils.common.collections import ImmutableDict
      5 from ansible import context
      6 from ansible.parsing.dataloader import DataLoader
      7 from ansible.vars.manager import VariableManager
      8 from ansible.inventory.manager import InventoryManager
      9 from ansible.playbook.play import Play
     10 from ansible.executor.playbook_executor import PlaybookExecutor
     11 from ansible.executor.task_queue_manager import TaskQueueManager
     12 from ansible.plugins.callback import CallbackBase
     13 import ansible.constants as C
     14 
     15 
     16 class ResultCallback(CallbackBase):
     17     """
     18     重写callbackBase类的部分方法
     19     """
     20 
     21     def __init__(self, *args, **kwargs):
     22         # python3支持这样的语法
     23         # super().__init__(*args, **kwargs)
     24         # python2要用这样的语法
     25         super(ResultCallback, self).__init__(*args, **kwargs)
     26         self.host_ok = {}
     27         self.host_unreachable = {}
     28         self.host_failed = {}
     29         # self.task_ok = {}
     30 
     31     def v2_runner_on_unreachable(self, result):
     32         self.host_unreachable[result._host.get_name()] = result
     33 
     34     def v2_runner_on_ok(self, result, **kwargs):
     35         self.host_ok[result._host.get_name()] = result
     36 
     37     def v2_runner_on_failed(self, result, **kwargs):
     38         self.host_failed[result._host.get_name()] = result
     39 
     40 
     41 class MyAnsiable():
     42     def __init__(self,
     43                  connection='local',
     44                  module_path=None,
     45                  remote_user=None,
     46                  ack_pass=None,
     47                  sudo=None,
     48                  sudo_user=None,
     49                  ask_sudo_pass=None,
     50                  become=None,
     51                  become_method=None,
     52                  become_user=None,
     53                  listhosts=None,
     54                  listtasks=None,
     55                  listtags=None,
     56                  verbosity=3,
     57                  syntax=None,
     58                  start_at_task=None,
     59                  check=False,
     60                  diff=False,
     61                  inventory=None,
     62                  passwords=None):
     63         """
     64         初始化函数,定义的默认的选项值,
     65         在初始化的时候可以传参,以便覆盖默认选项的值
     66         """
     67         context.CLIARGS = ImmutableDict(
     68             connection=connection,
     69             module_path=module_path,
     70             remote_user=remote_user,
     71             ack_pass=ack_pass,
     72             sudo=sudo,
     73             sudo_user=sudo_user,
     74             ask_sudo_pass=ask_sudo_pass,
     75             become=become,
     76             become_method=become_method,
     77             become_user=become_user,
     78             listhosts=listhosts,
     79             listtasks=listtasks,
     80             listtags=listtags,
     81             verbosity=verbosity,
     82             syntax=syntax,
     83             start_at_task=start_at_task,
     84             check=check,
     85             diff=diff
     86         )
     87 
     88         # 三元表达式,假如没有传递 inventory, 就使用 "localhost,"
     89         # 确定 inventory 文件
     90         self.inventory = inventory if inventory else "localhost,"
     91 
     92         # 三元表达式,假如没有传递 passwords, 就使用 {}
     93         # 设置密码,可以为空字典,但必须有此参数
     94         self.passwords = passwords if passwords else {}
     95 
     96         # 实例化数据解析器
     97         self.loader = DataLoader()
     98 
     99         # 实例化 资产配置对象
    100         self.inventory_manager_obj = InventoryManager(loader=self.loader, sources=self.inventory)
    101 
    102         # 变量管理器
    103         self.variable_manager_obj = VariableManager(self.loader, self.inventory_manager_obj)
    104 
    105         # 实例化回调插件对象
    106         self.results_callback = ResultCallback()
    107 
    108     def adhoc(self, name='Ad-hoc', hosts='localhost', gather_facts="no", module="ping", args=''):
    109         play_source = dict(
    110             name=name,
    111             hosts=hosts,
    112             gather_facts=gather_facts,
    113             tasks=[
    114                 # 这里每个 task 就是这个列表中的一个元素,格式是嵌套的字典
    115                 # 也可以作为参数传递过来,这里就简单化了。
    116                 {"action": {"module": module, "args": args}},
    117             ])
    118 
    119         play = Play().load(play_source, variable_manager=self.variable_manager_obj, loader=self.loader)
    120 
    121         tqm = None
    122         try:
    123             tqm = TaskQueueManager(
    124                 inventory=self.inventory_manager_obj,
    125                 variable_manager=self.variable_manager_obj,
    126                 loader=self.loader,
    127                 passwords=self.passwords,
    128                 stdout_callback=self.results_callback
    129             )
    130             result = tqm.run(play)
    131         finally:
    132             if tqm is not None:
    133                 tqm.cleanup()
    134             shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
    135 
    136     def playbook(self, playbooks):
    137         playbook = PlaybookExecutor(playbooks=playbooks,  # Tip: 这里是一个列表
    138                                     inventory=self.inventory_manager_obj,
    139                                     variable_manager=self.variable_manager_obj,
    140                                     loader=self.loader,
    141                                     passwords=self.passwords
    142                                     )
    143 
    144         # 使用回调函数
    145         playbook._tqm._stdout_callback = self.results_callback
    146         # 执行
    147         result = playbook.run()
    148 
    149     def get_result(self):
    150         result_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
    151         # print(self.results_callback.host_ok)
    152         for host, result in self.results_callback.host_ok.items():
    153             result_raw['success'][host] = result._result
    154 
    155         # print(self.results_callback.host_failed)
    156         for host, result in self.results_callback.host_failed.items():
    157             result_raw['failed'][host] = result._result
    158 
    159         # print(self.results_callback.host_unreachable)
    160         for host, result in self.results_callback.host_unreachable.items():
    161             result_raw['unreachable'][host] = result._result
    162 
    163         # 打印结果,并且使用 JSON 格式化
    164         print(json.dumps(result_raw, indent=4))
    View Code

    四、运行示例

    4.1、执行默认ad-hoc

    # -*- coding:utf-8 -*-
    from MyAnsible import MyAnsiable
    
    ansible_obj = MyAnsiable()
    ansible_obj.adhoc()
    ansible_obj.get_result()

    结果:

    {
        "success": {
            "localhost": {
                "ping": "pong",
                "invocation": {
                    "module_args": {
                        "data": "pong"
                    }
                },
                "ansible_facts": {
                    "discovered_interpreter_python": "/usr/bin/python"
                },
                "_ansible_no_log": false,
                "changed": false
            }
        },
        "failed": {},
        "unreachable": {}
    }
    View Code

    4.2、远程执行命令

    # 文件名: /root/ansible-playbook/api/hosts
    [tapi]
    10.96.0.59
    
    # 文件名: ansible03.py
    # -*- coding:utf-8 -*-
    from MyAnsible import MyAnsiable
    
    # 配置资产配置文件,并使用 ssh 的远程连接方式
    ansible_obj = MyAnsiable(inventory='/root/ansible-playbook/api/hosts', connection='smart')
    # 执行对象是hosts里的组名
    ansible_obj.adhoc(hosts='tapi', module='shell', args='ip a |grep "inet"')
    ansible_obj.get_result()

    结果:

    {
        "failed": {}, 
        "success": {
            "10.96.0.59": {
                "stderr_lines": [], 
                "cmd": "ip a |grep \"inet\"", 
                "end": "2021-01-28 15:41:25.972706", 
                "_ansible_no_log": false, 
                "stdout": "    inet 127.0.0.1/8 scope host lo\n    inet 10.96.0.59/24 brd 10.96.0.255 scope global dynamic eth0", 
                "changed": true, 
                "rc": 0, 
                "start": "2021-01-28 15:41:25.960580", 
                "stderr": "", 
                "delta": "0:00:00.012126", 
                "invocation": {
                    "module_args": {
                        "creates": null, 
                        "executable": null, 
                        "_uses_shell": true, 
                        "strip_empty_ends": true, 
                        "_raw_params": "ip a |grep \"inet\"", 
                        "removes": null, 
                        "argv": null, 
                        "warn": true, 
                        "chdir": null, 
                        "stdin_add_newline": true, 
                        "stdin": null
                    }
                }, 
                "stdout_lines": [
                    "    inet 127.0.0.1/8 scope host lo", 
                    "    inet 10.96.0.59/24 brd 10.96.0.255 scope global dynamic eth0"
                ], 
                "ansible_facts": {
                    "discovered_interpreter_python": "/usr/bin/python"
                }
            }
        }, 
        "unreachable": {}
    }
    View Code

    4.3、调用playbook

    研究过程中,发现不能传递额外的系统变量, 后面实例懒的写了,看完我前面的代码,执行playbook应该不成问题。

    参考资料

    • 二开:https://zhuanlan.zhihu.com/p/118411015
    •            http://www.linuxboy.net/ansiblejc/143275.html
    • ansible2.4版本前后模块差异:https://blog.csdn.net/qq_29832217/article/details/97005626
    • 常用模块使用:https://blog.csdn.net/yyy72999/article/details/81184720
    • 官网2.7  https://docs.ansible.com/ansible/2.7/dev_guide/developing_api.html
    • 官网2.8  https://docs.ansible.com/ansible/2.8/dev_guide/developing_api.html
    • 官网2.10  https://docs.ansible.com/ansible/latest/dev_guide/developing_api.html
  • 相关阅读:
    文档重复检测软件 DuplicateDetector
    Afinal 0.2.1 发布 Android的快速开发框架
    CshBBrain 4.0 发布,高性能WebSocket服务器
    英特尔的 C++ 编译器发布 13.0 版本
    Perl 5.16.2 发布
    OpenBSD 5.2 发布
    Webconverger 15.1 发布,适合网吧的 Linux
    Spring Framework 3.2 RC1 发布
    Postgres 9.2 新特性之:范围类型 (Range Types)
    Expression Blend实例中文教程系列文章汇总
  • 原文地址:https://www.cnblogs.com/wsongl/p/14331621.html
Copyright © 2011-2022 走看看