通过pcap与dpkt抓包解包示例:
#!/usr/bin/env python # -*- coding: utf-8 -*- """ 网络数据包捕获与分析程序 """ import pcap import dpkt import json import re import time from urllib import unquote # 过滤输出目标ip dst_lists = [ '203.66.1.212', # nslookup dpdcs.4399sy.com.hk '52.74.10.186', # nslookup dpdcs.4399en.com '52.58.69.212', # nslookup dpdcs.4399sy.ru '220.241.11.3', # nslookup dpdcs.4399th.com '124.243.195.63', # nslookup sdkdcs.4399sy.com '42.62.106.216', # nslookup udpdcs.4399sy.com '42.62.106.230', # nslookup udpdcs.4399sy.com ] req_data = "" times = 0 def capt_data(eth_name="eth0", p_type=None): """ 捕获网卡数据包 :param eth_name 网卡名,eg. eth0,eth3... :param p_type 日志捕获类型 1:sdk日志用例分析 2:目标域名过滤输出 3:原始数据包 :return: """ pc = pcap.pcap(eth_name) pc.setfilter('tcp port 80') # 设置监听过滤器 print 'start capture....' if pc: for p_time, p_data in pc: # p_time为收到时间,p_data为收到数据 anly_capt(p_time, p_data, p_type) def anly_capt(p_time, p_data, p_type): """ 解析数据包 :param p_data 收到数据 :param p_type 日志捕获类型 1:sdk日志用例分析 2:目标域名过滤输出 3:原始数据包 :return: """ p = dpkt.ethernet.Ethernet(p_data) if p.data.__class__.__name__ == 'IP': ip_data = p.data src_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.src))) dst_ip = '%d.%d.%d.%d' % tuple(map(ord, list(ip_data.dst))) if p.data.data.__class__.__name__ == 'TCP': tcp_data = p.data.data if tcp_data.dport == 80: # print tcp_data.data if tcp_data.data: # 调用日志模块,对日志进行处理 if p_type == 1: # sdk日志用例分析 if dst_ip in dst_lists: tmp = tcp_data.data.strip() global req_data, times if tmp.startswith("POST") or tmp.startswith("GET"): # or times > 0 if req_data: haiwai_log_case(req_data) req_data = tmp + " " # times = 0 else: req_data = req_data + tmp # times = times + 1 elif p_type == 2: # 目标域名过滤输出 if dst_ip in dst_lists: print "tcp_data:", tcp_data.data else: # 无过滤条件输出 print "tcp_data:", tcp_data.data # android 日誌類型,从data中获取 log_type_from_data = { 'open_game': u'[打开游戏]', 'network_check': u'[网络监测]', 'open_login': u'[登录界面前]', 'select_server': u'[选服日志]', 'create_role': u'[创角日志]', 'role_level_change': u'[等级日志]', # 海外,俄语 'activity_open': u'[打开游戏]', 'load_start_before_login': u'[加载开始]', 'load_finish_before_login': u'[加载结束]', 'activity_before_login': u'[登录界面前]', 'click_enter': u'[进入游戏]', 'get_user_server_login': u'[选服日志]', 'user_create_role': u'[创角日志]', 'role_login': u'[角色登录]', 'enter_success': u'[成功进入游戏]', 'role_level': u'[等级日志]', 'user_online': u'[在线日志]', 'exit_success': u'[退出游戏]', } # ios日誌類型,从请求资源路径获取 log_type_from_path = { 'activity_open.php': u'[打开游戏]', 'load_start_before_login.php': u'[加载开始]', 'load_finish_before_login.php': u'[加载结束]', 'activity_before_login.php': u'[登录界面前]', 'click_enter.php': u'[进入游戏]', 'get_user_server_login.php': u'[选服日志]', 'user_create_role.php': u'[创角日志]', 'role_login.php': u'[角色登录]', 'enter_success.php': u'[成功进入游戏]', 'user_online.php': u'[在线日志]', 'role_level.php': u'[等级日志]', 'exit_success.php': u'[退出游戏]', 'share.php': u'[分享日志]', 'init_info.php': u'[初始化日志]', 'event.php': u'[事件日志]', 'user_login.php': u'[user_login]', 'user_server_login.php': u'[user_server_login]', 'enter_game.php': u'[enter_game]', } # 过滤path filter_out_list = [ 'u/', 'plugin/error/check', 'service/version/get_info', ] # 过滤打印出属于列表中的host的日志。 host_list = [ 'dpdcs.4399sy.com.hk', 'dpdcs.4399en.com', 'dpdcs.4399sy.ru', 'dpdcs.4399th.com', 'sdkdcs.4399sy.com', 'udpdcs.4399sy.com', ] def formattime(t): # 日期字段格式化 return time.strftime('%c', time.gmtime(t + 8 * 3600)) def req_to_dict(req_string): """ 将请求数据转换为dic :param req_string: :return: """ req_dict = {} req_string = req_string.strip() if len(req_string) > 0: req_string = unquote(req_string) # print "req_string_after_unquote:",req_string m1 = re.search("(GET|POST)(.*)?(.*)HTTP/1.1", req_string) # (method,path,param) m2 = re.search("Host:(.*)", req_string) # (host,) # m3 = re.search("sdata=(.*)s", req_string) # (body,) m4 = re.search("sdata=([sS]*)", req_string) # (body,) # m5 = re.search("eventTime":"(d+)", req_string) # (eventTime,) m5 = re.search("eventTime"s*:s*"(d+)", req_string) if m1: req_dict["method"] = m1.group(1).strip() req_dict["path"] = m1.group(2).strip()[1:] param_string = m1.group(3).strip() if param_string: param_string = param_string.split("&") param_dict = {} for item in param_string: tmp_list = item.split("=") if len(tmp_list) > 1: param_dict[tmp_list[0]] = tmp_list[1] req_dict["param"] = param_dict if m2: req_dict["host"] = m2.group(1).strip() if m4: try: body = m4.group(1).replace(" ", "") body = json.loads(body) except ValueError: print "