Yearning 是一个开源的 MySQL SQL 语句审核平台，工单流程为：开发者提交 SQL 工单 -> 主管审核 -> 运维执行。
如果你只是工单执行者，可以用下面的脚本先屏蔽包含敏感关键词的工单，再自动执行其余工单。注意：关键词匹配只是粗粒度的兜底检查，不能替代主管审核。
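#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Auto-execute pending Yearning SQL tickets that pass a keyword screen.

Flow: log in, list tickets, and for every ticket whose status is 2
(waiting to be executed) fetch its SQL, stop the whole run if the SQL
contains a sensitive keyword, otherwise execute the ticket and append a
record to the log file.
"""
import json
import os
import sys
import time

import jsonpath
import requests

API_URL = "https://yearning.example.com/api/v1/"

# Credentials come from the environment so they are not stored in the script.
USERNAME = os.environ.get("YEARNING_USERNAME", "")
PASSWORD = os.environ.get("YEARNING_PASSWORD", "")

# requests has no default timeout; without one a stalled server hangs the job.
REQUEST_TIMEOUT = 30

# Status value the ticket list uses for "waiting to be executed".
PENDING_STATUS = 2

# Substring screen applied to the lower-cased SQL. It is deliberately coarse:
# it also fires on identifiers that merely contain one of these words, and it
# does not cover every destructive statement. Treat it as a backstop behind
# the reviewer's approval, not as a replacement for it.
SENSITIVE_KEYWORDS = ("drop", "truncate", "grant", "lock")

# NOTE(review): /tmp is shared between local users and this log holds the
# full SQL text; consider a directory only the operator account can access.
LOG_FILE = '/tmp/audit-sql.log'


def _auth_headers(token, **extra):
    """Return request headers carrying the JWT plus any extra headers."""
    headers = {"Authorization": f"JWT {token}"}
    headers.update(extra)
    return headers


def get_token():
    """Log in through the ``ldapauth`` endpoint and return the JWT.

    Raises:
        RuntimeError: credentials are not configured, or the login
            response carries no ``token`` field.
        requests.HTTPError: the server answered with an error status.
    """
    if not USERNAME or not PASSWORD:
        raise RuntimeError(
            "set YEARNING_USERNAME and YEARNING_PASSWORD before running")
    response = requests.post(
        API_URL + "ldapauth",
        data={"username": USERNAME, "password": PASSWORD},
        headers={"Accept": "application/json"},
        timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    # jsonpath.jsonpath() returns False when nothing matches; indexing that
    # would raise an unrelated TypeError, so check explicitly.
    matches = jsonpath.jsonpath(response.json(), "$.token")
    if not matches:
        raise RuntimeError("login response did not contain a token")
    return matches[0]


def get_work(token):
    """Return the first page of the ticket list as decoded JSON."""
    response = requests.get(
        API_URL + 'audit_sql?page=1&query={"picker":["",""],"user":"","valve":false}',
        headers=_auth_headers(token),
        timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


def get_sql(token, ticket_id, bundle_id):
    """Return the decoded JSON describing one ticket's SQL.

    The identifiers are passed through ``params`` so they are URL-encoded
    rather than concatenated into the query string.
    """
    response = requests.get(
        API_URL + 'getsql',
        params={"id": ticket_id, "bundle_id": bundle_id},
        headers=_auth_headers(token, Accept="application/json"),
        timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()


def audit_sql(token, ticket_id, username):
    """Execute one ticket and return the server's response body as text.

    No status check is made here: the body is returned whatever the
    outcome so that the caller can write it to the log.
    """
    payload = json.dumps({"type": 1, "to_user": username, "id": ticket_id})
    response = requests.put(
        API_URL + 'audit_sql',
        data=payload,
        headers=_auth_headers(token, **{"Content-Type": "application/json"}),
        timeout=REQUEST_TIMEOUT)
    return response.text


def find_sensitive_keyword(sql):
    """Return the first sensitive keyword contained in *sql*, or None.

    Matching is a case-insensitive substring test. A non-string *sql*
    raises, which stops the run instead of executing an unscreened ticket.
    """
    lowered = sql.lower()
    return next((word for word in SENSITIVE_KEYWORDS if word in lowered), None)


def append_log(entry):
    """Append *entry* as one line to LOG_FILE.

    A newly created file gets mode 0600; the mode of an existing file is
    left as it is.
    """
    fd = os.open(LOG_FILE, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o600)
    with os.fdopen(fd, 'a', encoding='utf-8') as log:
        log.write(entry + '\n')


def main():
    """Screen and execute every pending ticket on the first list page."""
    token = get_token()
    for ticket in get_work(token)['data']:
        if ticket['status'] != PENDING_STATUS:
            continue

        sql = get_sql(token, ticket['id'], ticket['bundle_id'])['sql']
        hit = find_sensitive_keyword(sql)
        if hit is not None:
            # Stop the whole run; tickets after this one are not processed.
            print("停止执行有敏感词:" + hit)
            sys.exit()

        result = audit_sql(token, ticket['id'], ticket['username'])
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        # json.dumps quotes the statement and escapes embedded quotes and
        # newlines, so one ticket always occupies exactly one log line.
        entry = f"{stamp} {result} {ticket['work_id']} " \
            + json.dumps(sql, ensure_ascii=False)
        print(entry)
        append_log(entry)
        time.sleep(2)  # pause between executions


if __name__ == "__main__":
    main()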