zoukankan      html  css  js  c++  java
  • 115.python获取服务器信息

    1.产生原因

    项目需求获取服务器一段时间之内的cpu使用率,内存使用率等信息.由于数据会被多人访问,所以采用python元类的方式创建单例模式,每个请求访问的都是同一个对象.同时通过linux的sar命令获取相关信息,加载在内存中,其中我限制了返回数据的列表长度200(自己调整就行了),同时这个东西其实在写入数据的时候最好使用异步任务celery获者python apscheduler模块.
    

    2.代码实现

    import os, re, datetime
    import threading
    
    
    class SingletonType(type):
        _instance_lock = threading.Lock()
    
        def __init__(self, *args, **kwargs):
            super(SingletonType, self).__init__(*args, **kwargs)
    
        def __call__(cls, *args, **kwargs):
            if not hasattr(cls, "_instance"):
                with cls._instance_lock:
                    if not hasattr(cls, "_instance"):
                        cls._instance = super(SingletonType, cls).__call__(*args, **kwargs)
            return cls._instance
    
    
    class ServerInfo(metaclass=SingletonType):
    
        def __init__(self):
            self.time_list = []
            self.dick_read_list = []
            self.dick_write_list = []
            self.net_read_list = []
            self.net_write_list = []
            self.cpu_use_list = []
            self.mem_use_list = []
    
        def get_data(self):
            self.set_server_info()
            data = self.get_server_info()
            return data
    
        def get_server_info(self):
            self.check_list_length()
            data = {"cpu_use": self.cpu_use_list,
                    "mem_use": self.mem_use_list,
                    "disk_read": self.dick_read_list,
                    "dick_write": self.dick_write_list,
                    "net_read": self.net_read_list,
                    "net_write": self.net_write_list,
                    "time": self.time_list}
            return data
    
        def set_server_info(self):
            # 磁盘io情况
            rev = os.popen("sar -b 1 1").readlines()
            i_rev = re.findall('(S+)', rev[-1])[-3]
            o_rev = re.findall('(S+)', rev[-1])[-4]
            self.dick_read_list.insert(0, o_rev)
            self.dick_write_list.insert(0, i_rev)
    
            # 网络io情况
            i_net = 0
            o_net = 0
            nets = os.popen("sar -n DEV 1 1").readlines()
            for net in nets:
                if re.search('lo', net) or re.search('IFACE', net):
                    pass
                elif re.search('Average', net):
                    i_net += float(re.findall('(S+)', net)[-5])
                    o_net += float(re.findall('(S+)', net)[-4])
    
            self.net_read_list.insert(0, o_net)
            self.net_write_list.insert(0, i_net)
    
            # cpu使用情况
            cpu = os.popen("sar -u 1 1").readlines()
            cpu_use = 100 - float(re.findall('(S+)', cpu[-1])[-1])
            self.cpu_use_list.insert(0, cpu_use)
    
            # 内存使用情况
            mem = os.popen('LANG=en_US.UTF-8;sar -r 1 1').readlines()
            data = re.findall('(S+)', mem[-1])[3]
    
            self.mem_use_list.insert(0, data)
    
            # 时间信息
            t1 = datetime.datetime.now()
            now_time = t1.strftime("%Y-%m-%d %H:%M:%S")
            self.time_list.insert(0, now_time)
    
        def slice_list(self, target_list: list):
            if len(target_list) > 200:
                target_list = target_list[0:200]
            return target_list
    
        def check_list_length(self):
            self.slice_list(self.time_list)
            self.slice_list(self.dick_read_list)
            self.slice_list(self.dick_write_list)
            self.slice_list(self.net_read_list)
            self.slice_list(self.net_write_list)
            self.slice_list(self.cpu_use_list)
            self.slice_list(self.mem_use_list)
    
    
    if __name__ == '__main__':
        obj1 = ServerInfo()
        obj2 = ServerInfo()
        # print(obj1 is obj2)
        # thread_list = []
        # for i in range(10):
        #     obj1 = ServerInfo()
        #     t = threading.Thread(target=obj1.set_server_info)
        #     thread_list.append(t)
        #
        # t = None
        # for t in thread_list:
        #     time.sleep(1)
        #     t.start()
        # t.join()
        # obj2 = obj1 = ServerInfo()
        # print(obj2.get_server_info())
    
  • 相关阅读:
    QT资料大全
    网络协议及tcp协议详解
    QT和Java的跨平台
    QString转char *
    QT删除整个文件夹
    QT获取linux下的当前用户名
    std::map自定义类型key
    QT程序自启动
    linux下通过命令连接wifi
    Rsync实现文件的同步
  • 原文地址:https://www.cnblogs.com/liuzhanghao/p/14085368.html
Copyright © 2011-2022 走看看