zoukankan      html  css  js  c++  java
  • CMDB03 /今日未采集的资产、资产入库、资产变更记录、资产采集

    CMDB03 /今日未采集的资产、资产入库、资产变更记录、资产采集

    1. 获取今日未采集的服务器

    • 代码示例:

          def get(self,request,*args,**kwargs):
              """ 返回今日未采集的服务器列表 """
              today = datetime.datetime.today()
              queryset = models.Server.objects.filter(status=1).filter(Q(last_date__isnull=True)|Q(last_date__lt=today)).values('hostname')
              host_list = [ item['hostname'] for item in queryset]
              print(host_list)
              return Response(host_list)
      

    2. server资产入库以及资产变更处理

    • post请求代码示例

      def post(self,request,*args,**kwargs):
          # 1. 获取到用户提交资产信息
          # 2. 保存到数据库(表关系)
          hostname = request.data.get('hostname')
          server_object = models.Server.objects.filter(hostname=hostname).first()
          if not server_object:
              return Response('主机不存在')
          process_server_info(request.data['info'],server_object)
      
          # 今日已经采集
          server_object.last_date = datetime.datetime.today()
          server_object.save()
      
          return Response('发送成功')
      
    • plugins /__init__.py 执行process_server_info函数,对接收到的消息做相应的处理

      代码示例:

      import importlib
      from django.conf import settings
      def process_server_info(info,server_object):
          """ 处理中控汇报资产信息 """
          for key,path in settings.CMDB_PLUGIN_DICT.items():
              module_path,class_name = path.rsplit('.',maxsplit=1)
              module = importlib.import_module(module_path)
              instance = getattr(module,class_name)()
              instance.process(info[key],server_object)
      
    • 判断做何处理/新增/删除/变更

      class Disk(object):
      
          def process(self,disk,server_object):
              if not disk['status']:
                  print('采集资产错误',disk['error'])
                  return
              disk_info = disk['data']
              new_disk_slot_set = set(disk_info.keys())
              # [obj,obj]
              db_disk_queryset = models.Disk.objects.filter(server=server_object)
              db_disk_dict = {obj.slot: obj for obj in db_disk_queryset}
              db_disk_slot_set = set(db_disk_dict.keys())
      
              record_msg_list = []
      
              # 新增的槽位集合
              create_slot_set = new_disk_slot_set - db_disk_slot_set
              create_object_list = []
              for slot in create_slot_set:
                  # models.Disk.objects.create(**disk_info[slot],server=server_object)
                  create_object_list.append(models.Disk(**disk_info[slot], server=server_object))
              if create_object_list:
                  models.Disk.objects.bulk_create(create_object_list, batch_size=10)
                  msg = "【新增硬盘】在%s槽位新增了硬盘。" % ",".join(create_slot_set)
                  record_msg_list.append(msg)
      
              # 要删除的槽位集合
              remove_slot_set = db_disk_slot_set - new_disk_slot_set  # (1,2)
              models.Disk.objects.filter(server=server_object, slot__in=remove_slot_set).delete()
              if remove_slot_set:
                  msg = "【删除硬盘】在%s槽位删除了硬盘。" % ",".join(remove_slot_set)
                  record_msg_list.append(msg)
      
              # 要更新的槽位集合(可能有也可能没有)
              update_slot_set = new_disk_slot_set & db_disk_slot_set
              for slot in update_slot_set:
                  temp = []
                  row_dict = disk_info[slot]  # {'slot': '0', 'pd_type': 'SAS', 'capacity': '100', 'model': 'SEAGATE ST300MM0006     LS08S0K2B5NV'}
                  row_object = db_disk_dict[slot]  # row_object.slot/ row_object.pd_type = getattr(row_object,'pd_type') / row.capacity /row.model
                  for key, value in row_dict.items():
                      if value != getattr(row_object, key):
                          msg = "%s由%s变更为%s" % (key, getattr(row_object, key), value)
                          temp.append(msg)
                          setattr(row_object, key, value)
                  if temp:
                      slot_msg = "【更新硬盘】槽位%s:%s" % (slot, " ".join(temp))
                      record_msg_list.append(slot_msg)
                      row_object.save()
      
              if record_msg_list:
                  models.Record.objects.create(server=server_object, content="
      ".join(record_msg_list))
      

    3. client基于ssh远程资产采集

    • app.py -- 资产采集入口

      from concurrent.futures import ThreadPoolExecutor
      from lib.plugins import get_server_info
      import settings
      import requests
      
      
      def ssh(hostname,command):
          import paramiko
          private_key = paramiko.RSAKey.from_private_key_file(settings.SSH_PRIVATE_KEY_PATH)
          ssh = paramiko.SSHClient()
          ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
          ssh.connect(hostname=hostname, port=settings.SSH_PORT, username=settings.SSH_USER, pkey=private_key)
          stdin, stdout, stderr = ssh.exec_command(command)
          result = stdout.read()
          ssh.close()
          return result.decode('utf-8')
      
      
      def salt(hostname,command):
          import subprocess
          cmd = "salt '%s' cmd.run '%s' " %(hostname,command)
          data = subprocess.getoutput(cmd)
          return data
      
      
      def task(hostname):
      
          if settings.MODE == 'SSH':
              info = get_server_info(hostname, ssh)
          elif settings.MODE == 'SALT':
              info = get_server_info(hostname, salt)
          else:
              raise Exception('模式输入错误,请修改')
      
          requests.post(
              url="http://192.168.16.64:8000/api/v1/server/",
              json={'hostname':hostname,'info':info}
          )
      
      def run():
          response = requests.get(url="http://192.168.16.64:8000/api/v1/server/")
          host_list = response.json()
          pool = ThreadPoolExecutor(settings.THREAD_POOL_SIZE)
          for host in host_list: # 100服务器
              pool.submit(task,host)
      
      if __name__ == '__main__':
          run()
      
    • plugins /__init__.py 执行process_server_info函数,去采集相应的资产信息

      from settings import PLUGIN_DICT
      
      def get_server_info(hostname,ssh_func):
          """
      
          :param hostname: 要远程操作的主机名
          :param ssh_func: 执行远程操作的方法
          :return:
          """
          info_dict = {}
      
          for key, path in PLUGIN_DICT.items():
              module_path, class_name = path.rsplit('.', maxsplit=1)
      
              # 1. 根据字符串的形式去导入模块 "lib.plugins.board"
              import importlib
              module = importlib.import_module(module_path)
      
              # 2.去模块找到类
              cls = getattr(module, class_name)
      
              # 3. 对类型实例化
              obj = cls()
      
              # 4. 执行对象的process方法
              result = obj.process(hostname,ssh_func)
      
              info_dict[key] = result
      
          return info_dict
      
    • 采集CPU信息示例

      import traceback
      from lib.log import logger
      import settings
      
      from .base import BasePlugin
      
      class CPU(BasePlugin):
          def process(self,hostname,ssh_func):
              info = {'status': True, 'data': None, 'error': None}
              try:
                  if settings.DEBUG:
                      with open('files/cpuinfo.out',mode='r',encoding='utf-8') as f:
                          content = f.read()
                  else:
                      content = ssh_func(hostname, 'cat /proc/cpuinfo')
                  data = self.parse(content)
                  info['data'] = data
              except Exception as e:
                  # 记录日志
                  msg = traceback.format_exc()
                  logger.log(msg)
                  info['status'] = False
                  info['error'] = msg
              return info
      
          def parse(self,content):
              """
              解析shell命令返回结果
              :param content: shell 命令结果
              :return:解析后的结果
              """
              response = {'cpu_count': 0, 'cpu_physical_count': 0, 'cpu_model': ''}
      
              cpu_physical_set = set()
      
              content = content.strip()
              for item in content.split('
      
      '):
                  for row_line in item.split('
      '):
                      value_list = row_line.split(':')
                      if len(value_list) !=2:
                          continue
                      key,value = value_list
                      key = key.strip()
                      if key == 'processor':
                          response['cpu_count'] += 1
                      elif key == 'physical id':
                          cpu_physical_set.add(value)
                      elif key == 'model name':
                          if not response['cpu_model']:
                              response['cpu_model'] = value
              response['cpu_physical_count'] = len(cpu_physical_set)
      
              return response
      
    • log.py -- 用来记录异常日志

      import logging
      import settings
      
      class AutoLogger(object):
          def __init__(self,log_path,log_name):
              file_handler = logging.FileHandler(log_path, 'a', encoding='utf-8')
              fmt = logging.Formatter(fmt="%(asctime)s - %(name)s - %(levelname)s -%(module)s:  %(message)s")
              file_handler.setFormatter(fmt)
      
              self.logger = logging.Logger(log_name, level=logging.DEBUG)
              self.logger.addHandler(file_handler)
      
          def log(self,msg):
              self.logger.error(msg)
      
      
      logger = AutoLogger(settings.LOG_FILE_PATH,'cmdb')
      

    总结:

    1. 对于今日未采集的服务器采用了基于Q实现复杂的SQL查询
    2. 对于变更处理采用集合的交集、差集,进而进行判断
    3. 中控机汇报到api的资产需要做入库以及变更记录的处理
      • 由于资产搜集时是利用工厂模式实现可扩展插件,方便于扩展。在api端也是使用相同模式,对插件进行一一处理。
      • 在处理资产信息时候,对操作进行交集和差集的处理从而得到删除/更新/新增资产。
      • 在内部通过反射进行资产变更记录的获取,最终将资产以及变更记录写入数据库。
  • 相关阅读:
    windows使用pipenv创建虚拟环境报错UnicodeDecodeError: 'utf-8' codec can't decode byte 0xce in position 4: in...
    mysql监控工具sqlprofiler,类似sqlserver的profiler工具安装(一)
    [转]linux awk命令详解
    navicat for mysql 如何设置字段唯一
    linux硬链接与软链接
    linux后台运行和关闭、查看后台任务
    测试覆盖率的基本策略
    【Unity Shader】二、顶点函数(vertex)和片元函数(fragment)传递数据,及各阶段可使用的语义(semantic)
    【Unity Shader】一、顶点函数(vertex)和片元函数(fragment)
    Unity Shader学习资料
  • 原文地址:https://www.cnblogs.com/liubing8/p/11892721.html
Copyright © 2011-2022 走看看