zoukankan      html  css  js  c++  java
  • python 网络监控主机

    import subprocess
    import threading
    import time
    from queue import Queue
    from queue import Empty
    
    def wrapper(func):
        """Decorate *func* so that each successful call prints its elapsed time.

        The elapsed time, in seconds, is printed as a bare float once *func*
        returns. The wrapped function's return value is handed back to the
        caller and its metadata (``__name__``, ``__doc__``) is preserved.

        Args:
            func: The callable to time.

        Returns:
            A callable with the same signature and return value as *func*.
        """
        # Local import: keeps this block independent of the file-level imports.
        from functools import wraps

        @wraps(func)
        def inner(*args, **kwargs):
            # perf_counter is monotonic, so clock adjustments cannot skew it.
            started = time.perf_counter()
            result = func(*args, **kwargs)
            print(time.perf_counter() - started)
            return result
        return inner
    
    
    def run_ping(ip):
        """Ping *ip* once and print whether the host answered.

        Surrounding whitespace (for example the newline left on a line read
        from a file) is removed from *ip* before it is used. The ``ping``
        exit status decides the message: 0 prints the "good" message, anything
        else prints the "failure" message.

        Args:
            ip: Host name or address to ping.

        Raises:
            OSError: If the ``ping`` executable cannot be started.
        """
        ip = ip.strip()
        # An argument list (no shell) means the address is never interpreted
        # by a command shell. Output is discarded because only the exit
        # status is used.
        # NOTE(review): "-n 1 -w 2" look like Windows ping flags (count /
        # timeout) - confirm the intended target platform.
        proc = subprocess.run(
            ['ping', '-n', '1', '-w', '2', ip],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        if proc.returncode == 0:
            print('{} 网络良好!'.format(ip))
        else:
            print('{} 网络故障!'.format(ip))
    
    
    def is_reacheable(q):
        """Drain *q*, pinging every address taken from it.

        Addresses are fetched without blocking; the function returns as soon
        as the queue reports that it is empty.

        Args:
            q: A ``queue.Queue`` holding the addresses to check.
        """
        while True:
            try:
                address = q.get_nowait()
            except Empty:
                # Nothing left to do for this worker.
                return
            run_ping(address)
    
    @wrapper
    def main():
        """Ping every address listed in ``ip.txt`` using ten worker threads.

        Each non-blank line of ``ip.txt`` is treated as one address. The
        addresses are placed on a shared queue that the worker threads drain
        via ``is_reacheable``; the function returns once all workers finish.
        """
        q = Queue()
        with open('ip.txt') as f:
            for line in f:
                ip = line.strip()
                # Drop the trailing newline and skip blank lines so that
                # neither ends up being pinged as a host.
                if ip:
                    q.put(ip)
        threads = [
            threading.Thread(target=is_reacheable, args=(q,))
            for _ in range(10)
        ]
        for thr in threads:
            thr.start()
        for thr in threads:
            thr.join()
    
    
    # Script entry point: run the threaded check only when executed directly.
    if __name__ == '__main__':
        main()
        # Disabled manual check of a single address:
        # run_ping('12.0.0.11')

     改进版

    import subprocess
    import time
    import threading
    import json
    from functools import wraps
    from queue import Queue, Empty
    
    # Number of worker threads that main() starts to ping hosts concurrently.
    THREAD_POOL_SIZE = 18
    
    
    def create_ip():
        """Append 20 sample addresses (127.0.0.0 to 127.0.0.19) to ``ip.txt``.

        One address is written per line. The file is opened in append mode,
        so calling this more than once adds the same addresses again.
        """
        with open('ip.txt', 'a', encoding='utf-8') as f:
            # The line terminator must be the "\n" escape; a literal line
            # break inside the quotes is a syntax error.
            f.writelines('127.0.0.{}\n'.format(i) for i in range(20))
    
    
    def wrapper(func):
        """Decorate *func* so that each successful call prints its elapsed time.

        After *func* returns, the elapsed time in seconds is printed with two
        decimal places. The wrapped function's return value is passed back to
        the caller.

        Args:
            func: The callable to time.

        Returns:
            A callable with the same signature and return value as *func*.
        """
        @wraps(func)
        def _timer(*args, **kwargs):
            # perf_counter is monotonic, so clock adjustments cannot skew it.
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start_time
            print('共耗时{:.2f}秒'.format(elapsed))
            return result
        return _timer
    
    
    def run_ping(ip):
        """Ping *ip* once and return the outcome as a JSON string.

        Args:
            ip: Host name or address to ping.

        Returns:
            A JSON object string with a single entry mapping *ip* to the exit
            status of the ``ping`` process (0 means ping reported success).

        Raises:
            OSError: If the ``ping`` executable cannot be started.
        """
        # An argument list (no shell) means the address is never interpreted
        # by a command shell.
        # NOTE(review): "-n 1 -w 1" look like Windows ping flags, where -w is
        # a timeout in milliseconds - confirm that 1 is really intended.
        result = subprocess.run(
            ['ping', '-n', '1', '-w', '1', ip],
            stdout=subprocess.DEVNULL,
        )
        return json.dumps({ip: result.returncode})
    
    
    def print_ping(ret):
        """Report every host in *ret* whose ping did not succeed.

        For each address with a non-zero status, one line containing the
        address and the current local time is appended to ``result.txt`` and
        an "is offline" message is printed. Addresses with status 0 are
        ignored.

        Args:
            ret: JSON object string mapping an address to its ping exit
                status.
        """
        for ip, ret_code in json.loads(ret).items():
            if ret_code == 0:
                continue
            with open('result.txt', 'a', encoding='utf-8') as f:
                # The line terminator must be the "\n" escape; a literal line
                # break inside the quotes is a syntax error.
                f.write('{} 网络不通 {}\n'.format(
                    ip,
                    time.strftime('%Y-%m-%d %X')
                ))
            print('{} is offline'.format(ip))
    
    
    def worker(work_queue, result_queue):
        """Process addresses from *work_queue* until it is empty.

        Each address is passed to ``run_ping``. Whatever comes back - the
        return value, or the exception it raised - is put on *result_queue*,
        and the item is then marked as done on *work_queue*.

        Args:
            work_queue: Queue of addresses still to be checked.
            result_queue: Queue that receives one entry per processed address.
        """
        while True:
            try:
                address = work_queue.get_nowait()
            except Empty:
                return
            try:
                try:
                    outcome = run_ping(address)
                except Exception as exc:
                    # Hand the failure to the consumer instead of killing
                    # this thread.
                    outcome = exc
                result_queue.put(outcome)
            finally:
                # Always acknowledge the item so work_queue.join() can return.
                work_queue.task_done()
    
    
    @wrapper
    def main():
        """Ping every address in ``ip.txt`` concurrently and report failures.

        Each non-blank line of ``ip.txt`` is queued as one address.
        ``THREAD_POOL_SIZE`` worker threads drain the queue; once they have
        finished, every collected result is passed to ``print_ping``.

        Raises:
            Exception: Re-raises the first exception object found among the
                results that the workers collected.
        """
        print('正在网络检查,请稍候......')
        work_queue = Queue()
        result_queue = Queue()
        with open('ip.txt') as f:
            for line in f:
                ip = line.strip()
                # Skip blank lines so that an empty host name is not pinged
                # and later reported as offline.
                if ip:
                    work_queue.put(ip)
        threads = [
            threading.Thread(target=worker, args=(work_queue, result_queue))
            for _ in range(THREAD_POOL_SIZE)
        ]
        for thread in threads:
            thread.start()
        # Wait until every queued address is acknowledged, then for the
        # worker threads themselves to exit.
        work_queue.join()
        for thread in threads:
            thread.join()
        while not result_queue.empty():
            result = result_queue.get()
            if isinstance(result, Exception):
                raise result
            print_ping(result)
        print('检查完毕!')
    
    
    # Script entry point: run the check only when this file is executed directly.
    if __name__ == '__main__':
        # Uncomment to append sample addresses to ip.txt before checking.
        # create_ip()
        main()
  • 相关阅读:
    IPAD3终于发布了!苹果在5年内还是很难被超越!
    今天更新了ubuntu11.10!
    折腾两日系统重装,对比ubuntu11.04 和windows 7旗舰版!(不定时更新添加新的体验)
    DELL XPS M1530安装MAC OS X Lion 10.7.3经验分享!
    ubuntu 11.04 指纹识别的安装!
    Dlink DIR615L 和水星(mercury) MW300R桥接方法!
    POJ 3522 Slim Span(kruskal 变型)
    POJ 3621 Sightseeing Cows(SPFA + 构造负环)
    POJ 2553 The Bottom of a Graph(Tarjan)
    POJ 2728 Desert King(最优比率生成树)
  • 原文地址:https://www.cnblogs.com/superniao/p/10587866.html
Copyright © 2011-2022 走看看