CMDB03 /今日未采集的资产、资产入库、资产变更记录、资产采集
1. 获取今日未采集的服务器
-
代码示例:
def get(self,request,*args,**kwargs): """ 返回今日未采集的服务器列表 """ today = datetime.datetime.today() queryset = models.Server.objects.filter(status=1).filter(Q(last_date__isnull=True)|Q(last_date__lt=today)).values('hostname') host_list = [ item['hostname'] for item in queryset] print(host_list) return Response(host_list)
2. server资产入库以及资产变更处理
-
post请求代码示例
def post(self,request,*args,**kwargs): # 1. 获取到用户提交资产信息 # 2. 保存到数据库(表关系) hostname = request.data.get('hostname') server_object = models.Server.objects.filter(hostname=hostname).first() if not server_object: return Response('主机不存在') process_server_info(request.data['info'],server_object) # 今日已经采集 server_object.last_date = datetime.datetime.today() server_object.save() return Response('发送成功')
-
plugins /
__init__.py
执行process_server_info函数,对接收到的消息做相应的处理代码示例:
import importlib from django.conf import settings def process_server_info(info,server_object): """ 处理中控汇报资产信息 """ for key,path in settings.CMDB_PLUGIN_DICT.items(): module_path,class_name = path.rsplit('.',maxsplit=1) module = importlib.import_module(module_path) instance = getattr(module,class_name)() instance.process(info[key],server_object)
-
判断做何处理/新增/删除/变更
class Disk(object): def process(self,disk,server_object): if not disk['status']: print('采集资产错误',disk['error']) return disk_info = disk['data'] new_disk_slot_set = set(disk_info.keys()) # [obj,obj] db_disk_queryset = models.Disk.objects.filter(server=server_object) db_disk_dict = {obj.slot: obj for obj in db_disk_queryset} db_disk_slot_set = set(db_disk_dict.keys()) record_msg_list = [] # 新增的槽位集合 create_slot_set = new_disk_slot_set - db_disk_slot_set create_object_list = [] for slot in create_slot_set: # models.Disk.objects.create(**disk_info[slot],server=server_object) create_object_list.append(models.Disk(**disk_info[slot], server=server_object)) if create_object_list: models.Disk.objects.bulk_create(create_object_list, batch_size=10) msg = "【新增硬盘】在%s槽位新增了硬盘。" % ",".join(create_slot_set) record_msg_list.append(msg) # 要删除的槽位集合 remove_slot_set = db_disk_slot_set - new_disk_slot_set # (1,2) models.Disk.objects.filter(server=server_object, slot__in=remove_slot_set).delete() if remove_slot_set: msg = "【删除硬盘】在%s槽位删除了硬盘。" % ",".join(remove_slot_set) record_msg_list.append(msg) # 要更新的槽位集合(可能有也可能没有) update_slot_set = new_disk_slot_set & db_disk_slot_set for slot in update_slot_set: temp = [] row_dict = disk_info[slot] # {'slot': '0', 'pd_type': 'SAS', 'capacity': '100', 'model': 'SEAGATE ST300MM0006 LS08S0K2B5NV'} row_object = db_disk_dict[slot] # row_object.slot/ row_object.pd_type = getattr(row_object,'pd_type') / row.capacity /row.model for key, value in row_dict.items(): if value != getattr(row_object, key): msg = "%s由%s变更为%s" % (key, getattr(row_object, key), value) temp.append(msg) setattr(row_object, key, value) if temp: slot_msg = "【更新硬盘】槽位%s:%s" % (slot, " ".join(temp)) record_msg_list.append(slot_msg) row_object.save() if record_msg_list: models.Record.objects.create(server=server_object, content=" ".join(record_msg_list))
3. client基于ssh远程资产采集
-
app.py -- 资产采集入口
from concurrent.futures import ThreadPoolExecutor from lib.plugins import get_server_info import settings import requests def ssh(hostname,command): import paramiko private_key = paramiko.RSAKey.from_private_key_file(settings.SSH_PRIVATE_KEY_PATH) ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(hostname=hostname, port=settings.SSH_PORT, username=settings.SSH_USER, pkey=private_key) stdin, stdout, stderr = ssh.exec_command(command) result = stdout.read() ssh.close() return result.decode('utf-8') def salt(hostname,command): import subprocess cmd = "salt '%s' cmd.run '%s' " %(hostname,command) data = subprocess.getoutput(cmd) return data def task(hostname): if settings.MODE == 'SSH': info = get_server_info(hostname, ssh) elif settings.MODE == 'SALT': info = get_server_info(hostname, salt) else: raise Exception('模式输入错误,请修改') requests.post( url="http://192.168.16.64:8000/api/v1/server/", json={'hostname':hostname,'info':info} ) def run(): response = requests.get(url="http://192.168.16.64:8000/api/v1/server/") host_list = response.json() pool = ThreadPoolExecutor(settings.THREAD_POOL_SIZE) for host in host_list: # 100服务器 pool.submit(task,host) if __name__ == '__main__': run()
-
plugins /
__init__.py
执行process_server_info函数,去采集相应的资产信息from settings import PLUGIN_DICT def get_server_info(hostname,ssh_func): """ :param hostname: 要远程操作的主机名 :param ssh_func: 执行远程操作的方法 :return: """ info_dict = {} for key, path in PLUGIN_DICT.items(): module_path, class_name = path.rsplit('.', maxsplit=1) # 1. 根据字符串的形式去导入模块 "lib.plugins.board" import importlib module = importlib.import_module(module_path) # 2.去模块找到类 cls = getattr(module, class_name) # 3. 对类型实例化 obj = cls() # 4. 执行对象的process方法 result = obj.process(hostname,ssh_func) info_dict[key] = result return info_dict
-
采集CPU信息示例
import traceback from lib.log import logger import settings from .base import BasePlugin class CPU(BasePlugin): def process(self,hostname,ssh_func): info = {'status': True, 'data': None, 'error': None} try: if settings.DEBUG: with open('files/cpuinfo.out',mode='r',encoding='utf-8') as f: content = f.read() else: content = ssh_func(hostname, 'cat /proc/cpuinfo') data = self.parse(content) info['data'] = data except Exception as e: # 记录日志 msg = traceback.format_exc() logger.log(msg) info['status'] = False info['error'] = msg return info def parse(self,content): """ 解析shell命令返回结果 :param content: shell 命令结果 :return:解析后的结果 """ response = {'cpu_count': 0, 'cpu_physical_count': 0, 'cpu_model': ''} cpu_physical_set = set() content = content.strip() for item in content.split(' '): for row_line in item.split(' '): value_list = row_line.split(':') if len(value_list) !=2: continue key,value = value_list key = key.strip() if key == 'processor': response['cpu_count'] += 1 elif key == 'physical id': cpu_physical_set.add(value) elif key == 'model name': if not response['cpu_model']: response['cpu_model'] = value response['cpu_physical_count'] = len(cpu_physical_set) return response
-
log.py -- 用来记录异常日志
import logging import settings class AutoLogger(object): def __init__(self,log_path,log_name): file_handler = logging.FileHandler(log_path, 'a', encoding='utf-8') fmt = logging.Formatter(fmt="%(asctime)s - %(name)s - %(levelname)s -%(module)s: %(message)s") file_handler.setFormatter(fmt) self.logger = logging.Logger(log_name, level=logging.DEBUG) self.logger.addHandler(file_handler) def log(self,msg): self.logger.error(msg) logger = AutoLogger(settings.LOG_FILE_PATH,'cmdb')
总结:
- 对于今日未采集的服务器采用了基于Q实现复杂的SQL查询
- 对于变更处理采用集合的交集、差集,进而进行判断
- 中控机汇报到api的资产需要做入库以及变更记录的处理
- 由于资产搜集时是利用工厂模式实现可扩展插件,方便于扩展。在api端也是使用相同模式,对插件进行一一处理。
- 在处理资产信息时候,对操作进行交集和差集的处理从而得到删除/更新/新增资产。
- 在内部通过反射进行资产变更记录的获取,最终将资产以及变更记录写入数据库。