zoukankan      html  css  js  c++  java
  • Python3.5 ping脚本

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    __version__ = "0.2"
    
    import os
    import select
    import socket
    import struct
    import sys
    import time
    
    # From /usr/include/linux/icmp.h; your milage may vary.
    ICMP_ECHO_REQUEST = 8 # Seems to be the same on Solaris.
    
    
    def checksum(source_string):
        """
        I'm not too confident that this is right but testing seems
        to suggest that it gives the same answers as in_cksum in ping.c
        """
        sum = 0
        count_to = int((len(source_string) / 2) * 2)
        for count in range(0, count_to, 2):
            this = source_string[count + 1] * 256 + source_string[count]
            sum = sum + this
            sum = sum & 0xffffffff # Necessary?
    
        if count_to < len(source_string):
            sum = sum + source_string[len(source_string) - 1]
            sum = sum & 0xffffffff # Necessary?
    
        sum = (sum >> 16) + (sum & 0xffff)
        sum = sum + (sum >> 16)
        answer = ~sum
        answer = answer & 0xffff
    
        # Swap bytes. Bugger me if I know why.
        answer = answer >> 8 | (answer << 8 & 0xff00)
    
        return answer
    
    
    def receive_one_ping(my_socket, id, timeout):
        """
        Receive the ping from the socket.
        """
        time_left = timeout
        while True:
            started_select = time.time()
            what_ready = select.select([my_socket], [], [], time_left)
            how_long_in_select = (time.time() - started_select)
            if what_ready[0] == []: # Timeout
                return
    
            time_received = time.time()
            received_packet, addr = my_socket.recvfrom(1024)
            icmpHeader = received_packet[20:28]
            type, code, checksum, packet_id, sequence = struct.unpack(
                "bbHHh", icmpHeader
            )
            if packet_id == id:
                bytes = struct.calcsize("d")
                time_sent = struct.unpack("d", received_packet[28:28 + bytes])[0]
                return time_received - time_sent
    
            time_left = time_left - how_long_in_select
            if time_left <= 0:
                return
    
    
    def send_one_ping(my_socket, dest_addr, id, psize):
        """
        Send one ping to the given >dest_addr<.
        """
        dest_addr  =  socket.gethostbyname(dest_addr)
    
        # Remove header size from packet size
        psize = psize - 8
    
        # Header is type (8), code (8), checksum (16), id (16), sequence (16)
        my_checksum = 0
    
        # Make a dummy heder with a 0 checksum.
        header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, id, 1)
        bytes = struct.calcsize("d")
        data = (psize - bytes) * "Q"
        data = struct.pack("d", time.time()) + data.encode()
    
        # Calculate the checksum on the data and the dummy header.
        my_checksum = checksum(header + data)
    
        # Now that we have the right checksum, we put that in. It's just easier
        # to make up a new header than to stuff it into the dummy.
        header = struct.pack(
            "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), id, 1
        )
        packet = header + data
        my_socket.sendto(packet, (dest_addr, 1)) # Don't know about the 1
    
    
    def do_one(dest_addr, timeout, psize):
        """
        Returns either the delay (in seconds) or none on timeout.
        """
        icmp = socket.getprotobyname("icmp")
        try:
            my_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp)
        except socket.error as e:
            if e.errno == 1:
                # Operation not permitted
                msg = (
                    " - Note that ICMP messages can only be sent from processes"
                    " running as root."
                )
                raise socket.error(msg)
            raise # raise the original error
    
        my_id = os.getpid() & 0xFFFF
    
        send_one_ping(my_socket, dest_addr, my_id, psize)
        delay = receive_one_ping(my_socket, my_id, timeout)
    
        my_socket.close()
        return delay
    
    
    def verbose_ping(dest_addr, timeout = 2, count = 4, psize = 64):
        """
        Send `count' ping with `psize' size to `dest_addr' with
        the given `timeout' and display the result.
        """
        for i in range(count):
            print("ping %s with ..." % dest_addr,)
            try:
                delay  =  do_one(dest_addr, timeout, psize)
            except socket.gaierror as e:
                print("failed. (socket error: '%s')" % e[1])
                break
    
            if delay  ==  None:
                print("failed. (timeout within %ssec.)" % timeout)
            else:
                delay  =  delay * 1000
                print("get ping in %0.4fms" % delay)
    
    
    def quiet_ping(dest_addr, timeout = 2, count = 4, psize = 64):
        """
        Send `count' ping with `psize' size to `dest_addr' with
        the given `timeout' and display the result.
        Returns `percent' lost packages, `max' round trip time
        and `avrg' round trip time.单位为ms
        """
        mrtt = None
        artt = None
        lost = 0
        plist = []
    
        for i in range(count):
            try:
                delay = do_one(dest_addr, timeout, psize)
            except socket.gaierror as e:
                print("failed. (socket error: '%s')" % e[1])
                break
    
            if delay != None:
                delay = delay * 1000
                plist.append(delay)
    
        # Find lost package percent
        percent_lost = 100 - (len(plist) * 100 / count)
    
        # Find max and avg round trip time
        if plist:
            mrtt = max(plist)
            artt = sum(plist) / len(plist)
    
        return percent_lost, mrtt, artt
    
    if __name__ == '__main__':
        # verbose_ping("heise.de")
        # verbose_ping("google.com")
        # verbose_ping("a-test-url-taht-is-not-available.com")
        # verbose_ping("192.168.1.1")
        verbose_ping("www.baidu.com")
        res=quiet_ping("www.baidu.com")  #返回结果为丢包率,最大延时和平均延时
        print(res)
  • 相关阅读:
    异常之【You have an error in your SQL syntax】
    Android 回调的理解,觉得写得好就转过来。。。收藏一下
    git 解决冲突方法
    最重要的“快捷键” IntelliJ IDEA
    Git 还没push 前可以做的事(转)
    android JNI(转)
    Windos下Android(ADT Bundle)配置NDK的两种方法------ADT、Cygwin、NDK配置汇总(转)
    warning C4819: 该文件包含不能在当前代码页(936)中表示的字符。请将该文件保存为 Unicode 格式以防止数据丢失
    android-ndk-r7b编译环境Cygwin工具搭建及配置(转)
    Eclipse Java常用快捷键(Eclipse Shortcut Keys for Java Top10)(转)
  • 原文地址:https://www.cnblogs.com/xwupiaomiao/p/11730638.html
Copyright © 2011-2022 走看看