zoukankan      html  css  js  c++  java
  • python 批量ping服务器

    最近在https://pypi.python.org/pypi/mping/0.1.2找到了一个python包,可以用它来批量ping服务器,它是中国的大神写的,支持单个服务器、将服务器IP写在txt或json里都可以。

    具体用法有中文文档,感谢作者

    这里我改了几个字,方便我这种英文不好的同学使用

    mping.py

    #!/usr/bin/env python3
    # coding: utf-8
    
    import argparse
    import ctypes
    import json
    import os
    import random
    import re
    import select
    import socket
    import struct
    import sys
    import threading
    import time
    
    if sys.platform.startswith('win32'):
        clock = time.clock
        run_as_root = ctypes.windll.shell32.IsUserAnAdmin() != 0
    else:
        clock = time.time
        run_as_root = os.getuid() == 0
    
    DEFAULT_DURATION = 3
    
    EXIT_CODE_BY_USER = 1
    EXIT_CODE_DNS_ERR = 2
    EXIT_CODE_IO_ERR = 3
    
    
    # Credit: https://gist.github.com/pyos/10980172
    def chk(data):
        x = sum(x << 8 if i % 2 else x for i, x in enumerate(data)) & 0xFFFFFFFF
        x = (x >> 16) + (x & 0xFFFF)
        x = (x >> 16) + (x & 0xFFFF)
        return struct.pack('<H', ~x & 0xFFFF)
    
    
    # From the same gist commented above, with minor modified.
    def ping(addr, timeout=1, udp=not run_as_root, number=1, data=b''):
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM if udp else socket.SOCK_RAW, socket.IPPROTO_ICMP) as conn:
            payload = struct.pack('!HH', random.randrange(0, 65536), number) + data
    
            conn.connect((addr, 80))
            conn.sendall(b'x08' + chk(b'x08' + payload) + payload)
            start = clock()
    
            while select.select([conn], [], [], max(0, start + timeout - clock()))[0]:
                data = conn.recv(65536)
                if data[20:] == b'' + chk(b'' + payload) + payload:
                    return clock() - start
    
    
    class PingResults(list):
        def __init__(self, multiply=1000):
            """
            A list to save ping results, and can be used to count min/avg/max, etc.
            :param multiply: Every valid result will be multiplied by this number.
            """
            super(PingResults, self).__init__()
            self.multiple = multiply
    
        def append(self, rtt):
            """
            To receive a ping result, accept a number for how long the single ping took, or None for timeout.
            :param rtt: The ping round-trip time.
            """
            if rtt is not None and self.multiple:
                rtt *= self.multiple
            return super(PingResults, self).append(rtt)
    
        @property
        def valid_results(self):
            return list(filter(lambda x: x is not None, self))
    
        @property
        def valid_count(self):
            return len(self.valid_results)
    
        @property
        def loss_rate(self):
            if self:
                return 1 - len(self.valid_results) / len(self)
    
        @property
        def min(self):
            if self.valid_results:
                return min(self.valid_results)
    
        @property
        def avg(self):
            if self.valid_results:
                return sum(self.valid_results) / len(self.valid_results)
    
        @property
        def max(self):
            if self.valid_results:
                return max(self.valid_results)
    
        @property
        def form_text(self):
            if self.valid_results:#调整结果数据之间的间隔
                return '{0.valid_count},          {0.loss_rate:.1%},       {0.min:.1f}/{0.avg:.1f}/{0.max:.1f}'.format(self)
            elif self:
                return '不通'
            else:
                return 'EMPTY'
    
        def __str__(self):
            return self.form_text
    
        def __repr__(self):
            return '{}({})'.format(self.__class__.__name__, self.form_text)
    
    
    class PingTask(threading.Thread):
        def __init__(self, host, timeout, interval):
            """
            A threading.Thread based class for each host to ping.
            :param host: a host name or ip address
            :param timeout: timeout for each ping
            :param interval: the max time to sleep between each ping
            """
    
            self.host = host
    
            if re.match(r'(?:d{1,3}.){3}(?:d{1,3})$', host):
                self.ip = host
            else:
                print('Resolving host: {}'.format(host))
                try:
                    self.ip = socket.gethostbyname(host)
                except socket.gaierror:
                    print('Unable to resolve host: {}'.format(host))
                    exit(EXIT_CODE_DNS_ERR)
    
            self.timeout = timeout
            self.interval = interval
    
            self.pr = PingResults()
            self.finished = False
    
            super(PingTask, self).__init__()
    
        def run(self):
            while not self.finished:
                try:
                    rtt = ping(self.ip, timeout=self.timeout)
                except OSError:
                    print('Unable to ping: {}'.format(self.host))
                    break
                self.pr.append(rtt)
                escaped = rtt or self.timeout
                if escaped < self.interval:
                    time.sleep(self.interval - escaped)
    
        def finish(self):
            self.finished = True
    
    
    def mping(hosts, duration=DEFAULT_DURATION, timeout=1.0, interval=0.0, quiet=False, sort=True):
        """
        Ping hosts in multi-threads, and return the ping results.
        :param hosts: A list of hosts, or a {name: host, ...} formed dict. A host can be a domain or an ip address
        :param duration: The duration which pinging lasts in seconds
        :param timeout: The timeout for each single ping in each thread
        :param interval: The max time to sleep between each single ping in each thread
        :param quiet: Do not print results while processing
        :param sort: The results will be sorted by valid_count in reversed order if this param is True
        :return: A list of PingResults
        """
    
        def results(_tasks, _sort=True):
            """
            Return the current status of a list of PingTask
            """
            r = list(zip(heads, [t.pr for t in _tasks]))
            if _sort:
                r.sort(key=lambda x: x[1].valid_count, reverse=True)
            return r
    
        if isinstance(hosts, list):
            heads = hosts
        elif isinstance(hosts, dict):
            heads = list(hosts.items())
            hosts = hosts.values()
        else:
            type_err_msg = '`hosts` should be a host list, or a {name: host, ...} formed dict.'
            raise TypeError(type_err_msg)
    
        try:
            tasks = [PingTask(host, timeout, interval) for host in hosts]
        except KeyboardInterrupt:
            exit(EXIT_CODE_BY_USER)
        else:
            doing_msg = 'Pinging {} hosts'.format(len(hosts))
            if duration > 0:
                doing_msg += ' within {} seconds'.format(int(duration))
            doing_msg += '...'
    
            if quiet:
                print(doing_msg)
    
            for task in tasks:
                task.start()
    
            try:
                start = clock()
                while True:
    
                    if duration > 0:
                        remain = duration + start - clock()
                        if remain > 0:
                            time.sleep(min(remain, 1))
                        else:
                            break
                    else:
                        time.sleep(1)
    
                    if not quiet:
                        print('
    {}
    {}'.format(
                            results_string(results(tasks, True)[:10]),
                            doing_msg))
    
            except KeyboardInterrupt:
                print()
            finally:
                for task in tasks:
                    task.finish()
    
                # Maybe not necessary?
                # for task in tasks:
                #     task.join()
    
            return results(tasks, sort)
    
    
    def table_string(rows):
        rows = list(map(lambda x: list(map(str, x)), rows))
        widths = list(map(lambda x: max(map(len, x)), zip(*rows)))
        rows = list(map(lambda y: ' | '.join(map(lambda x: '{:{w}}'.format(x[0], w=x[1]), zip(y, widths))), rows))
        rows.insert(1, '-|-'.join(list(map(lambda x: '-' * x, widths))))
        return '
    '.join(rows)
    
    
    def results_string(prs):
        named = True if isinstance(prs[0][0], tuple) else False
        rows = [['IP', '有效次数  , 丢包率%  , min/avg/max']]
        if named:
            rows[0].insert(0, 'name')
    
        for head, pr in prs:
            row = list()
            if named:
                row.extend(head)
            else:
                row.append(head)
            row.append(pr.form_text)
            rows.append(row)
        return table_string(rows)
    
    
    def main():
        ap = argparse.ArgumentParser(
            description='Ping multiple hosts concurrently and find the fastest to you.',
            epilog='A plain text file or a json can be used as the -p/--path argument: '
                   '1. Plain text file: hosts in lines; '
                   '2. Json file: hosts in a list or a object (dict) with names.')
    
        ap.add_argument(
            'hosts', type=str, nargs='*',
            help='a list of hosts, separated by space')
    
        ap.add_argument(
            '-p', '--path', type=str, metavar='path',
            help='specify a file path to get the hosts from')
    
        ap.add_argument(
            '-d', '--duration', type=float, default=DEFAULT_DURATION, metavar='secs',
            help='the duration how long the progress lasts (default: {})'.format(DEFAULT_DURATION))
    
        ap.add_argument(
            '-i', '--interval', type=float, default=0.0, metavar='secs',
            help='the max time to wait between pings in each thread (default: 0)')
    
        ap.add_argument(
            '-t', '--timeout', type=float, default=1.0, metavar='secs',
            help='the timeout for each single ping in each thread (default: 1.0)')
    
        ap.add_argument(
            '-a', '--all', action='store_true',
            help='show all results (default: top 10 results)'
                 ', and note this option can be overridden by -S/--no_sort')
    
        ap.add_argument(
            '-q', '--quiet', action='store_true',
            help='do not print results while processing (default: print the top 10 hosts)'
        )
    
        ap.add_argument(
            '-S', '--do_not_sort', action='store_false', dest='sort',
            help='do not sort the results (default: sort by ping count in descending order)')
    
        args = ap.parse_args()
    
        hosts = None
        if not args.path and not args.hosts:
            ap.print_help()
        elif args.path:
            try:
                with open(args.path) as fp:
                    hosts = json.load(fp)
            except IOError as e:
                print('Unable open file:
    {}'.format(e))
                exit(EXIT_CODE_IO_ERR)
            except json.JSONDecodeError:
                with open(args.path) as fp:
                    hosts = re.findall(r'^s*([a-z0-9-.]+)s*$', fp.read(), re.M)
        else:
            hosts = args.hosts
    
        if not hosts:
            exit()
    
        results = mping(
            hosts=hosts,
            duration=args.duration,
            timeout=args.timeout,
            interval=args.interval,
            quiet=args.quiet,
            sort=args.sort
        )
    
        if not args.all and args.sort:
            results = results[:10]
    
        if not args.quiet:
            print('
    ********最终检查结果:*************************
    ')
    
        print(results_string(results))
    
    
    if __name__ == '__main__':
        main()
    View Code
  • 相关阅读:
    面试题:面试题归类 已看1 背1
    面试题:SSH项目总结 !=!=未看 没用
    面试题: 大公司面试 !=!=未看
    Java 重写paint绘图
    c语言中数组的定义和java中数组定义的一些区别
    Java GUI界面补充总结(不定期补充)
    JFrame 的层次结构 及 背景设置说明
    为什么内部类调用的外部变量必须是final修饰的?
    Java Calendar类的使用总结【转】
    String类为什么可以直接赋值
  • 原文地址:https://www.cnblogs.com/wt11/p/6386553.html
Copyright © 2011-2022 走看看