zoukankan      html  css  js  c++  java
  • python+pcap+dpkt抓包小实例

    通过pcap与dpkt抓包解包示例:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    """ 网络数据包捕获与分析程序 """
    
    import pcap
    import dpkt
    import json
    import re
    import time
    from urllib import unquote
    
    # 过滤输出目标ip
    dst_lists = [
        '203.66.1.212',  # nslookup dpdcs.4399sy.com.hk
        '52.74.10.186',  # nslookup dpdcs.4399en.com
        '52.58.69.212',  # nslookup dpdcs.4399sy.ru
        '220.241.11.3',  # nslookup dpdcs.4399th.com
        '124.243.195.63',  # nslookup sdkdcs.4399sy.com
        '42.62.106.216',  # nslookup udpdcs.4399sy.com
        '42.62.106.230',  # nslookup udpdcs.4399sy.com
    ]
    
    req_data = ""
    times = 0
    
    
    def capt_data(eth_name="eth0", p_type=None):
        """
        捕获网卡数据包
        :param eth_name  网卡名,eg. eth0,eth3...
        :param p_type    日志捕获类型 1:sdk日志用例分析 2:目标域名过滤输出 3:原始数据包
        :return:
        """
    
        pc = pcap.pcap(eth_name)
        pc.setfilter('tcp port 80')  # 设置监听过滤器
        print 'start capture....'
        if pc:
            for p_time, p_data in pc:  # p_time为收到时间,p_data为收到数据
                anly_capt(p_time, p_data, p_type)
    
    
    def anly_capt(p_time, p_data, p_type):
        """
        解析数据包
        :param p_data  收到数据
        :param p_type  日志捕获类型 1:sdk日志用例分析 2:目标域名过滤输出 3:原始数据包
        :return:
        """
    
        p = dpkt.ethernet.Ethernet(p_data)
        if p.data.__class__.__name__ == 'IP':
            ip_data = p.data
            src_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.src)))
            dst_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.dst)))
            if p.data.data.__class__.__name__ == 'TCP':
                tcp_data = p.data.data
                if tcp_data.dport == 80:
                    # print tcp_data.data
                    if tcp_data.data:
                        # 调用日志模块,对日志进行处理
                        if p_type == 1:
                            # sdk日志用例分析
                            if dst_ip in dst_lists:
                                tmp = tcp_data.data.strip()
                                global req_data, times
                                if tmp.startswith("POST") or tmp.startswith("GET"):  # or times > 0
                                    if req_data:
                                        haiwai_log_case(req_data)
                                    req_data = tmp + "
    "
                                    # times = 0
                                else:
                                    req_data = req_data + tmp
                                    # times = times + 1
    
                        elif p_type == 2:
                            # 目标域名过滤输出
                            if dst_ip in dst_lists:
                                print "tcp_data:", tcp_data.data
    
                        else:
                            # 无过滤条件输出
                            print "tcp_data:", tcp_data.data
    
    
    # android 日誌類型,从data中获取
    log_type_from_data = {
    
        'open_game': u'[打开游戏]',
        'network_check': u'[网络监测]',
        'open_login': u'[登录界面前]',
        'select_server': u'[选服日志]',
        'create_role': u'[创角日志]',
        'role_level_change': u'[等级日志]',
    
        # 海外,俄语
        'activity_open': u'[打开游戏]',
        'load_start_before_login': u'[加载开始]',
        'load_finish_before_login': u'[加载结束]',
        'activity_before_login': u'[登录界面前]',
        'click_enter': u'[进入游戏]',
        'get_user_server_login': u'[选服日志]',
        'user_create_role': u'[创角日志]',
        'role_login': u'[角色登录]',
        'enter_success': u'[成功进入游戏]',
        'role_level': u'[等级日志]',
        'user_online': u'[在线日志]',
        'exit_success': u'[退出游戏]',
    
    }
    
    # ios日誌類型,从请求资源路径获取
    log_type_from_path = {
        'activity_open.php': u'[打开游戏]',
        'load_start_before_login.php': u'[加载开始]',
        'load_finish_before_login.php': u'[加载结束]',
        'activity_before_login.php': u'[登录界面前]',
        'click_enter.php': u'[进入游戏]',
        'get_user_server_login.php': u'[选服日志]',
        'user_create_role.php': u'[创角日志]',
        'role_login.php': u'[角色登录]',
        'enter_success.php': u'[成功进入游戏]',
        'user_online.php': u'[在线日志]',
        'role_level.php': u'[等级日志]',
        'exit_success.php': u'[退出游戏]',
        'share.php': u'[分享日志]',
        'init_info.php': u'[初始化日志]',
        'event.php': u'[事件日志]',
        'user_login.php': u'[user_login]',
        'user_server_login.php': u'[user_server_login]',
        'enter_game.php': u'[enter_game]',
    }
    
    # 过滤path
    filter_out_list = [
        'u/',
        'plugin/error/check',
        'service/version/get_info',
    ]
    
    # 过滤打印出属于列表中的host的日志。
    host_list = [
        'dpdcs.4399sy.com.hk',
        'dpdcs.4399en.com',
        'dpdcs.4399sy.ru',
        'dpdcs.4399th.com',
        'sdkdcs.4399sy.com',
        'udpdcs.4399sy.com',
    ]
    
    
    def formattime(t):  # 日期字段格式化
        return time.strftime('%c', time.gmtime(t + 8 * 3600))
    
    
    def req_to_dict(req_string):
        """
        将请求数据转换为dic
        :param req_string:
        :return:
        """
        req_dict = {}
        req_string = req_string.strip()
        if len(req_string) > 0:
            req_string = unquote(req_string)
            # print "req_string_after_unquote:",req_string
            m1 = re.search("(GET|POST)(.*)?(.*)HTTP/1.1", req_string)  # (method,path,param)
            m2 = re.search("Host:(.*)", req_string)  # (host,)
            # m3 = re.search("sdata=(.*)s", req_string)  # (body,)
            m4 = re.search("sdata=([sS]*)", req_string)  # (body,)
            # m5 = re.search("eventTime":"(d+)", req_string)  # (eventTime,)
            m5 = re.search("eventTime"s*:s*"(d+)", req_string)
            if m1:
                req_dict["method"] = m1.group(1).strip()
                req_dict["path"] = m1.group(2).strip()[1:]
                param_string = m1.group(3).strip()
                if param_string:
                    param_string = param_string.split("&")
                    param_dict = {}
                    for item in param_string:
                        tmp_list = item.split("=")
                        if len(tmp_list) > 1:
                            param_dict[tmp_list[0]] = tmp_list[1]
                    req_dict["param"] = param_dict
            if m2:
                req_dict["host"] = m2.group(1).strip()
            if m4:
                try:
                    body = m4.group(1).replace("
    ", "")
                    body = json.loads(body)
                except ValueError:
                    print "33[1;31;40m"
                    print "m4:Error:body ValueError,req_string-->%s" % req_string
                    print "33[0m"
                    body = {}
                req_dict["body"] = body
    
            if m5:
                req_dict["eventTime"] = formattime(int(m5.group(1)))
    
        return req_dict
    
    
    def haiwai_log_assert(req_dict):
        """
        日志断言处理,输出分析结果
        :param req_dict:
        :return:
        """
    
        # 从 data 中获取日志类型
        if isinstance(req_dict, dict) and req_dict.get("body"):
            if req_dict.get("body").get("data"):
                data_type = req_dict.get("body").get("data").keys()
                data_type_set = set(data_type)
                types_key_set = set(log_type_from_data.keys())
                intersect = data_type_set.intersection(types_key_set)
                if intersect:
                    log_type = intersect.pop()
                    print "33[1;31;40m %s log pass!--from body data || EventTime:-->[%s] 33[0m" % (
                        log_type_from_data.get(log_type), req_dict.get("eventTime"))
                    print req_dict
                else:
                    if 'common' in data_type and len(data_type) == 2:
                        data_type.remove('common')
                        print "33[1;31;40m %s log not register!--from body data 33[0m" % data_type
    
        # 从 path 中获取日志类型
        path = req_dict.get("path")
        host = req_dict.get("host")
        if host in host_list:
            if path in log_type_from_path.keys():
                eventTime = ""
                if req_dict.get("eventTime"):
                    eventTime = req_dict.get("eventTime")
                else:
                    if req_dict.get("param"):
                        eventTime = req_dict.get("param").get("time")
                        if eventTime:
                            eventTime = formattime(int(eventTime))
                print "33[1;31;40m %s log pass--from url || EventTime:-->[%s] 33[0m" % (
                    log_type_from_path.get(path), eventTime)
                print req_dict
            elif path and path not in filter_out_list:
                print "33[1;31;40m %s log not register!--from url! 33[0m" % path
                print req_dict
    
    
    def client_log_check(log_type, req_dict, platform="sy"):
        """
        检查SDK客户端请求字段,返回测试结果集
        :param log_type: 日志类型
        :param req_dict: 日志字典
        :param platform: 测试平台
        :return:
        """
        pass
    
    
    def haiwai_log_case(req_string):
        """
        日志用例集
        一:将数据包转换为dict
        二:对日志分析处理,输出测试结果
        """
    
        req_dict = req_to_dict(req_string)
        haiwai_log_assert(req_dict)
    
    
    if __name__ == '__main__':
        try:
            capt_data("eth3", 1)
        except TypeError:
            capt_data("eth3", 1)

    ***微信扫一扫,关注“python测试开发圈”,了解更多测试教程!***
  • 相关阅读:
    判断两个链表是否相交
    【转】TCP连接突然断开的处理方法
    【转】TCP/IP协议——ARP详解
    HTTP协议COOKIE和SESSION有什么区别
    【转】K-Means聚类算法原理及实现
    【转】机器学习实战之K-Means算法
    unity3d 调用Start 注意
    u3d 加载PNG做 UI图片
    Opengl的gl_NormalMatrix
    OpenGL 遮挡查询
  • 原文地址:https://www.cnblogs.com/guanfuchang/p/6495533.html
Copyright © 2011-2022 走看看