一、环境说明
当前环境说明:
- python3.8.6
- ansible2.10.5
ansible版本差异说明:
- ansible2.0版本前后有较大改变,鉴于当前已经到了2.10版本,不再过多说明历史2.0版本的变动。可参考文章:链接
- ansible2.4:Inventory 类改为 InventoryManager,同时 InventoryManager、VariableManager 类的使用和导入方式也发生了改变。可参考文章:链接
- ansible2.8:初始化方式有了改变。2.7 使用 Python 标准库里的命名元组(namedtuple)来初始化选项,而 2.8 改用 Ansible 自己封装的 ImmutableDict,并且需要和 context 结合使用。
tip: pip3 install ansible的方式安装,ansible.cfg文件会在site-packages/ansible/galaxy/data/container/tests下。
二、官方代码解析
2.1、官方示例(Ad-hoc)
#!/usr/bin/env python
# Upstream Ansible example: run an ad-hoc play through the Python API and
# collect per-host results with a custom callback plugin.

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import json
import shutil

import ansible.constants as C
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.module_utils.common.collections import ImmutableDict
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase
from ansible.vars.manager import VariableManager
from ansible import context


# Create a callback plugin so we can capture the output
class ResultsCollectorJSONCallback(CallbackBase):
    """A sample callback plugin used for performing an action as results come in.

    If you want to collect all results into a single object for processing at
    the end of the execution, look into utilizing the ``json`` callback plugin
    or writing your own custom callback plugin.

    Each of the three dictionaries is keyed by host name, so a later result
    for the same host replaces the earlier one.
    """

    def __init__(self, *args, **kwargs):
        super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        """Store the result of a host that could not be reached."""
        host = result._host
        self.host_unreachable[host.get_name()] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Print a json representation of the result.

        Also, store the result in an instance attribute for retrieval later
        """
        host = result._host
        self.host_ok[host.get_name()] = result
        print(json.dumps({host.name: result._result}, indent=4))

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Store the result of a task that failed on a host."""
        host = result._host
        self.host_failed[host.get_name()] = result


def main():
    """Run a three-task play against ``host_list`` and print the outcome per host."""
    host_list = ['localhost', 'www.example.com', 'www.google.com']
    # since the API is constructed for CLI it expects certain options to always be set in the context object
    context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,
                                    become_method=None, become_user=None, check=False, diff=False)
    # required for
    # https://github.com/ansible/ansible/blob/devel/lib/ansible/inventory/manager.py#L204
    # (a single host gets a trailing comma appended to the sources string)
    sources = ','.join(host_list)
    if len(host_list) == 1:
        sources += ','

    # initialize needed objects
    loader = DataLoader()  # Takes care of finding and reading yaml, json and ini files
    passwords = dict(vault_pass='secret')

    # Instantiate our ResultsCollectorJSONCallback for handling results as they come in. Ansible expects this to be one of its main display outlets
    results_callback = ResultsCollectorJSONCallback()

    # create inventory, use path to host config file as source or hosts in a comma separated string
    inventory = InventoryManager(loader=loader, sources=sources)

    # variable manager takes care of merging all the different sources to give you a unified view of variables available in each context
    variable_manager = VariableManager(loader=loader, inventory=inventory)

    # instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks
    # IMPORTANT: This also adds library dirs paths to the module loader
    # IMPORTANT: and so it must be initialized before calling `Play.load()`.
    tqm = TaskQueueManager(
        inventory=inventory,
        variable_manager=variable_manager,
        loader=loader,
        passwords=passwords,
        stdout_callback=results_callback,  # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout
    )

    # create data structure that represents our play, including tasks, this is basically what our YAML loader does internally.
    play_source = dict(
        name="Ansible Play",
        hosts=host_list,
        gather_facts='no',
        tasks=[
            dict(action=dict(module='shell', args='ls'), register='shell_out'),
            dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))),
            dict(action=dict(module='command', args=dict(cmd='/usr/bin/uptime'))),
        ]
    )

    # Create play object, playbook objects use .load instead of init or new methods,
    # this will also automatically create the task objects from the info provided in play_source
    play = Play().load(play_source, variable_manager=variable_manager, loader=loader)

    # Actually run it
    try:
        result = tqm.run(play)  # most interesting data for a play is actually sent to the callback's methods
    finally:
        # we always need to cleanup child procs and the structures we use to communicate with them
        tqm.cleanup()
        if loader:
            loader.cleanup_all_tmp_files()

    # Remove ansible tmpdir
    shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

    # NOTE(review): the lookups below index the stored result directly, so they
    # assume the last stored result per host carries a 'stdout' (ok) or 'msg'
    # (failed/unreachable) key — confirm for the modules you run.
    print("UP ***********")
    for host, result in results_callback.host_ok.items():
        print('{0} >>> {1}'.format(host, result._result['stdout']))

    print("FAILED *******")
    for host, result in results_callback.host_failed.items():
        print('{0} >>> {1}'.format(host, result._result['msg']))

    print("DOWN *********")
    for host, result in results_callback.host_unreachable.items():
        print('{0} >>> {1}'.format(host, result._result['msg']))


if __name__ == '__main__':
    main()
2.2、拆分说明
- 导入模块详解
# 初始化ansible的配置选项,比如: 指定远程用户remote_user=None
from ansible.module_utils.common.collections import ImmutableDict
# 上下文管理器,它就是用来接收 ImmutableDict 的实例对象
from ansible import context
# 管理资源库,读取yaml/json/ini格式的文件,主要用于读取inventory文件
from ansible.parsing.dataloader import DataLoader
# 变量管理器,包括主机,组,扩展等变量
from ansible.vars.manager import VariableManager
# 用于创建和管理inventory,导入inventory文件
from ansible.inventory.manager import InventoryManager
# 用于执行 Ad-hoc 的类
from ansible.playbook.play import Play
# ad-hoc ansible底层用到的任务队列
from ansible.executor.task_queue_manager import TaskQueueManager
# 回调基类,用来定义回调事件,比如返回失败成功等信息
from ansible.plugins.callback import CallbackBase
# 执行playbook
from ansible.executor.playbook_executor import PlaybookExecutor
# 操作单个主机,可以给主机添加变量等操作
from ansible.inventory.host import Host
# 操作单个主机组,可以给组添加变量等操作
from ansible.inventory.group import Group
# 用于获取 ansible 产生的临时文档
import ansible.constants as C
- 回调类
class ResultsCollectorJSONCallback(CallbackBase):
    """Callback plugin that files each task result under its host name.

    A callback plugin is just a class. Results are stored in ``host_ok``,
    ``host_unreachable`` or ``host_failed`` depending on which runner event
    delivered them; a later result for the same host replaces the earlier
    one. Rewrite this class to change how results are collected or shown.
    """

    def __init__(self, *args, **kwargs):
        super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
        self.host_ok, self.host_unreachable, self.host_failed = {}, {}, {}

    def v2_runner_on_unreachable(self, result):
        """Store the result of a host that could not be reached."""
        name = result._host.get_name()
        self.host_unreachable[name] = result

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Store a successful result and print it as JSON.

        ``indent=4`` pretty-prints the JSON so it is easier to read.
        """
        target = result._host
        self.host_ok[target.get_name()] = result
        payload = {target.name: result._result}
        print(json.dumps(payload, indent=4))

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Store the result of a task that failed on a host."""
        name = result._host.get_name()
        self.host_failed[name] = result
- 初始化选项和source
# Hosts to run against.
host_list = ['localhost', 'www.example.com', 'www.google.com']

# Build the initial options and publish them through the context object.
cli_options = dict(
    connection='smart',
    module_path=['/to/mymodules', '/usr/share/ansible'],
    forks=10,
    become=None,
    become_method=None,
    become_user=None,
    check=False,
    diff=False,
)
context.CLIARGS = ImmutableDict(**cli_options)

# `sources` is the inventory data. Per the article it defaults to
# /etc/ansible/hosts and may also be a yaml/json/ini file, or host names
# joined by commas. A single host gets a trailing comma appended.
sources = ','.join(host_list) + (',' if len(host_list) == 1 else '')
- 实例化对象
1 # 初始化所需的对象集 2 # 管理资源库,读取yaml/json/ini格式的文件,主要用于读取inventory文件 3 loader = DataLoader() 4 # 密码这里是必须使用的一个参数,假如通过公钥信任,也可以给一个空字典:dict() 5 passwords = dict(vault_pass='secret') 6 7 # 回调对象,用于查询执行结果 8 results_callback = ResultsCollectorJSONCallback() 9 10 # 资源管理器,创建inventory对象,引入上面的source。 11 # 也可以:sources='/etc/ansible/hosts'。也可以:sources='host1,host2,...' 12 inventory = InventoryManager(loader=loader, sources=sources) 13 14 # 变量管理器,管理变量 15 variable_manager = VariableManager(loader=loader, inventory=inventory)
- 任务队列管理器
1 # 实例化任务队列管理器 2 # tip: 会加载库目录到loader里 3 # tip: 必须在`Play.load()`之前初始化队列实例 4 tqm = TaskQueueManager( 5 inventory=inventory, 6 variable_manager=variable_manager, 7 loader=loader, 8 passwords=passwords, 9 stdout_callback=results_callback, # 回调实例 10 )
- ansible命令模块和参数
# Data structure describing the play: which hosts to target and which module
# tasks (with their arguments) to run on them.
play_source = {
    'name': "Ansible Play",  # label only, to make output easier to read
    'hosts': host_list,      # hosts the play runs against
    'gather_facts': 'no',
    'tasks': [
        {'action': {'module': 'shell', 'args': 'ls'}, 'register': 'shell_out'},
        {'action': {'module': 'debug', 'args': {'msg': '{{shell_out.stdout}}'}}},
        {'action': {'module': 'command', 'args': {'cmd': '/usr/bin/uptime'}}},
    ],
}
- 执行
1 # 定义play对象 2 play = Play().load(play_source, variable_manager=variable_manager, loader=loader) 3 4 # 在队列中执行play对象 5 try: 6 result = tqm.run(play) # 执行的结果返回码,成功是 0 7 finally: 8 # 如果 `tqm` 不是 `None`, 需要清理子进程和我们用来与它们通信的结构。 9 tqm.cleanup() 10 if loader: 11 loader.cleanup_all_tmp_files() 12 13 # 最后删除 ansible 产生的临时目录 14 # 这个临时目录会在 ~/.ansible/tmp/ 目录下 15 shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
- 查询结果
# Report what the callback collected, one section per outcome: a banner
# followed by one "<host> >>> <value>" line for every recorded host.
sections = (
    ("UP ***********", results_callback.host_ok, 'stdout'),
    ("FAILED *******", results_callback.host_failed, 'msg'),
    ("DOWN *********", results_callback.host_unreachable, 'msg'),
)
for banner, bucket, field in sections:
    print(banner)
    for host, result in bucket.items():
        print('{0} >>> {1}'.format(host, result._result[field]))
2.3、调用playbook
from ansible.executor.playbook_executor import PlaybookExecutor

# `playbooks` takes a list of playbook paths, even for a single playbook.
playbook = PlaybookExecutor(playbooks=['/root/test.yml'],
                            inventory=inventory,
                            variable_manager=variable_manager,
                            loader=loader,
                            passwords=passwords)

# Route task results to our collecting callback. This reaches into private
# attributes of the executor, so re-check it when upgrading ansible.
playbook._tqm._stdout_callback = results_callback

# Return code of the playbook run.
result = playbook.run()

# A separate loop variable keeps the return code in `result` intact.
for host, task_result in results_callback.host_ok.items():
    payload = task_result._result
    # The sample output in this article exposes stdout at the top level of the
    # result; a nested payload['result']['stdout'] is still honoured as a
    # fallback. Hosts whose last task produced no stdout print None instead
    # of raising KeyError.
    stdout = payload.get('stdout')
    if stdout is None and isinstance(payload.get('result'), dict):
        stdout = payload['result'].get('stdout')
    print("主机{}, 执行结果{}".format(host, stdout))
三、二次开发
# -*- coding:utf-8 -*-
import json
import shutil
from ansible.module_utils.common.collections import ImmutableDict
from ansible import context
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
import ansible.constants as C


class ResultCallback(CallbackBase):
    """Override part of CallbackBase to keep the latest result per host.

    Results are grouped by outcome in ``host_ok``, ``host_unreachable`` and
    ``host_failed``; each dictionary is keyed by host name, so a later result
    for the same host replaces the earlier one.
    """

    def __init__(self, *args, **kwargs):
        # The two-argument super() form also works on Python 2;
        # on Python 3 a bare super().__init__(...) would do.
        super(ResultCallback, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        """Store the result of a host that could not be reached."""
        self.host_unreachable[result._host.get_name()] = result

    def v2_runner_on_ok(self, result, **kwargs):
        """Store the result of a task that succeeded on a host."""
        self.host_ok[result._host.get_name()] = result

    def v2_runner_on_failed(self, result, **kwargs):
        """Store the result of a task that failed on a host."""
        self.host_failed[result._host.get_name()] = result


class MyAnsiable:
    """Small wrapper around the Ansible Python API for ad-hoc tasks and playbooks.

    The class name keeps its historical spelling because callers import it;
    the correctly spelled alias ``MyAnsible`` is defined below the class.
    """

    def __init__(self,
                 connection='local',
                 module_path=None,
                 remote_user=None,
                 ack_pass=None,
                 sudo=None,
                 sudo_user=None,
                 ask_sudo_pass=None,
                 become=None,
                 become_method=None,
                 become_user=None,
                 listhosts=None,
                 listtasks=None,
                 listtags=None,
                 verbosity=3,
                 syntax=None,
                 start_at_task=None,
                 check=False,
                 diff=False,
                 inventory=None,
                 passwords=None):
        """Set the global Ansible options and build the shared helper objects.

        Every option has a default and can be overridden by the caller.

        Args:
            inventory: inventory source handed to ``InventoryManager``;
                falls back to ``"localhost,"`` when not given.
            passwords: password dictionary handed to the executors; falls
                back to an empty dict (the argument itself is required by
                the executors, its content may be empty).
            ack_pass: historical misspelling of ``ask_pass``, kept so
                existing callers keep working.

        All remaining arguments are stored unchanged in ``context.CLIARGS``.
        """
        context.CLIARGS = ImmutableDict(
            connection=connection,
            module_path=module_path,
            remote_user=remote_user,
            ack_pass=ack_pass,
            # Also publish the value under the correctly spelled key.
            # NOTE(review): ansible's own option is presumably `ask_pass`
            # (from --ask-pass) — confirm against the installed version.
            ask_pass=ack_pass,
            sudo=sudo,
            sudo_user=sudo_user,
            ask_sudo_pass=ask_sudo_pass,
            become=become,
            become_method=become_method,
            become_user=become_user,
            listhosts=listhosts,
            listtasks=listtasks,
            listtags=listtags,
            verbosity=verbosity,
            syntax=syntax,
            start_at_task=start_at_task,
            check=check,
            diff=diff
        )

        # Inventory source; "localhost," when none was passed.
        self.inventory = inventory or "localhost,"

        # Passwords may be an empty dict, but the argument must exist.
        self.passwords = passwords or {}

        # Data loader / parser.
        self.loader = DataLoader()

        # Inventory object built from the chosen source.
        self.inventory_manager_obj = InventoryManager(loader=self.loader, sources=self.inventory)

        # Variable manager.
        self.variable_manager_obj = VariableManager(loader=self.loader, inventory=self.inventory_manager_obj)

        # Callback plugin instance that collects the results.
        self.results_callback = ResultCallback()

    def adhoc(self, name='Ad-hoc', hosts='localhost', gather_facts="no", module="ping", args=''):
        """Run a single-module ad-hoc play.

        Args:
            name: play name, for readability only.
            hosts: host pattern the play targets.
            gather_facts: passed through as the play's ``gather_facts``.
            module: module to execute.
            args: arguments for the module.

        Returns:
            The return code of ``TaskQueueManager.run`` (0 means success).
            Per-host results are available through ``get_result``.
        """
        play_source = dict(
            name=name,
            hosts=hosts,
            gather_facts=gather_facts,
            tasks=[
                # Each task is one element of this list, as a nested dict.
                # Tasks could also be passed in as a parameter; kept simple here.
                {"action": {"module": module, "args": args}},
            ])

        tqm = None
        try:
            # The task queue manager adds the library dirs to the module
            # loader, so it must be created before `Play.load()` is called.
            tqm = TaskQueueManager(
                inventory=self.inventory_manager_obj,
                variable_manager=self.variable_manager_obj,
                loader=self.loader,
                passwords=self.passwords,
                stdout_callback=self.results_callback
            )
            play = Play().load(play_source, variable_manager=self.variable_manager_obj, loader=self.loader)
            return tqm.run(play)
        finally:
            if tqm is not None:
                tqm.cleanup()
            # Same cleanup the upstream example performs after a run.
            self.loader.cleanup_all_tmp_files()
            shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

    def playbook(self, playbooks):
        """Run one or more playbooks.

        Args:
            playbooks: list of playbook paths (a list even for one file).

        Returns:
            Whatever ``PlaybookExecutor.run`` returns.
        """
        executor = PlaybookExecutor(playbooks=playbooks,
                                    inventory=self.inventory_manager_obj,
                                    variable_manager=self.variable_manager_obj,
                                    loader=self.loader,
                                    passwords=self.passwords
                                    )

        # Route results to our callback.
        # NOTE(review): the executor appears to leave `_tqm` as None when a
        # list/syntax option is set — confirm against the installed ansible.
        if executor._tqm is not None:
            executor._tqm._stdout_callback = self.results_callback

        return executor.run()

    def get_result(self):
        """Print the collected results as JSON and return them.

        Returns:
            A dict with the keys ``success``, ``failed`` and ``unreachable``,
            each mapping host name to that host's raw result dictionary.
        """
        collected = self.results_callback
        result_raw = {
            'success': {host: res._result for host, res in collected.host_ok.items()},
            'failed': {host: res._result for host, res in collected.host_failed.items()},
            'unreachable': {host: res._result for host, res in collected.host_unreachable.items()},
        }

        # Print the result formatted as JSON.
        print(json.dumps(result_raw, indent=4))
        return result_raw


# Correctly spelled alias; `MyAnsiable` stays available for existing imports.
MyAnsible = MyAnsiable
四、运行示例
4.1、执行默认ad-hoc
# -*- coding:utf-8 -*- from MyAnsible import MyAnsiable ansible_obj = MyAnsiable() ansible_obj.adhoc() ansible_obj.get_result()
结果:
{ "success": { "localhost": { "ping": "pong", "invocation": { "module_args": { "data": "pong" } }, "ansible_facts": { "discovered_interpreter_python": "/usr/bin/python" }, "_ansible_no_log": false, "changed": false } }, "failed": {}, "unreachable": {} }
4.2、远程执行命令
# 文件名: /root/ansible-playbook/api/hosts [tapi] 10.96.0.59 # 文件名: ansible03.py # -*- coding:utf-8 -*- from MyAnsible import MyAnsiable # 配置资产配置文件,并使用 ssh 的远程连接方式 ansible_obj = MyAnsiable(inventory='/root/ansible-playbook/api/hosts', connection='smart') # 执行对象是hosts里的组名 ansible_obj.adhoc(hosts='tapi', module='shell', args='ip a |grep "inet"') ansible_obj.get_result()
结果:
{ "failed": {}, "success": { "10.96.0.59": { "stderr_lines": [], "cmd": "ip a |grep \"inet\"", "end": "2021-01-28 15:41:25.972706", "_ansible_no_log": false, "stdout": " inet 127.0.0.1/8 scope host lo\n inet 10.96.0.59/24 brd 10.96.0.255 scope global dynamic eth0", "changed": true, "rc": 0, "start": "2021-01-28 15:41:25.960580", "stderr": "", "delta": "0:00:00.012126", "invocation": { "module_args": { "creates": null, "executable": null, "_uses_shell": true, "strip_empty_ends": true, "_raw_params": "ip a |grep \"inet\"", "removes": null, "argv": null, "warn": true, "chdir": null, "stdin_add_newline": true, "stdin": null } }, "stdout_lines": [ " inet 127.0.0.1/8 scope host lo", " inet 10.96.0.59/24 brd 10.96.0.255 scope global dynamic eth0" ], "ansible_facts": { "discovered_interpreter_python": "/usr/bin/python" } } }, "unreachable": {} }
4.3、调用playbook
研究过程中,发现不能传递额外的系统变量,后面的示例就懒得写了。看完前面的代码,执行 playbook 应该不成问题。
参考资料
- 二开:https://zhuanlan.zhihu.com/p/118411015
- http://www.linuxboy.net/ansiblejc/143275.html
- ansible2.4版本前后模块差异:https://blog.csdn.net/qq_29832217/article/details/97005626
- 常用模块使用:https://blog.csdn.net/yyy72999/article/details/81184720
- 官网2.7 https://docs.ansible.com/ansible/2.7/dev_guide/developing_api.html
- 官网2.8 https://docs.ansible.com/ansible/2.8/dev_guide/developing_api.html
- 官网2.10 https://docs.ansible.com/ansible/latest/dev_guide/developing_api.html