1.命令插件异常处理 + 日志采集
1)为了更清楚地反馈客户端采集信息的状态，优化插件返回给 API 的数据，改为字典存储 {状态, 错误信息, 数据}。因为每个插件在每种系统下都需要这个返回值，所以将它单独做成一个类
/lib/BaseReponse.py
class BaseReponese:
    """Uniform result envelope returned by every collection plugin.

    ``status`` starts as True, ``error`` and ``data`` start as None; the
    plugins fill ``data`` on success, or set ``status`` to False and put the
    failure details into ``error``.
    """

    def __init__(self):
        # Tuple assignment runs left to right, so the attribute (and therefore
        # the ``dict``) order stays: status, error, data.
        self.status, self.error, self.data = True, None, None

    @property
    def dict(self):
        """Return the instance's attribute mapping itself (not a copy)."""
        return vars(self)
2)在执行命令的时候，debug 模式下有打开文件的操作，打开文件时可能因目录不存在而导致程序终止，且这个错误信息仅能在服务端看到，这是不合理的，所以同时引入日志
/lib/log.py 日志做成一个单例对象（模块级实例），在需要的地方导入使用即可
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import logging
import os

from conf import settings


class Logger:
    """Own a single file-backed :class:`logging.Logger` for the client.

    ``info`` and ``error`` are bound directly to the underlying logger's
    methods instead of being wrapped in Python functions: with a wrapper,
    logging attributes every record to the wrapper's frame, so
    ``%(module)s`` in the format would always print this module's name
    rather than the module that actually logged the message.
    """

    def __init__(self, file_name, log_name):
        """Create a logger named *log_name* that writes to *file_name*.

        :param file_name: path of the log file (UTF-8); its parent directory
            is created when missing so a bad path does not abort start-up
        :param log_name: value shown by ``%(name)s``
        """
        log_dir = os.path.dirname(file_name)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        file_handler = logging.FileHandler(file_name, encoding='utf-8')
        fmt = logging.Formatter(fmt="%(asctime)s - %(name)s - %(levelname)s -%(module)s: %(message)s")
        file_handler.setFormatter(fmt)
        # Instantiated directly rather than via logging.getLogger(), so it has
        # no parent and its effective level is NOTSET: INFO records are emitted.
        self.logger = logging.Logger(log_name)
        self.logger.addHandler(file_handler)
        # Callers keep using ``logger.info(msg)`` / ``logger.error(msg)``.
        self.info = self.logger.info
        self.error = self.logger.error


# Shared instance: modules just do ``from lib.log import logger``.
logger = Logger(settings.LOGGER_PATH, settings.LOGGER_NAME)
所有插件的 linux 与 win 模式下都可能出现这个问题，所以都要加上错误处理 + 客户端日志
traceback.format_exc() 可以直接获取发生异常的文件以及出错的行号
import os
import traceback

from lib.BaseReponse import BaseReponese
from lib.log import logger


# Error-handling template applied to every plugin's ``linux`` and ``win``
# method (the original snippet labels it "linux or win"); only the debug
# fixture path and the shell command differ from plugin to plugin.
def linux(self, handler, hostname=None):
    """Run one collection command and wrap the parsed result.

    Any exception (for example a missing debug-fixture directory) is caught.
    Its traceback is written to the client log and returned in ``error``
    with ``status`` set to False, instead of terminating the program.

    :param handler: engine object providing ``cmd(command, hostname)``
    :param hostname: target host, passed through to ``handler.cmd``
    :return: ``{'status': ..., 'error': ..., 'data': ...}`` from BaseReponese
    """
    response = BaseReponese()
    try:
        if self.debug:
            # NOTE(review): the other plugins read fixtures from 'files';
            # 'files1' looks deliberately chosen to trigger the
            # missing-directory error discussed in the notes -- confirm.
            with open(os.path.join(self.base_dir, 'files1', 'memory.out')) as f:
                ret = f.read()
        else:
            ret = handler.cmd('lsblk', hostname)
        response.data = self.parse(ret)
    except Exception:
        msg = traceback.format_exc()
        # Log the full traceback (not only str(e)), like the other plugins.
        logger.error(msg)
        response.error = msg
        response.status = False
    return response.dict
2.完善资产采集插件
增加cpu采集 ,主板信息采集 ,基本信息采集
##settings.py
# Plugin key -> dotted import path of the collector class that handles it.
PLUGINS_DICT = dict(
    disk='src.plugins.disk.Disk',
    memory='src.plugins.memory.Memory',
    nic='src.plugins.nic.NIC',
    basic='src.plugins.basic.Basic',
    cpu='src.plugins.cpu.Cpu',
    board='src.plugins.main_board.MainBoard',
)
cpu命令插件代码
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os
import traceback

from .base import BasePlugin
from lib.BaseReponse import BaseReponese
from lib.log import logger
from conf.settings import BASE_DIR


class Cpu(BasePlugin):
    """Collect CPU information: logical count, physical count and model name."""

    def win(self, handler, hostname=None):
        raise NotImplementedError('win must be implemented ')

    def linux(self, handler, hostname=None):
        """Read ``/proc/cpuinfo`` (or the debug fixture) and parse it.

        :return: ``{'status', 'error', 'data'}`` dict from BaseReponese; on
            failure the traceback is logged and returned in ``error``
        """
        response = BaseReponese()
        try:
            if self.debug:
                with open(os.path.join(BASE_DIR, 'files', 'cpuinfo.out'), 'r') as f:
                    output = f.read()
            else:
                shell_command = "cat /proc/cpuinfo"
                output = handler.cmd(shell_command, hostname)
            response.data = self.parse(output)
        except Exception:
            msg = traceback.format_exc()
            response.status = False
            response.error = msg
            logger.error(msg)
        return response.dict

    @staticmethod
    def parse(content):
        """
        解析shell命令返回结果 (parse the shell command output).

        The output is a series of ``key : value`` lines; lines without a
        colon (such as blank separators) are skipped. Every ``processor``
        line counts one logical CPU, distinct ``physical id`` values count
        physical CPUs, and the first ``model name`` is kept.

        :param content: shell 命令结果 (text of /proc/cpuinfo)
        :return: 解析后的结果 ``{'cpu_count': int, 'cpu_physical_count': int, 'cpu_model': str}``
        """
        response = {'cpu_count': 0, 'cpu_physical_count': 0, 'cpu_model': ''}
        cpu_physical_set = set()
        for row_line in content.strip().splitlines():
            # partition() tolerates lines without ':' and values containing ':'.
            key, sep, value = row_line.partition(':')
            if not sep:
                continue
            key = key.strip()
            value = value.strip()
            if key == 'processor':
                response['cpu_count'] += 1
            elif key == 'physical id':
                cpu_physical_set.add(value)
            elif key == 'model name':
                if not response['cpu_model']:
                    response['cpu_model'] = value
        response['cpu_physical_count'] = len(cpu_physical_set)
        return response
basic命令插件代码
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import traceback

from .base import BasePlugin
from lib.BaseReponse import BaseReponese
from lib.log import logger


class Basic(BasePlugin):
    """Collect basic host information: OS platform, OS version and hostname."""

    def os_platform(self, handler, hostname):
        """
        获取系统平台 (get the system platform).

        :return: stripped output of ``uname``
        """
        output = handler.cmd('uname', hostname)
        return output.strip()

    def os_version(self, handler, hostname):
        """
        获取系统版本 (get the system version).

        :return: the first line of ``cat /etc/issue``, or '' if it is empty
        """
        output = handler.cmd('cat /etc/issue', hostname)
        # First *line*, not first word: split(' ')[0] would drop everything
        # after the distribution name.
        lines = output.strip().splitlines()
        return lines[0] if lines else ''

    def os_hostname(self, handler, hostname):
        """
        获取主机名 (get the hostname).

        :return: stripped output of ``hostname``
        """
        output = handler.cmd('hostname', hostname)
        return output.strip()

    def win(self, handler, hostname=None):
        raise NotImplementedError('win must be implemented ')

    def linux(self, handler, hostname=None):
        """Gather platform, version and hostname (fixed values in debug mode).

        :return: ``{'status', 'error', 'data'}`` dict from BaseReponese
        """
        response = BaseReponese()
        try:
            if self.debug:
                ret = {
                    'os_platform': 'linux',
                    'os_version': '6.5',
                    'hostname': 'c2.com'
                }
            else:
                ret = {
                    'os_platform': self.os_platform(handler, hostname),
                    'os_version': self.os_version(handler, hostname),
                    'hostname': self.os_hostname(handler, hostname),
                }
            response.data = ret
        except Exception:
            msg = traceback.format_exc()
            response.status = False
            response.error = msg
            logger.error(msg)
        return response.dict
board命令插件代码
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os
import traceback

from .base import BasePlugin
from lib.BaseReponse import BaseReponese
from lib.log import logger
from conf.settings import BASE_DIR


class MainBoard(BasePlugin):
    """Collect mainboard information (manufacturer, model, serial number)."""

    def win(self, handler, hostname=None):
        raise NotImplementedError('win must be implemented ')

    def linux(self, handler, hostname=None):
        """Run ``sudo dmidecode -t1`` (or read the debug fixture) and parse it.

        :return: ``{'status', 'error', 'data'}`` dict from BaseReponese
        """
        response = BaseReponese()
        try:
            if self.debug:
                with open(os.path.join(BASE_DIR, 'files', 'board.out'), 'r') as f:
                    output = f.read()
            else:
                shell_command = "sudo dmidecode -t1"
                output = handler.cmd(shell_command, hostname)
            response.data = self.parse(output)
        except Exception:
            msg = traceback.format_exc()
            response.status = False
            response.error = msg
            logger.error(msg)
        return response.dict

    def parse(self, content):
        """Extract the fields listed in ``key_map`` from ``Key: Value`` lines.

        :param content: command output, parsed line by line
        :return: dict containing whichever of 'manufacturer', 'model' and
            'sn' were found (values stripped)
        """
        result = {}
        key_map = {
            'Manufacturer': 'manufacturer',
            'Product Name': 'model',
            'Serial Number': 'sn',
        }
        for item in content.splitlines():
            # Split on the first ':' only, so values containing ':' are kept.
            key, sep, value = item.strip().partition(':')
            if sep and key in key_map:
                result[key_map[key]] = value.strip()
        return result
3.完善资产上报
1)需要考虑的问题：资产上报时如何确定这是哪一台主机对应的资源？本次提交的数据是新增还是修改？
主机唯一标识使用主机名，主机名存在系统的一个文件A中，每次提交数据的时候客户端比对一下，就能解决唯一标识的问题（唯一标识仅 agent 模式需要，ssh 模式中主机列表都是从 API 获取的）
客户端提交数据的操作 ,我们可以在上报的字典中加一个状态
情况1:如果没有文件A ,新增主机
情况2：如果有文件A，且文件中记录的主机名与提交的主机名一致，说明唯一标识未被修改，修改数据
情况3：如果有文件A，且文件中记录的主机名与提交的主机名不一致，说明唯一标识已被修改，修改数据
服务端收到字典
针对情况1:新增记录到数据库
针对情况2:根据主机名找到资源更新该资源信息
针对情况3：根据文件A中的主机名找到记录并更新信息，然后将新的唯一标识返回给客户端，客户端据此改写文件A
2)客户端代码实现
考虑三种情况 + 保存唯一标识文件(文件位置由配置定义)
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from src.engine.base import BaseHandler
from ..plugins import get_server_info
import requests
import json
import os
from conf import settings


class AgentHandler(BaseHandler):
    """定义cmd窗口 ,操控资产采集 + 上报 (agent mode: run commands locally, collect and report assets)."""

    def cmd(self, command, hostname=None):
        """Run *command* on this machine; *hostname* is unused in agent mode."""
        import subprocess
        return subprocess.getoutput(command)

    def handler(self):
        """Collect assets, tag the report as create/update/host_change, and post it.

        The decision uses the certificate file ``settings.CERT_PATH``, which
        stores the hostname from the previous report. The server's reply
        hostname is written back to that file afterwards.
        """
        info = get_server_info(self)
        if not os.path.exists(settings.CERT_PATH):
            # No certificate yet: first report from this machine.
            info['type'] = 'create'
        else:
            with open(settings.CERT_PATH, 'r', encoding='utf-8') as f1:
                # strip(): a trailing newline (e.g. from a text editor) must not
                # turn an unchanged hostname into a host_change.
                cert_hostname = f1.read().strip()
            hostname = info['basic']['data']['hostname']
            if cert_hostname == hostname:
                # Unique identifier unchanged: plain update.
                info['type'] = 'update'
            else:
                # Hostname changed: tell the server which record to rename.
                info['type'] = 'host_change'
                info['cert_hostname'] = cert_hostname
        r1 = requests.post(
            url='http://127.0.0.1:8000/api/asset/',
            # JSON over HTTP is UTF-8 (json.dumps output is ASCII by default,
            # so the bytes are the same as before).
            data=json.dumps(info).encode('utf-8'),
            headers={
                'content-type': 'application/json'
            }
        )
        response_data = r1.json()
        with open(settings.CERT_PATH, 'w', encoding='utf-8') as f1:
            f1.write(response_data['hostname'])
3)api服务端根据三种情况 ,使用django提供数据增删改查
建立models表结构 ,数据迁移
外键遵循多对一：IDC 对于 Server 来说是“一”，所以外键字段设在 Server 上；Server 对于内存、网卡来说是“一”，所以外键设在内存、网卡等表上……
from django.db import models # 业务线 class BusinessUnit(models.Model): """ 业务名 """ # 机房信息 class IDC(models.Model): """ 机房名 楼层 """ # 服务器信息 class Server(models.Model): """ 服务器状态 IDC外键 机柜号 机柜序号 业务线外键 主机名 系统 系统版本 SN 厂商 型号 CPU个数 CPU物理个数 CPU型号 最后一次更新日期时间 创建时间 """ # 硬盘信息 class Disk(models.Model): """ 硬盘所在槽位 硬盘型号 硬盘容量 硬盘类型 服务器外键 """ # 网卡信息 class NIC(models.Model): """ 网卡名 网卡mac地址 网卡的掩码 网卡的ip 启用状态 服务器外键 """ # 内存信息 class Memory(models.Model): """ 内存所在槽位 内存型号 内存容量 内存厂商 SN 内存读写速率 服务器外键 """ # 变更审计 class AssetRecord(models.Model): """ 审计内容 服务器外键 创建时间 """ # 错误日志 class ErrorLog(models.Model): """ 日志内容 日志标题 创建时间 服务器外键信息 """
4)API处理3种情况的方式
create 为完全新增；update 与 host_change 两种情况的处理基本相同，host_change 只是需要先变更一下主机名，修改部分的重复代码放入 service.py
service.py
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from api import models


def process_basic(info, server_list):
    """Update the Server row(s) in *server_list* from the basic/cpu/board data.

    :param info: decoded asset report from the client
    :param server_list: Server QuerySet to update (normally a single row)
    """
    server_info = {}
    server_info.update(info['basic']['data'])
    server_info.update(info['cpu']['data'])
    server_info.update(info['board']['data'])
    # 将新字典内容打散用于更新数据库数据: keys must equal Server field names.
    server_list.update(**server_info)


def _sync_related(model, server, items, key_field, set_key=False):
    """Make *server*'s rows of *model* match the reported *items*.

    Rows whose identifier is new are bulk-created, rows no longer reported
    are deleted, and rows present on both sides are updated in place.

    :param model: child model with a ``server`` foreign key (Disk/Memory/NIC)
    :param server: Server instance that owns the rows
    :param items: mapping ``identifier -> field dict`` from the plugin; None
        (the plugin failed on the client) leaves the existing rows untouched
    :param key_field: model field holding the identifier ('slot' or 'name')
    :param set_key: also copy the identifier into *key_field* of new rows
        (done for NICs, whose field dicts are keyed by name)
    """
    if items is None:
        return
    incoming = set(items)
    existing = {str(getattr(obj, key_field)) for obj in model.objects.filter(server=server)}

    to_add = incoming - existing       # 差集: newly reported
    to_delete = existing - incoming    # 差集: no longer reported
    to_update = incoming & existing    # 交集: reported again

    new_rows = []
    for key in to_add:
        fields = items[key]
        fields['server'] = server
        if set_key:
            fields[key_field] = key
        new_rows.append(model(**fields))
    if new_rows:
        model.objects.bulk_create(new_rows)

    if to_delete:
        model.objects.filter(server=server, **{key_field + '__in': to_delete}).delete()

    for key in to_update:
        model.objects.filter(server=server, **{key_field: key}).update(**items[key])


def process_disk(info, server):
    """Sync *server*'s Disk rows with ``info['disk']['data']`` (keyed by slot)."""
    _sync_related(models.Disk, server, info['disk']['data'], 'slot')


def process_memory(info, server):
    """Sync *server*'s Memory rows with ``info['memory']['data']`` (keyed by slot)."""
    _sync_related(models.Memory, server, info['memory']['data'], 'slot')


def process_nic(info, server):
    """Sync *server*'s NIC rows with ``info['nic']['data']`` (keyed by NIC name)."""
    _sync_related(models.NIC, server, info['nic']['data'], 'name', set_key=True)
view.py
数据打散时注意一下model的字段名要与info中的key一致
在对硬盘、网卡、内存等一对多资源进行操作时，需要先判断新增多少、删除多少、变更多少三种情况，再分别处理
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
from django.shortcuts import render, HttpResponse
from rest_framework.views import APIView
from rest_framework.response import Response
from api import models
from api import service


class Asset(APIView):
    """Asset API used by the collection clients."""

    def get(self, request):
        """Return the host list (currently a hard-coded placeholder)."""
        server_info = ['master1', 'master2']
        return Response(server_info)

    def post(self, request):
        """Store one asset report according to ``info['type']``.

        * ``create``: insert the server with all of its disks, memory and NICs.
        * ``update``: hostname unchanged; sync the record found by hostname.
        * ``host_change``: rename the record found by ``info['cert_hostname']``,
          then sync it like ``update``.

        The reply always contains ``hostname`` (the client writes it to its
        certificate file). ``status`` is False with an ``error`` message when
        the record to update is missing or the type is unknown.
        """
        info = request.data
        hostname = info['basic']['data']['hostname']
        ret = {'status': True, 'hostname': hostname}
        action_type = info.get('type')

        if action_type == 'create':
            self._create(info)
        elif action_type in ('update', 'host_change'):
            lookup = hostname if action_type == 'update' else info.get('cert_hostname')
            server = models.Server.objects.filter(hostname=lookup).first()
            if server is None:
                ret['status'] = False
                ret['error'] = 'server {} not found'.format(lookup)
                return Response(ret)
            if action_type == 'host_change':
                # Unique identifier changed: rename first, then update as usual.
                server.hostname = hostname
                server.save()
            self._update(info, server)
        else:
            ret['status'] = False
            ret['error'] = 'unknown type: {}'.format(action_type)
        return Response(ret)

    @staticmethod
    def _create(info):
        """Insert a new Server plus its disks, memory modules and NICs."""
        # Server fields come from the basic, cpu and board plugins; the keys
        # must equal the model's field names.
        server_info = {}
        server_info.update(info['basic']['data'])
        server_info.update(info['cpu']['data'])
        server_info.update(info['board']['data'])
        server = models.Server.objects.create(**server_info)

        disk_list = []
        for row in info['disk']['data'].values():
            row['server'] = server
            disk_list.append(models.Disk(**row))
        models.Disk.objects.bulk_create(disk_list)

        memory_list = []
        for row in info['memory']['data'].values():
            row['server'] = server
            memory_list.append(models.Memory(**row))
        models.Memory.objects.bulk_create(memory_list)

        # NIC dicts are keyed by interface name, which is stored as ``name``.
        nic_list = []
        for name, row in info['nic']['data'].items():
            row['server'] = server
            row['name'] = name
            nic_list.append(models.NIC(**row))
        models.NIC.objects.bulk_create(nic_list)

    @staticmethod
    def _update(info, server):
        """Sync an existing *server* and its disks, memory and NICs with *info*."""
        # Filter by primary key so exactly this record is updated.
        service.process_basic(info, models.Server.objects.filter(pk=server.pk))
        service.process_disk(info, server)
        service.process_memory(info, server)
        service.process_nic(info, server)
4.api的验证
客户端必须拥有 key 才能调用服务端 API，两端存储同一个 key 值，key 本身不在网络中传输
原理: 客户端发送 MD5=md5(client的key+A时间) + A时间
服务端接收 MD5 + A时间
服务端使用 md5(server的key+A时间) 是否等于 客户端传来的MD5 ,由此判断两边存储的key值是否相等
合理使用: 客户端的post请求带上MD5与时间的参数 ,服务端所有api都要使用到这个校验 ,重写dispatch方法来完成
客户端代码
/lib/api_auth.py 所有的engine都要使用所以放入lib中
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import hashlib
from conf import settings


def MD5_key(ctime):
    """Sign *ctime* with the shared secret ``settings.AUTH_KEY``.

    Returns the hex MD5 digest of ``"<AUTH_KEY>|<ctime>"``. The server
    recomputes the same value to check that both sides hold the same key,
    so the key itself never has to be sent.
    """
    raw = '{}|{}'.format(settings.AUTH_KEY, ctime).encode('utf-8')
    return hashlib.md5(raw).hexdigest()
/engine/agent.py engine中使用的方式
其中params可以让url上带参数 ,服务端通过request.GET.get('key')获取
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
# Signed report: the signature travels in the query string
# (?key=<MD5_key(ctime)>&ctime=<ctime>) and the JSON report in the body.
# ctime is converted to str once, so the hashed value and the sent value
# are the same text.
ctime = str(time.time())
r1 = requests.post(
    url='http://127.0.0.1:8000/api/asset/',
    # JSON over HTTP is UTF-8 (json.dumps output is ASCII by default, so the
    # bytes are unchanged).
    data=json.dumps(info).encode('utf-8'),
    params={'key': MD5_key(ctime), 'ctime': ctime},
    headers={
        'content-type': 'application/json'
    }
)
服务端代码
views.py
1.定义一个类 ,重写dispatch方法进行校验 ,让所有api都继承这个类
2.校验中：超时的请求拒绝，重复使用（重放）的 url 拒绝，md5 值无法对应的拒绝
3.如果失败直接返回json ,如果成功就走父类的dispatch ,执行api流程
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import hmac
import math

# Seconds a signed URL stays valid after the client's timestamp.
AUTH_TIMEOUT = 3


def MD5_key(ctime):
    """Return the hex MD5 of ``"<AUTH_KEY>|<ctime>"``; must match the client's helper."""
    key = '{}|{}'.format(settings.AUTH_KEY, ctime)
    md5 = hashlib.md5()
    md5.update(key.encode('utf-8'))
    return md5.hexdigest()


# Accepted signatures -> their request timestamp, used to reject replays.
# Entries older than AUTH_TIMEOUT are pruned: the timeout check rejects
# them anyway, so the cache only holds recent requests.
already_key = {}


class AuthView(APIView):
    """Base view: every API that inherits it requires a valid key/ctime signature."""

    def dispatch(self, request, *args, **kwargs):
        """Check the ``key``/``ctime`` query parameters, then dispatch normally.

        A rejection returns ``{'status': False, 'msg': ...}`` as JSON for a
        timed-out request, an already-used signature, or a signature that
        does not match. The mismatch message is also used when ``key`` or
        ``ctime`` is missing or ``ctime`` is not a finite number.
        """
        ret = {'status': True, 'msg': 'ok'}
        key = request.GET.get('key')
        ctime = request.GET.get('ctime')
        now = time.time()

        try:
            request_time = float(ctime)
            if not math.isfinite(request_time):
                raise ValueError(ctime)
        except (TypeError, ValueError):
            ret['status'] = False
            ret['msg'] = '验证失败'
            return JsonResponse(ret)

        if now - request_time > AUTH_TIMEOUT:
            ret['status'] = False
            ret['msg'] = '请求超时'
            return JsonResponse(ret)

        for used_key, used_time in list(already_key.items()):
            if now - used_time > AUTH_TIMEOUT:
                del already_key[used_key]

        if key in already_key:
            ret['status'] = False
            ret['msg'] = 'url已使用'
            return JsonResponse(ret)

        # Constant-time comparison; only successful signatures are recorded.
        expected = MD5_key(ctime)
        if key is None or not hmac.compare_digest(key.encode('utf-8'), expected.encode('utf-8')):
            ret['status'] = False
            ret['msg'] = '验证失败'
            return JsonResponse(ret)

        already_key[key] = request_time
        return super().dispatch(request, *args, **kwargs)
5.数据传输加密
提高安全性 ,客户端服务端传输的数据使用非对称加密
1)生成秘钥对
生成 1024 位的秘钥，即 128 个字节，其中 11 个字节用于 PKCS#1 v1.5 填充，所以每次最多能加密 117 个字节
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import rsa
import base64

# Generate one RSA key pair. With a 1024-bit key at most 1024 // 8 - 11 = 117
# plaintext bytes fit in one encryption (11 bytes go to padding).
pub_key_obj, priv_key_obj = rsa.newkeys(1024)

# Serialise both keys (save_pkcs1 returns bytes) ...
pub_key_str, priv_key_str = pub_key_obj.save_pkcs1(), priv_key_obj.save_pkcs1()

# ... then base64-encode them so they can be pasted into the settings files.
pub_key_code, priv_key_code = (
    base64.standard_b64encode(raw) for raw in (pub_key_str, priv_key_str)
)

print(pub_key_code, priv_key_code, sep='\n')
2)客户端加密
使用配置好的公钥 ,对数据进行分段加密再拼接在一起 ,发送到服务端
数据编码加密格式化顺序 : 加密(编码(json(原始数据)))
lib/encrypt.py 将加密函数单独放入公共功能文件夹
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import base64
import rsa
from conf import settings

# Bytes of every RSA block consumed by PKCS#1 v1.5 padding.
_PADDING_BYTES = 11


def encrypt(bytes_value):
    """Encrypt *bytes_value* with the RSA public key from ``settings.PUB_KEY``.

    RSA can only encrypt a message shorter than the key, so the input is cut
    into chunks of ``key_bytes - 11`` bytes (117 for a 1024-bit key). Each
    chunk becomes one ``key_bytes``-long ciphertext block, and the blocks are
    concatenated. The chunk size is derived from the key, so larger keys
    also work.

    :param bytes_value: plaintext bytes, e.g. UTF-8 encoded JSON
    :return: concatenated ciphertext blocks (b'' for empty input)
    """
    key_str = base64.standard_b64decode(settings.PUB_KEY)
    pk = rsa.PublicKey.load_pkcs1(key_str)
    key_bytes = (pk.n.bit_length() + 7) // 8
    chunk_size = key_bytes - _PADDING_BYTES
    return b''.join(
        rsa.encrypt(bytes_value[i:i + chunk_size], pk)
        for i in range(0, len(bytes_value), chunk_size)
    )
所有的engine中的handler方法数据上报都加密
r1 = requests.post( ... data=encrypt(json.dumps(info).encode('utf-8')), ... } )
3)服务端解密
加密后的 post 数据是原始字节，无法被解析为 request.data，需要从 request.body 中直接读取
数据编码解密格式化顺序：json(解码(解密(加密数据)))
utils/decrypt.py 将解密函数放入工具文件夹
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
import base64
import rsa
from django.conf import settings


def decrypt(value):
    """Decrypt data produced by the client's chunked RSA ``encrypt``.

    The ciphertext is a concatenation of blocks exactly as long as the key
    modulus (128 bytes for a 1024-bit key). Each block is decrypted with the
    private key from ``settings.PRI_KEY``, and the plaintexts are joined. The
    block size is derived from the key, so larger keys also work.

    :param value: concatenated ciphertext bytes (e.g. ``request.body``)
    :return: decrypted plaintext bytes
    """
    key_str = base64.standard_b64decode(settings.PRI_KEY)
    pk = rsa.PrivateKey.load_pkcs1(key_str)
    block_size = (pk.n.bit_length() + 7) // 8
    return b''.join(
        rsa.decrypt(value[i:i + block_size], pk)
        for i in range(0, len(value), block_size)
    )
视图函数中接收的info数据进行解密
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
def post(self, request):
    """Read the encrypted asset report from the raw request body.

    Decrypts ``request.body`` with ``decrypt``, decodes it as UTF-8 and
    parses the JSON. Presumably this replaces ``info = request.data`` at the
    top of the asset view and the rest of the method follows unchanged --
    confirm against the full view.
    """
    info = json.loads(decrypt(request.body).decode('utf-8'))