zoukankan      html  css  js  c++  java
  • Ansible 多机文件分发、执行脚本并单机合并实验结果(Check point, 多线程异步执行,主机状态检测等)

    简介

    Ansible是一个自动化运维管理工具,可以批量管理多台机器的自动化部署与安装。本文在Python中结合Ansible实现任务的多机执行,采用Master-Slave架构:首先检测网络状态并剔除不可用的主机,接着进行负载均衡,再分发数据集并异步执行;执行过程中采用checkpoint机制检测任务是否执行成功,全部执行完毕后由Master收集所有实验结果。

    Host(host.ini)

    [master]
    netlab_x2
    
    [slave]
    netlab_x1

    Config(部分内容,config.ini)

    [ansible]
    master_load = 5
    slave_load = 1
    code_home = /home/guangfa/ansible/guangfa

    Python Script

      1 #!/usr/bin/env python
      2 # -*- coding:utf-8 -*-
      3 
      4 import sys, os
      5 import ConfigParser
      6 import ansible.runner
      7 import time
      8 
      # All three paths are relative, i.e. resolved against the current working directory.
      inventory_file = "../host.ini"    # Ansible inventory with [master] and [slave] groups
      database_path = "../database"     # local directory whose mygraph* entries are counted
      config_file = "../config.ini"     # provides the [ansible] settings
     12 
     class DistributedByAnsible:
         """Distribute data sets and code to Ansible hosts, run them, and collect the results.

         Typical sequence: extractHost() keeps the inventory hosts that answer a
         ping, calculateLoadBalancing() splits the mygraph* data sets among them,
         fileTransfer() pushes data and code and starts runAll.sh on every host,
         and checkPoint() polls the hosts until each has a success_log, pulling
         its output directory back to this machine.

         Written against the legacy Ansible 1.x ``ansible.runner.Runner`` API.
         """

         def __init__(self):
             """Load host weights and the code home directory from ``config_file``.

             Exits with status 1 when the config file cannot be read.
             """
             cf = ConfigParser.ConfigParser()
             if not cf.read(config_file):
                 print("ERROR: cannot read the config file %s!" % config_file)
                 sys.exit(1)
             # Relative number of data sets given to each master / slave host.
             self.master_load = int(cf.get("ansible", "master_load"))
             self.slave_load = int(cf.get("ansible", "slave_load"))

             self.code_home_path = cf.get("ansible", "code_home")
             # Directory (on every host) that receives the data sets and holds results.
             self.ansibleResult = self.code_home_path + "/ansibleResult"
             # Local directory the data sets are copied from and results are merged into.
             self.database_path = self.code_home_path + "/database"

             self.avail_master = []
             self.avail_slave = []
             # host name -> number of data sets assigned to that host
             self.loadBalancing = {}

         def _run(self, module_name, module_args, host):
             """Run one Ansible module on the hosts matching ``host``; return the raw result dict."""
             runner = ansible.runner.Runner(
                 host_list=inventory_file,
                 module_name=module_name,
                 module_args=module_args,
                 pattern=host,
             )
             return runner.run()

         @staticmethod
         def _reachable(res, host):
             """Return True when ``host`` was contacted and its module call did not fail."""
             # A host missing from 'contacted' is either in 'dark' or matched no inventory entry.
             result = res.get('contacted', {}).get(host)
             return result is not None and not result.get('failed')

         def _ping_group(self, cf, section):
             """Return the hosts of inventory ``section`` that answer an Ansible ping, in file order."""
             if not cf.has_section(section):
                 print("WARNING: the inventory has no [%s] section" % section)
                 return []
             available = []
             for host in cf.options(section):
                 if self._reachable(self._run('ping', '', host), host):
                     available.append(host)
                 else:
                     print("Host Error: %s Unreachable" % host)
             return available

         def extractHost(self, config_file_path):
             """Fill ``avail_master`` / ``avail_slave`` with the reachable hosts of the inventory.

             :param config_file_path: INI-style inventory with [master] and [slave] sections.
             A host listed in both sections is kept only as a master.
             """
             cf = ConfigParser.ConfigParser(allow_no_value=True)
             # ConfigParser lower-cases option names by default; host names must keep their case.
             cf.optionxform = str
             cf.read(config_file_path)
             self.avail_master = self._ping_group(cf, "master")
             print(self.avail_master)
             slaves = self._ping_group(cf, "slave")
             self.avail_slave = [h for h in slaves if h not in self.avail_master]
             print(self.avail_slave)

         def calculateLoadBalancing(self, database_dir=None):
             """Split the ``mygraph*`` data sets among the available hosts.

             Every master gets a share proportional to ``master_load`` and every
             slave one proportional to ``slave_load``. The rounding remainder goes
             to the first available master, or to the first slave when no master
             is available. Hosts whose share is 0 are left out of ``loadBalancing``.

             :param database_dir: directory whose ``mygraph*`` entries are counted;
                 defaults to the module-level ``database_path``.
             Exits with status 1 when the directory cannot be listed or when there
             are data sets but no host (or only zero weights) to run them.
             """
             if database_dir is None:
                 database_dir = database_path
             try:
                 names = os.listdir(database_dir)
             except OSError as err:
                 print("ERROR: cannot list the database directory %s: %s" % (database_dir, err))
                 sys.exit(1)
             load_count = len([name for name in names if name.startswith("mygraph")])
             self.load_count = load_count
             print(load_count)
             print("%s %s" % (self.master_load, self.slave_load))

             # Masters first, so hosts[0] is the remainder target described above.
             hosts = ([(m, self.master_load) for m in self.avail_master] +
                      [(s, self.slave_load) for s in self.avail_slave])
             total_weight = sum(weight for _, weight in hosts)
             if load_count == 0:
                 print(self.loadBalancing)
                 return
             if total_weight <= 0:
                 print("ERROR: no available host to run %d data sets!" % load_count)
                 sys.exit(1)

             remaining = load_count
             for host, weight in hosts:
                 # Integer arithmetic: float division could truncate e.g. 0.999... to 0.
                 share = min(remaining, load_count * weight // total_weight)
                 if share > 0:
                     self.loadBalancing[host] = share
                 remaining -= share
             if remaining > 0:
                 first = hosts[0][0]
                 # .get(): the first host may have received a share of 0 above.
                 self.loadBalancing[first] = self.loadBalancing.get(first, 0) + remaining
             print(self.loadBalancing)

         def fileTransfer(self):
             """Push data sets and code to every balanced host and start runAll.sh there.

             Data sets mygraph0, mygraph1, ... are handed out in consecutive ranges
             following the counts in ``loadBalancing``. Exits with status 1 when the
             code home / result directory is not configured.
             """
             print(self.code_home_path)
             print(self.ansibleResult)
             # An empty code home would make ansibleResult "/ansibleResult", which is deleted below.
             if not self.code_home_path or not self.ansibleResult:
                 print("ERROR: the home of the result about ansible is null!")
                 sys.exit(1)
             next_index = 0
             for host in self.loadBalancing:
                 print(host)
                 # Recreate an empty result directory on the host.
                 self._run('file', 'path=%s state=absent' % self.ansibleResult, host)
                 self._run('file', 'path=%s state=directory mode=0750' % self.ansibleResult, host)

                 # Transfer this host's consecutive range of data sets.
                 count = self.loadBalancing[host]
                 for index in range(next_index, next_index + count):
                     args = 'src=%s/mygraph%s dest=%s mode=0750' % (
                         self.database_path, index, self.ansibleResult)
                     print(self._run('copy', args, host))
                 next_index += count

                 # Transfer the code.
                 for name in ["community.jar", "config.ini", "runAll.sh", "lib"]:
                     args = 'src=%s/%s dest=%s mode=770' % (
                         self.code_home_path, name, self.code_home_path)
                     print(self._run('copy', args, host))

                 # Start the job detached. Every standard stream is redirected; otherwise the
                 # background process keeps Ansible's stderr pipe open and the call blocks
                 # until the job finishes.
                 args = 'chdir=%s nohup bash runAll.sh %s > bash.log 2>&1 < /dev/null &' % (
                     self.code_home_path, self.ansibleResult)
                 print(self._run("shell", args, host))
                 print("The Host named %s is running!" % host)

         def informationTransfer(self, transfer_module_name, transfer_module_args, host):
             """Run an arbitrary Ansible module on ``host``; print and return the raw result."""
             print(transfer_module_args)
             print(transfer_module_name)
             res = self._run(transfer_module_name, transfer_module_args, host)
             print(res)
             return res

         def checkPoint(self, poll_interval=5):
             """Poll every balanced host until ``<ansibleResult>/success_log`` exists on it.

             The local ``<database_path>/output`` directory is emptied first. Each
             finished host's ``<ansibleResult>/output`` is pulled into
             ``database_path`` with the synchronize module. An unreachable host is
             reported and dropped; its results are not pulled.

             :param poll_interval: seconds to wait between polling rounds.
             """
             import shutil
             output_dir = self.database_path + "/output"
             shutil.rmtree(output_dir, ignore_errors=True)
             if not os.path.isdir(output_dir):
                 os.makedirs(output_dir)
             print("Beginning CheckPoint")
             pending = list(self.loadBalancing)
             while pending:
                 # Iterate over a snapshot, because finished hosts are removed from `pending`.
                 for host in list(pending):
                     # `removes=` makes the command module skip `pwd` while success_log is absent.
                     res = self._run('command', 'removes=%s/success_log pwd' % self.ansibleResult, host)
                     print(res)
                     result = res.get('contacted', {}).get(host)
                     if result is None:
                         print("Host Error: %s Unreachable, its results are not collected" % host)
                         pending.remove(host)
                         continue
                     if "skipped" in result.get('stdout', ''):
                         continue
                     print("Host %s finished his tasks!" % host)
                     pending.remove(host)
                     self.informationTransfer(
                         "synchronize",
                         "src=%s/output dest=%s mode=pull" % (self.ansibleResult, self.database_path),
                         host)
                     print(pending)
                 if pending:
                     print("to sleep")
                     time.sleep(poll_interval)
    204     
    205 
    if __name__ == "__main__":
        # Build the distributor from config.ini and keep only the reachable inventory hosts.
        distributor = DistributedByAnsible()
        distributor.extractHost(inventory_file)
        # Later pipeline stages, currently disabled:
        # distributor.calculateLoadBalancing()
        # distributor.fileTransfer()
        # distributor.checkPoint()
        print("Done")
    213 
    214     

    待完善部分

    主机状态仅在开始时检测一次,用于剔除不可用的主机;如果某台机器在脚本运行中途宕机,需要把它的任务重新分发给另一台主机执行,这一点目前尚未实现。

  • 相关阅读:
    POJ 1631 Bridging signals
    POJ 1451 T9
    企业应用架构模式阅读笔记3
    论面向服务架构设计及其应用
    软件质量属性-可测试性
    企业应用架构模式阅读笔记2
    企业应用架构模式阅读笔记1
    QTP安装与破解
    Android studio打不开,双击没有反应,已解决
    hibernate框架的基本原理及实例
  • 原文地址:https://www.cnblogs.com/loadofleaf/p/6428665.html
Copyright © 2011-2022 走看看