一、资产采集四种方式
1. Agent方式
API:Django接收数据并入库 程序:放置在每台服务器 应用场景:针对服务器较多的公司 步骤一: #执行本地命令的库 import subprocess sub = subprocess.getoutput("要执行的命令名") 每台机器通过用户名密码链接数据库,获取要执行的命令 步骤二: 采集数据 import subprocess # 采集到本机运行ipconfig命令的输出结果 result = subprocess.getoutput("ipconfig") print(result) 步骤三: 筛选整理数据 # 正则等方式 result_dic = { "网络信息":result, } 步骤四:发送数据到api # 以post方式将数据发送到url import requests requests.post("url", result_dic)
2. SSH方式
import paramiko # 通过paramiko库链接各台机器,获取命令 #缺点是慢 API:Django接收数据并入库 程序:放在中控机 应用场景:针对服务器较少的公司 SSH方式有三种工具,paramiko、Fabric、Ansible,其中Fabric、Ansible内部原理也是通过paramiko来实现得。 paramiko安装:pip3 install paramiko andible,fabic等工具对paramiko进行了封装,方便使用 速度更快一些! import requests,paramiko # 获取今天未采集的主机名 # 从url中获取信息 host_list = requests.get("url") # 通过paramiko连接远程服务器执行命令 # 创建ssh对象 ssh = paramiko.SSHClient() # 允许连接不在know_hosts文件中的主机 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接服务器 ssh.connect(hostname="hostname", port=22, username="root", password="password") # 执行命令 stdin, stdout, stderr = ssh.exec_command("hostname") # 获取结果 result = stdout.read() print(result) # 关闭连接 ssh.close() # 筛选整理数据 result_dic = { "网络信息":result, } # 发送获取的数据 requests.post("url", result_dic)
3. saltstack【python语言】
方式同ssh,原理不同 优点:提高速度,开发成本低 缺点:依赖saltstack工具 可以远程执行命令 http://www.cnblogs.com/wupeiqi/articles/6415436.html API:Django接收数据并入库 应用场景:针对服务器较多的公司【推荐】 master: v = subprocess.getoutput('salt "*" cmd.run "ls"') saltstack内部原理是通过RPC来实现得消息队列 步骤一: 安装saltstack rpm --import https://repo.saltstack.com/yum/redhat/7/x86_64/latest/SALTSTACK-GPG-KEY.pub yum install salt-master 服务端 yum install salt-minion 客户端 步骤二: Master准备 a.配置文件,修改监听ip /etc/salt/master interface: 本机ip地址 b.启动Master /etc/init.d/salt-master start 步骤三: Slave准备 a.修改配置文件,连接哪个master /etc/salt/minion master: 远程master的ip b.启动Slave /etc/init.d/salt-slave start 步骤四: 创建关系 Master:salt-key -L 查看哪些连接 salt-key -a 接受连接 salt-key -A 接收全部连接 步骤五: 执行命令 Master:salt "连接" cmd.run "命令" # salt 是一个.py文件 #!/usr/bin/python # Publish commands to the salt system from the command line on the master. from salt.scripts import salt_main if __name__ == '__main__': salt_main() # 在Python文件中使用salt的方法(需要提前配置好) import salt.client local = salt.client.LocalClient() result = local.cmd('对方ip', 'cmd.run', ['命令']) # result是一个字典 result.keys() # 命令 result.values() # 执行结果
4. puppet【ruby语言】
内部原理:puppet客户端每30秒钟自动汇报数据给puppet服务端。 应用场景:主要是针对已经在使用puppet的公司。 基于ruby 优点:自动汇报 缺点:要用ruby写
二、高级配置文件路径
1.目录结构
2.整合配置文件的代码
import os import importlib from . import global_settings class Settings(object): def __init__(self): # ------------ 找到默认配置文件---------------- for name in dir(global_settings): if name.upper(): # 配置文件里的变量名需要大写 value = getattr(global_settings,name) # 拿到值 setattr(self,name,value) # 设置值 # ------------ 找到用户自定义配置文件------------ settings_module = os.environ.get('USER_SETTINGS') # 加载设置到环境变量中的配置文件路径名 if not settings_module: # 如果用户没有自定义配置文件或没有初始化自定义文件 return # 直接返回不做其他操作 user_settings = importlib.import_module(settings_module) for name in dir(user_settings): if name.upper(): # 配置文件里的变量名需要大写 value = getattr(user_settings,name) # 拿到值 setattr(self,name,value) # 设置值 settings = Settings() # 实例化对象
3.加载自定义配置文件的名称到环境变量中
# 以下代码放置需要执行的py文件中 import os os.environ['USER_SETTINGS'] = "config.settings" from lib.conf.config import settings # 通过settings可以点出在自定义或默认配置文件中的相关变量名
三、可插拔式插件 (资产采集)
1.目录结构
2.流程介绍
主程序start.py 去执行 script.run()
import os os.environ['USER_SETTINGS'] = "config.settings" # 初始化 import sys BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(BASEDIR) #上级目录加入环境变量中 from src import script # 导入执行代码 if __name__ == '__main__': script.run()
script.py中 根据MODE去执行相应的方法
MODE是资产采集中四种方法中的一种
from lib.conf.config import settings from .client import Agent from .client import SSHSALT def run(): if settings.MODE == 'AGENT': # 配置文件中MODE的值为AGENT 或 ALT,SSH obj = Agent() else: obj = SSHSALT() obj.execute()
client.py中主要是拿到查到的主机的硬盘、CPU、内存等信息发给API
如果是Agent:其中Agent 每个客户端自己查询到信息后直接发给API
如果是SSHSALT:中控机服务器先向API发送请求得到要查询的主机列表
然后遍历列表,
PluginManager(host).exec_plugin() # 依次传入每个主机的host信息,并拿到返回的查询信息
import requests import json from src.plugins import PluginManager from lib.conf.config import settings # 基类 只有向API发送查询到信息的方法 class Base(object): def post_asset(self,server_info): requests.post(settings.API,json=server_info) # body: json.dumps(server_info) 直接写json=server_info可以把里面的东西改成json字符串 # headers= {'content-type':'application/json'} # request.body # json.loads(request.body) # 继承基类,每个客户端自己向API发送查询到的信息 class Agent(Base): def execute(self): server_info = PluginManager().exec_plugin() self.post_asset(server_info) # 向API发送查询到信息 class SSHSALT(Base): def get_host(self): # 获取未采集信息的主机列表 response = requests.get(settings.API) # 中控机向API发送get请求获取要查询的主机信息 result = json.loads(response.text) # "{status:'True',data: ['c1.com','c2.com']}" if not result['status']: # 状态发生异常,则直接结束跳过异常的主机 return return result['data'] def execute(self): host_list = self.get_host() # 拿到主机列表清单 for host in host_list: # 遍历拿到每个主机信息 server_info = PluginManager(host).exec_plugin() # 依次传入每个主机的host信息,并拿到返回的查询信息 self.post_asset(server_info) # 向API发送查询到信息
在plugins目录下的__init__.py文件中
函数 exec_plugin 获取所有的插件,执行插件里边的方法并获取插件返回值
从配置文件中获取到插件路径和类名
model_path,class_name = v.rsplit('.',1) # 从右向左按点分割一次,并赋值给相应变量
# 'basic': "src.plugins.basic.Basic" 数据结构
v 是"src.plugins.basic.Basic"
model_path,class_name = v.rsplit('.',1) # 从右向左按点分割一次,并赋值给相应变量
model_file = importlib.import_module(model_path) # 以字符串的形式导入模块
cls = getattr(model_file,class_name) # 利用反射机制拿到类对象
import importlib import traceback from lib.conf.config import settings class PluginManager(object): def __init__(self,hostname=None): self.hostname = hostname self.plugin_dict = settings.PLUGINS_DICT # 插件存放的目录 self.mode = settings.MODE self.debug = settings.DEBUG if self.mode == 'SSH': # 拿到配置文件中用来SSH登录的相关信息 self.ssh_user = settings.SSH_USER self.ssh_pwd = settings.SSH_PWD self.ssh_port = settings.SSH_PORT self.ssh_key = settings.SSH_KEY def exec_plugin(self): """ 获取所有的插件,并执行获取插件返回值 :return: """ response = {} for k,v in self.plugin_dict.items(): # 'basic': "src.plugins.basic.Basic" 数据结构 ret = {'status':True,'data':None} try: model_path,class_name = v.rsplit('.',1) # 从右向左按点分割一次,并赋值给相应变量 model_file = importlib.import_module(model_path) # 以字符串的形式导入模块 cls = getattr(model_file,class_name) # 拿到类对象 if hasattr(cls,'initial'): # 如果类中有initial方法 obj = cls.initial() # 执行类方法initial 并把返回值赋给obj else: obj = cls() # 否则直接实例化类对象 # obj.process(self.command,self.debug) 执行插件中的process方法,传入command函数和debug信息 result = obj.process(self.command,self.debug) # result = "根据v获取类,并执行其方法采集资产" ret['data'] = result except Exception as e: ret['status'] = False # traceback.format_exc() 可以返回更详细的错误信息字符串 ret['data'] = "[%s][%s] 采集数据出现错误 : %s" % ( self.hostname if self.hostname else "AGENT", k, traceback.format_exc()) response[k] = ret return response def command(self,cmd): if self.mode == "AGENT": return self.__agent(cmd) elif self.mode == "SSH": return self.__ssh(cmd) elif self.mode == "SALT": return self.__salt(cmd) else: raise Exception('模式只能是 AGENT/SSH/SALT') def __agent(self,cmd): # 如果配置文件中的方法是AGENT import subprocess output = subprocess.getoutput(cmd) return output def __ssh(self,cmd): # 如果配置文件中的方法是SSH import paramiko # 秘钥连接 # private_key = paramiko.RSAKey.from_private_key_file(self.ssh_key) # ssh = paramiko.SSHClient() # ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # ssh.connect(hostname=self.hostname, port=self.ssh_port, username=self.ssh_user, pkey=private_key) # stdin, stdout, stderr = ssh.exec_command(cmd) # result = stdout.read() # ssh.close() # 用户名密码连接 ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=self.hostname,port=self.ssh_port,username=self.ssh_user,password=self.ssh_pwd) stdin,stdout,stderr = ssh.exec_command(cmd) result = stdout.read() # 读到返回的信息 ssh.close() return result def __salt(self,cmd): # 也是通过操作subprocess # import salt.client # local = salt.client.LocalClient() # result = local.cmd(self.hostname, 'cmd.run', [cmd]) # return result[self.hostname] salt_cmd = "salt '%s' cmd.run '%s'"%(self.hostname,cmd,) import subprocess output = subprocess.getoutput(salt_cmd) return output # 读到返回的信息
四、唯一标识和线程池
1.唯一标识:主板SN(不准确)
标准化:
- 主机名不重复(装机同时,主机名在cmdb中设置)
步骤:
a. 装系统,初始化软件(CMDB),运行CMDB:
- 通过命令获取主机名
- 写入本地指定文件
b. 将资产信息发送到API
c. 获取资产信息:
- 本地文件主机名 != 命令获取的主机名(按照文件中的主机名)
- 本地文件主机名 == 命令获取的主机名
服务器资产采集(Agent):
a. 第一次:文件不存在,或内容为空;
采集资产:
- 主机名写入文件
- 发送API
b. 第N次:采集资产,主机名:文件中获取
SSH或Salt:
中控机:获取未采集主机名列表:【c1.com 】
# 继承基类,每个客户端自己向API发送查询到的信息 class Agent(Base): def execute(self): # 获取主机名唯一标识 server_info = PluginManager().exec_plugin() hostname = server_info['basic']['data']['hostname'] certname = open(settings.CERT_PATH, 'r', encoding='utf-8').read().strip() # 配置文件里拿到初始主机名 if not certname: # 第一次的时候,把主机名写入文件 with open(settings.CERT_PATH, 'w', encoding='utf-8') as f: f.write(hostname) else: # 以配置文件的主机名为准 server_info['basic']['data']['hostname'] = certname self.post_asset(server_info) # 向API发送查询到信息
2.线程池
def run(self,host): server_info = PluginManager(host).exec_plugin() # 依次传入每个主机的host信息,并拿到返回的查询信息 self.post_asset(server_info) # 向API发送查询到信息 def execute(self): from concurrent.futures import ThreadPoolExecutor host_list = self.get_host() # 拿到主机列表清单 pool = ThreadPoolExecutor(10) # 创建的线程池中有10个线程 for host in host_list: # 遍历拿到每个主机信息 pool.submit(self.run,host) # 把run函数放入线程池
五、资产入库(API 资产入库同时对外提供数据访问接口)
1.后台API拿到发过来的主机信息
server_info = json.loads(request.body.decode('utf-8')) # 查询到的主机信息
hostname = server_info['basic']['data']['hostname'] # 拿到客户端自己查到的主机名
2.根据主机名查到数据库中未更新的老资产主机对象
server_obj = models.Server.objects.filter(hostname=hostname).first() # 服务器对象
3.查不到则直接退出
if not server_obj:
return HttpResponse('当前主机名未在资产中录入')
4.录入资产信息
# -------------- 处理硬盘、网卡、内存等的信息资产入库----------------
PluginManager(server_info, server_obj, hostname).exec_plugin()
我们可以利用python的反射机制来定制插件
PluginManager类写在__init__.py里
来看一下代码
import importlib from autoserver import settings class PluginManager(object): def __init__(self, server_info,server_obj,hostname): self.server_info = server_info self.server_obj = server_obj self.hostname = hostname self.plugin_dict = settings.PLUGINS_DICT #引入配置文件中的插件路径信息 # 'disk': "api.service.disk.Disk" def exec_plugin(self): """ 获取所有的插件,并执行获取插件返回值 :return: """ for k, v in self.plugin_dict.items(): # 'disk': "api.service.disk.Disk", 数据结构 ret = {'status': True, 'data': None} try: # -------借鉴Django中间件的形式------------ model_path, class_name = v.rsplit('.', 1) # 从右向左按点分割一次,并赋值给相应变量 model_file = importlib.import_module(model_path) # 以字符串的形式导入模块 cls = getattr(model_file, class_name) # 拿到类对象 if cls: if hasattr(cls, 'initial'): # 如果类中有initial方法 obj = cls.initial() # 执行类方法initial 并把返回值赋给obj else: obj = cls() # 否则直接实例化类对象 obj.run(self.server_info,self.server_obj,self.hostname) # 资产入库" except Exception as e: pass return
没什么亮点 主要是 1.以字符串的形式导入模块,2.python机制来获取类对象和方法
再来看一下disk.py里面的代码
from repository import models class Disk(object): def __init__(self): pass @classmethod def initial(cls): return cls() def run(self,server_info,server_obj,hostname): # 采集信息出错时 if not server_info['disk']['status']: # 采集硬盘信息时报错了的话 models.ErrorLog.objects.create(content=server_obj['disk']['data'],asset_obj=server_obj.asset,title='【%s】硬盘采集错误信息'%(hostname)) # 新磁盘字典信息 new_disk_dict = server_info['disk']['data'] """slot 插槽位; pd_type 磁盘类型 ; capacity 磁盘容量GB; model 磁盘型号; { '5': {'slot': '5', 'pd_type': 'SAS', 'capacity': '279.396', 'model': 'SEAGATE ST300MM0006 LS08S0K2B5NV'}, '3': {'slot': '3', 'pd_type': 'SAS', 'capacity': '279.396', 'model': 'SEAGATE ST300MM0006 LS08S0K2B5AH'}, }""" # 旧磁盘列表信息 old_disk_list = models.Disk.objects.filter(server_obj=server_obj) """Queryset 列表字典类型,多个硬盘对象 [ Disk('slot':5,capacity:476...) Disk('slot':4,capacity:476...) ] """ new_slot_list = list(new_disk_dict.keys()) # 那新磁盘字典信息中的key 即槽位号 old_slot_list = [] for row in old_disk_list: # 遍历拿到数据库中旧的磁盘号 old_slot_list.append(row.slot) # 交集 更新【5】 则更新数据库的磁盘信息 update_list = set(new_slot_list).intersection(old_slot_list) # 交集列表 # 差集 创建【3】 则创建新的磁盘信息进数据库 create_list = set(new_slot_list).difference(old_slot_list) # 差集列表 # 差集 删除【4】 则把数据库里的不在返回来的新磁盘字典里的删除 del_list = set(old_slot_list).difference(new_slot_list) if del_list: # 删除 models.Disk.objects.filter(server_obj=server_obj,slot__in=del_list).delete() # 记录日志 models.AssetRecord.objects.create(asset_obj=server_obj.asset,content='移除硬盘%s'%('、'.join(del_list))) # 新增 record_list = [] for slot in create_list: disk_dict = new_disk_dict[slot] # {'capacity': '476.939', 'slot': '4', 'model': 'S1AXNSAF303909M Samsung SSD 840 PRO Series disk_dict['server_obj'] = server_obj # 多增加一个字段 models.Disk.objects.create(**disk_dict) # 以字典的方式增加 tmp = '新增硬盘 插槽位【{slot}】,磁盘类型【{pd_type}】,磁盘容量GB【{capacity}】,model磁盘型号【{model}】'.format(**disk_dict) record_list.append(tmp) # 依次把每次新增的信息加入一个列表中 # 记录日志 if record_list: con = ';'.join(record_list) models.AssetRecord.objects.create(asset_obj=server_obj.asset,content=con) # 更新 cord_list = [] row_map = {'capacity': '容量', 'pd_type': '类型', 'model': '型号'} for slot in update_list: new_dick_row = new_disk_dict[slot] # {'slot': '3', 'pd_type': 'SAS', 'capacity': '279.396', 'model': 'SEAGATE ST300MM0006 LS08S0K2B5AH'} old_disk_row = models.Disk.objects.filter(slot=slot,server_obj=server_obj).first() for k,v in new_dick_row.items(): # 遍历新信息的键和值 # k: capacity;slot;pd_type;model # v: '476.939''xxies DXM05B0Q''SATA' value = getattr(old_disk_row,k) # 拿到数据库中旧的值 if v != value: # 如果新值和旧值不一样,则说明需要更新 cord_list.append('槽位%s %s由%s变更为%s'.format(slot,row_map[k],value,v)) setattr(old_disk_row,k,v) # 更新数据库中的值 old_disk_row.save() # 写入数据库 # 记录日志 if cord_list: tent = ";".join(cord_list) models.AssetRecord.objects.create(asset_obj=server_obj.asset,content=tent)
在里边:
新磁盘字典信息 new_disk_dict = server_info['disk']['data']
旧磁盘列表信息 old_disk_list = models.Disk.objects.filter(server_obj=server_obj)
拿到 新插槽号列表 new_slot_list = list(new_disk_dict.keys())
拿到 旧插槽号列表 old_slot_list = []
for row in old_disk_list:
old_slot_list.append(row.slot)
update_list 交集 更新;create_list 差集 创建;del_list 差集 删除
. . . . . .
. . . . . .
六、API验证
1.自定义API验证
客户端代码
import time,requests,hashlib ctime = time.time() # 获取当前时间戳 key = 'asdfasdfasdfasdf098712sdfs' new_key = '%s|%s'%(key,ctime) m = hashlib.md5() m.update(bytes(new_key,encoding='utf-8')) md5_key = m.hexdigest() md5_time_key = '%s|%s'%(md5_key,ctime) print(md5_time_key) response = requests.get("http://127.0.0.1:8000/api/asset.html",headers={'OpenKey':md5_time_key}) print(response.text)
服务端
配置文件settings里加上 AUTH_KEY = "asdfasdfasdfasdf098712sdfs"
代码
api_key_record = { # "1b96b89695f52ec9de8292a5a7945e38|1501472467.4977243":1501472477.4977243 } def asset(request): # 所有的信息放在 request.META 里 client_md5_time_key = request.META.get('HTTP_OPENKEY') client_md5_key,client_ctime = client_md5_time_key.split('|') ctime = float(client_ctime) server_time = time.time() # 第一关 if server_time - ctime > 10: return HttpResponse('此随机字符串已失效') # 第二关 # settings.AUTH_KEY tmp = '%s|%s'%(settings.AUTH_KEY,client_ctime) m = hashlib.md5() m.update(bytes(tmp,encoding='utf-8')) server_md5_key = m.hexdigest() if server_md5_key != client_md5_key: return HttpResponse('字符串中的时间不允许被修改') # 维护 api_key_record 字典等容量 for k in list(api_key_record.keys()): v = float(api_key_record[k]) if server_time > v + 10 : del api_key_record[k] # 第三关 if client_md5_time_key in api_key_record: return HttpResponse('此随机字符串已访问过服务器端') else: # 把随机字符串当做key,把发过来时的时间当做value 记录进维护的api_key_record字典 api_key_record[client_md5_time_key] = client_ctime if request.method == 'GET': import_info = '我是重要的数据' return HttpResponse(import_info)
2.AES数据加密
#3.6安装 pip3 install pycryptodome #mac pip3 install pycrypto
客户端
utils.py是封装 数据加密、数据解密、API验证函数的组件
from Crypto.Cipher import AES from lib.conf.config import settings # 数据加密 def encrypt(message): """ 数据加密 :param message: :return: """ key = settings.DATA_KEY cipher = AES.new(key, AES.MODE_CBC, key) ba_data = bytearray(message,encoding='utf-8') v1 = len(ba_data) v2 = v1 % 16 if v2 == 0: v3 = 16 else: v3 = 16 - v2 for i in range(v3): ba_data.append(v3) final_data = ba_data.decode('utf-8') msg = cipher.encrypt(final_data) # 要加密的字符串,必须是16个字节或16个字节的倍数 return msg # 数据解密 def decrypt(msg): """ 数据解密 :param message: :return: """ from Crypto.Cipher import AES key = settings.DATA_KEY cipher = AES.new(key, AES.MODE_CBC, key) result = cipher.decrypt(msg) # result = b'xe8xa6x81xe5x8axa0xe5xafx86xe5x8axa0xe5xafx86xe5x8axa0sdfsd ' data = result[0:-result[-1]] return str(data,encoding='utf-8') # API验证 def auth(): """ API验证 :return: """ import hashlib,time ctime = time.time() key = "asdfasdfasdfasdf098712sdfs" new_key = "%s|%s" %(key,ctime,) m = hashlib.md5() m.update(bytes(new_key,encoding='utf-8')) #里面是字节数据 md5_key = m.hexdigest() #返回值是字符串类型 md5_time_key = "%s|%s" %(md5_key,ctime) return md5_time_key
在client.py中的基类里,把要发给服务端的API信息进行加密
import requests from lib.conf.config import settings from lib.utils import encrypt,auth class Base(object): def post_asset(self,server_info): data = encrypt(json.dumps(server_info)) requests.post( url=settings.API, data=data, headers={'OpenKey':auth(),'Content-Type':'application/json'} )
服务端
先弄出解密函数
def decrypt(msg): from Crypto.Cipher import AES key = b'dfdsdfsasdfdsdfs' cipher = AES.new(key, AES.MODE_CBC, key) result = cipher.decrypt(msg) # result = b'xe8xa6x81xe5x8axa0xe5xafx86xe5x8axa0xe5xafx86xe5x8axa0sdfsd ' data = result[0:-result[-1]] return str(data,encoding='utf-8')
客户端通过post请求传过来server_info信息,然后直接调用解密函数解密即可
elif request.method == 'POST': server_info = decrypt(request.body) server_info = json.loads(server_info)