zoukankan      html  css  js  c++  java
  • Python操作Saltstack

    1.代码

    # -*- coding:utf-8 -*-
    import urllib.request
    import urllib.parse
    import json
    
    
    class saltAPI():
    
        def __init__(self):
            self.url = 'http://192.168.174.128:8000'
            self.data = {'username' : 'saltapi',
                         'password' : 'saltapi',
                         'eauth' : 'pam'}
            self.headers = {'User-Agent':'Mozilla/5.0 (X11; Fedora; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'}
            self.token = self.get_token()
            self.headers['X-Auth-Token'] = self.token
    
        #发送post
        def post_request(self,prefix='/',postdata=None):
            request = urllib.request.Request(self.url+ prefix,postdata,headers=self.headers)
            reponse = urllib.request.urlopen(request).read()
            content  = json.loads(reponse.decode('utf8'))
            return content
    
        #获取token
        def get_token(self):
            postdata = urllib.parse.urlencode(self.data).encode('utf8')
            content = self.post_request('/login',postdata)
            try:
                token = content['return'][0]['token']
                return token
            except KeyError:
                raise KeyError
    
        #获取grains
        def get_grains(self,minion=''):
            if minion and minion!='*':
                prefix = '/minions/'+minion
            else:
                prefix = '/minions'
            content = self.post_request(prefix)
            return content
    
        #执行命令->get_jid
        def SaltCmd(self,tgt,fun,client='local_async',expr_form='glob',arg=None,**kwargs):
            params = {'client':client, 'fun':fun, 'tgt':tgt, 'expr_form':expr_form}
            if arg:
                a=arg.split(',') #参数按逗号分隔
                for i in a:
                    b=i.split('=') #每个参数再按=号分隔
                    if len(b)>1:
                        params[b[0]]='='.join(b[1:]) #带=号的参数作为字典传入
                    else:
                        params['arg%s'%(a.index(i)+100)]=i
            if kwargs:
                params=dict(list(params.items())+list(kwargs.items()))
            obj = urllib.parse.urlencode(params).encode('UTF8')
            ret = self.post_request(prefix='/',postdata=obj)
            jid = ret['return'][0]['jid']
            prefix = '/jobs/'+jid
            content = self.post_request(prefix)
            return content['info'][0]['Result']
    
        def test(self):
            print(self.headers)
    
    if __name__ == '__main__':
        tester = saltAPI()
        token = tester.SaltCmd('*','test.ping')
        print(token)
  • 相关阅读:
    史上最全的网银转账测试分析与设计
    【面试题】你是测试工程师,如何保证软件的质量?
    小白成长建议--小白如何提问
    [感悟]性能测试测什么
    通过一个简单的数据库操作类了解PHP链式操作的实现
    PHP魔术方法小结.md
    谈PHP中信息加密技术
    PHP输入流php://input [转]
    【PHPsocket编程专题(实战篇③)】构建基于socket的HTTP请求类
    从一次面试经历谈PHP的普通传值与引用传值以及unset
  • 原文地址:https://www.cnblogs.com/mascot1/p/9964245.html
Copyright © 2011-2022 走看看