zoukankan      html  css  js  c++  java
  • Python3网络学习案例二:traceroute详解

    1. 写在前面

    本文是基于上一篇“ping详解”写的;

    不同操作系统下的命令也不同,本文仅针对windows系统,命令为“tracert xxx”,效果如下

    2. 使用traceroute做什么

    与上一篇ping相似,原理上都是通过向目的主机发送一条消息并通过回显应答来判断目的主机状态。不同的是,traceroute主要用于遍历由源主机到目的主机交互线路上所有的路由器并判断其状态,一般以30为最大TTL(time to live,生存时间),即以该线路上有不超过29(30-1)台路由器为前提进行遍历。

    3. 编写traceroute要比编写ping多知道些什么

    (1)traceroute基本原理

    当路由器收到一份IP数据报,如果该报文的TTL字段是1,则意味着它在网路中的生存周期已经消耗殆尽,本路由处理后还未到达目的主机的话,需要将该数据报丢弃,并给信源主机发送一份ICMP超时报文(包含该中间路由器的地址),这意味着:通过发送一份TTL字段为n的IP数据报给目的主机,就得到了该路径中的第n个路由器的IP地址,那么我们使IP数据报的TTL字段值从1开始依次递增,就可以获得所有中间路由的ip地址。当IP数据报到达目的主机时,由于已经到达目的主机,因此不会再发送ICMP超时报文了,而是ICMP应答报文,通过区分收到的ICMP报文是超时报文(type=11)还是应答报文(type=0),以判断程序应该何时结束;

    (2)运行traceroute时要关闭防火墙(直接由系统发起的则不用,CMD)

    (至于为什么暂时讲不明白,待续);

    (3)TTL

    这里的TTL值可自定义,每经过一个节点(路由器等)TTL就会减少1,等到减少到0的时候我们称TTL过期了,此时会返回一条消息给源主机反映这条数据所到达的最后一台路由器的状态/信息;

    上面写最大TTL为30,事实上,我们是将数据的TTL从1开始设置的,依次增加1,这样我们就能够依次访问距离源主机一个节点、两个节点、三个节点……的路由器了(即遍历这条路上所有的主机/路由器);

    (4)TIMEOUT

    超时时间,事实上我们在ping上也提到了,这里在叙述一下,我们在发送了消息之后直到收到回复之后才会返回下一条消息,但是如果消息刚发出去就丢失了,或者其他情况导致没有消息返回呢?我们不可能无节制地等下去,因此,我们设置一个TIMEMOUT并规定,如果响应时间超过了这个长度我们就认为这一次的交流失败了,并向源主机(或者使用者)报告“请求超时”;

    (5)根据返回消息的type和code来判断该主机(路由器)状态

    编写ping的时候我们只需要指导回显请求(type = 8,code = 0)和回显应答(type = 0, code = 0)就行了,但是对于traceroute,我们还要注意到其他几个不同的type:

    type 11,code =  0:ttl过期,换句话说就是当数据经过某一个路由器时ttl减少为零了,此时接收到的返回信息反映了这个路由器的状态(这个路由器的ip等);

    type 3:出错了,所对应的code有数个值,但是我们只要知道返回消息向我们反映说这一次的路线不能正确地得到路由器的状态就行了,详细信息可以看下图中的type = 3时code对应的状态

     (6)tries

    我们上面提到,消息再发送出去的时候可能会丢失或者其他意外情况出现,但是我们并不希望一次失败就认为这个路由器不可访问等,因此,我们基本上会对这条线路尝试多次,一般为三次

    (7)DNS

    发送和接收消息都很快(以ms为单位),但是通过DNS查询返回消息的主机信息就显得有些慢了(以s为单位),因此,我们通常在尝试发送三次消息后再通过DNS查看是哪个主机响应了

    4. 源码

    # --coding:utf-8--
    
    import socket
    import os
    import struct
    import time
    import select
    
    # ICMP echo_request
    TYPE_ECHO_REQUEST = 8
    CODE_ECHO_REQUEST_DEFAULT = 0
    # ICMP echo_reply
    TYPE_ECHO_REPLY = 0
    CODE_ECHO_REPLY_DEFAULT = 0
    # ICMP overtime
    TYPE_ICMP_OVERTIME = 11
    CODE_TTL_OVERTIME = 0
    # ICMP unreachable
    TYPE_ICMP_UNREACHED = 3
    
    MAX_HOPS = 30  # set max hops-30
    TRIES = 3  # detect 3 times
    
    
    # checksum
    def check_sum(strings):
        csum = 0
        countTo = (len(strings) / 2) * 2
        count = 0
        while count < countTo:
            thisVal = strings[count + 1] * 256 + strings[count]
            csum = csum + thisVal
            csum = csum & 0xffffffff
            count = count + 2
        if countTo < len(strings):
            csum = csum + strings[len(strings) - 1]
            csum = csum & 0xffffffff
        csum = (csum >> 16) + (csum & 0xffff)
        csum = csum + (csum >> 16)
        answer = ~csum
        answer = answer & 0xffff
        answer = answer >> 8 | (answer << 8 & 0xff00)
        return answer
    
    
    # get host_info by address
    def get_host_info(host_addr):
        try:
            host_info = socket.gethostbyaddr(host_addr)
        except socket.error as e:
            display = '{0}'.format(host_addr)
        else:
            display = '{0} ({1})'.format(host_addr, host_info[0])
        return display
    
    
    # construct ICMP datagram
    def build_packet():
        # primitive checksum
        my_checksum = 0
        # process_id
        my_id = os.getpid()
        # sequence as 1(>0)
        my_seq = 1
        # 2's header
        my_header = struct.pack("bbHHh", TYPE_ECHO_REQUEST, CODE_ECHO_REQUEST_DEFAULT, my_checksum, my_id, my_seq)
        # SYS_time as payload
        my_data = struct.pack("d", time.time())
        # temporary datagram
        package = my_header + my_data
        # true checksum
        my_checksum = check_sum(package)
        # windows-big endian
        my_checksum = socket.htons(my_checksum)
        # repack
        my_header = struct.pack("bbHHh", TYPE_ECHO_REQUEST, CODE_ECHO_REQUEST_DEFAULT, my_checksum, my_id, 1)
        # true datagram
        ip_package = my_header + my_data
        return ip_package
    
    
    def main(hostname):
        print("routing {0}[{1}](max hops = 30, detect tries = 3)".format(hostname, socket.gethostbyname(hostname)))
        for ttl in range(1, MAX_HOPS):
            print("%2d" % ttl, end="")
            for tries in range(0, TRIES):
                # create raw socket
                icmp_socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
                icmp_socket.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, struct.pack('I', ttl))
                icmp_socket.settimeout(TIMEOUT)
                # construct datagram
                icmp_package = build_packet()
                icmp_socket.sendto(icmp_package, (hostname, 0))
                # waiting for receiving reply
                start_time = time.time()
                select.select([icmp_socket], [], [], TIMEOUT)
                end_time = time.time()
                # compute time of receiving
                during_time = end_time - start_time
                if during_time >= TIMEOUT or during_time == 0:
                    print("    *    ", end="")
                else:
                    print(" %4.0f ms " % (during_time * 1000), end="")
                if tries >= TRIES - 1:
                    try:
                        ip_package, ip_info = icmp_socket.recvfrom(1024)
                    except socket.timeout:
                        print(" request time out")
                    else:
                        # extract ICMP header from IP datagram
                        icmp_header = ip_package[20:28]
    
                        # unpack ICMP header
                        after_type, after_code, after_checksum, after_id, after_sequence = struct.unpack("bbHHh",
                                                                                                         icmp_header)
                        output = get_host_info(ip_info[0])
    
                        if after_type == TYPE_ICMP_UNREACHED:  # unreachable
                            print("Wrong!unreached net/host/port!")
                            break
                        elif after_type == TYPE_ICMP_OVERTIME:  # ttl overtimr
                            print(" %s" % output)
                            continue
                        elif after_type == 0:  # type_echo
                            print(" %s" % output)
                            print("program run over!")
                            return
                        else:
                            print("request timeout")
                            print("program run wrongly!")
                            return
    
    
    if __name__ == "__main__":
        while True:
            try:
                ip = input("please input a ip address:")
                global TIMEOUT
                TIMEOUT = int(input("Input timeout you want: "))
                main(ip)
                break
            except Exception as e:
                print(e)
                continue
  • 相关阅读:
    如何解决App无法收到android开机广播
    如何实现开机启动、清缓存、杀进程、悬浮窗口单双击区分,附源码
    WaitForSingleObject 介绍【转】
    C++ Unicode SBCS 函数对照表【转】
    数字IP字符串IP转换
    打印内存【CSDN】
    巧妙的无重复随机数方法
    <unnamedtag>”后面接“int”是非法的
    友元函数与重载运算符【转】
    QT QTableWidget 用法总结【转】
  • 原文地址:https://www.cnblogs.com/YuanShiRenY/p/Python_Traceroute.html
Copyright © 2011-2022 走看看