1.文件扫描获取
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import glob
import logging
from datetime import datetime
from datetime import timedelta

from utils import Utils
from netflowmeta_config import NOT_SCAN
from netflowmeta_config import FREE_TIMES
from netflowmeta_config import SLEEP_TIME
from netflowmeta_config import FINISH_SCAN
from netflowmeta_config import TCP_FLOW_APTH
from netflowmeta_config import BASE_SCAN_NODE

from progress_manage import DateManager


class TcpAvroScanner(object):
    """Incrementally discover ``*.avro`` files under ``TCP_FLOW_APTH``.

    Each call to :meth:`get_new_files` picks one date directory and one
    time slot of ``time_span`` seconds inside it, returns the files whose
    name-encoded time falls in that slot, and records the slot as scanned
    through :class:`DateManager`.
    """

    def __init__(self, time_span=60, delay=20):
        """
        :param time_span: width of one scan slot, in seconds
        :param delay: number of seconds subtracted from "now" before
            deciding which date and which slots are eligible for scanning
        """
        self.today = None
        self.delay = delay
        self.offset = time_span
        self._ctrl_info = self.init_scan_control(time_span)
        self.progress_manager = DateManager(TCP_FLOW_APTH, self.offset)

    def get_new_files(self):
        """Scan one slot and return the files found in it.

        :return: dict mapping ``"<device dir>_<date>"`` to a list of file
            paths; an empty dict when nothing is scheduled or on error
        """
        self.progress_manager.flush_progress()
        self.today = datetime.now()
        try:
            scan_date = self.schedule_date_ctl()
            status, index, offset = self.schedule_time_ctl(scan_date)
            if not status:
                return {}
            start_time, end_time = self._get_scan_time_range(index, offset)
            files_dict = self.select_file(scan_date, start_time, end_time)
            # Progress is saved only after the file selection succeeded, so
            # a failed slot is retried on the next call.
            self.save_progress(scan_date, index)
            return files_dict
        except Exception as e:
            # Format the exception itself: ``e.message`` does not exist on
            # Python 3 and would raise inside this handler.
            logging.error('get new file list warn, msg: {}'.format(e))
            time.sleep(SLEEP_TIME)
            return {}

    def schedule_time_ctl(self, scan_date):
        """Pick the next slot to scan inside ``scan_date``.

        :param scan_date: date to scan, formatted ``%Y%m%d``
        :return: tuple ``(status, index, offset)``; ``status`` is True when
            ``index`` is a slot that still has to be scanned
        """
        today = self.today.strftime("%Y%m%d")
        if scan_date not in self.progress_manager.get():
            # Unknown date: force the next schedule_date_ctl() call into
            # its backtracking branch.
            self._ctrl_info['counter'] = FREE_TIMES
            logging.info("There is no file in {}".format(scan_date))
            return False, 0, 0

        entry = self.progress_manager.get(scan_date)
        range_val = entry["range"]
        cur_offset = entry["offset"]

        if scan_date == today:
            # NOTE(review): ``today`` is computed without ``delay`` while
            # schedule_date_ctl() subtracts it; during the first ``delay``
            # seconds after midnight yesterday is therefore handled by the
            # past-date branch below. Confirm this is intended.
            scan_now = self.today - timedelta(seconds=self.delay)
            # Only slots that ended before ``scan_now`` (minute resolution)
            # are eligible.
            max_range = (scan_now.hour * 60 + scan_now.minute) * 60 // cur_offset
            for i in range(max_range):
                if Utils.get_scan_val(range_val, i) == NOT_SCAN:
                    self._ctrl_info['counter'] = 0
                    return True, i, cur_offset
            # Nothing new today: count one idle round.
            self._ctrl_info['counter'] += 1
            return False, max_range - 1, cur_offset

        # Past date: every slot of the day is eligible.
        scan_node = self.progress_manager.init_scan_node(cur_offset)
        if Utils.get_scan_val(range_val, scan_node) != FINISH_SCAN:
            for i in range(scan_node):
                if Utils.get_scan_val(range_val, i) == NOT_SCAN:
                    return True, i, cur_offset
            # No unscanned slot left: mark the whole date as finished.
            self.set_scan_finished(scan_date)
            logging.info("Forbid scan date {}. There is no files to get.".format(scan_date))
        return False, scan_node, cur_offset

    def schedule_date_ctl(self):
        """Choose the date to scan, preferring the current day.

        After ``FREE_TIMES`` consecutive idle rounds on the current day,
        one backtracking round looks for the most recent earlier date that
        is not marked finished. If none is found the call sleeps for
        ``SLEEP_TIME`` seconds and falls back to the current day.

        :return: date formatted ``%Y%m%d``
        """
        if self._ctrl_info['counter'] >= FREE_TIMES:
            self._ctrl_info['counter'] = 0
            # sorted() instead of keys().sort(): dict views have no sort()
            # on Python 3.
            all_date = sorted(self.progress_manager.get().keys(), reverse=True)
            if len(all_date) > 1:
                today_format = self.today.strftime('%Y%m%d')
                backtrack_index = all_date.index(today_format) + 1 if today_format in all_date else 0
                for candidate in all_date[backtrack_index:]:
                    entry = self.progress_manager.get(candidate)
                    scan_node = self.progress_manager.init_scan_node(entry["offset"])
                    # A date whose sentinel node differs from FINISH_SCAN
                    # can still be scheduled; finished dates are skipped.
                    if Utils.get_scan_val(entry["range"], scan_node) != FINISH_SCAN:
                        return candidate
            # No date left to backtrack to: sleep before the next round.
            time.sleep(SLEEP_TIME)
        # Regular scheduling: use the same "now - delay" reference as the
        # slot computation to avoid saving progress under the wrong day
        # around midnight.
        return (self.today - timedelta(seconds=self.delay)).strftime("%Y%m%d")

    def _get_scan_time_range(self, index, offset):
        """Return the ``[start, end]`` time range of slot ``index``."""
        return self.change_index_to_time_range(index, offset)

    def change_index_to_time_range(self, index, offset):
        """Convert a slot index into a time range.

        :param index: slot index
        :param offset: slot width in seconds
        :return: ``[start, end]`` as integers built from ``HHMMSS`` strings
        """
        start = index * offset
        end = (index + 1) * offset
        return [int(self.format_sec(start)), int(self.format_sec(end))]

    def format_sec(self, total):
        """Format a second-of-day count as an ``HHMMSS`` string.

        :param total: seconds since midnight; values above
            ``BASE_SCAN_NODE`` are capped to it
        :return: zero-padded ``HHMMSS`` string
        """
        total = min(total, BASE_SCAN_NODE)
        return "{}{}{}".format(
            str(total // 3600).zfill(2),
            str(total % 3600 // 60).zfill(2),
            str(total % 3600 % 60).zfill(2)
        )

    def select_file(self, scan_date, start_time, end_time):
        """Collect the avro files of ``scan_date`` inside a time range.

        The time of a file is the integer before the first ``_`` of its
        base name. Files whose prefix is not an integer are skipped with a
        warning instead of aborting the whole scan.

        :param scan_date: date directory name under ``TCP_FLOW_APTH``
        :param start_time: inclusive lower bound
        :param end_time: exclusive upper bound
        :return: dict mapping ``"<device dir>_<date>"`` to matching paths
        """
        pattern = os.path.join(TCP_FLOW_APTH, scan_date, '*')
        file_list_of_dev = {}
        for dev_path in glob.glob(pattern):
            if not os.path.isdir(dev_path):
                continue
            dev_name = "{}_{}".format(os.path.basename(dev_path), scan_date)
            matched = file_list_of_dev[dev_name] = []
            for fl in glob.glob(os.path.join(dev_path, '*.avro')):
                try:
                    fl_time = int(os.path.basename(fl).split("_")[0])
                except ValueError:
                    logging.warning("skip file with unexpected name: {}".format(fl))
                    continue
                if start_time <= fl_time < end_time:
                    matched.append(fl)
        return file_list_of_dev

    def init_scan_control(self, span):
        """Build the scheduling state: idle counter and slot width."""
        return {'counter': 0, 'offset': span}

    def save_progress(self, scan_date, index):
        """Record slot ``index`` of ``scan_date`` as scanned.

        :param scan_date: date formatted ``%Y%m%d``
        :param index: slot index
        """
        self.progress_manager.save_scan_info(scan_date, index)

    def set_scan_finished(self, scan_date):
        """Mark ``scan_date`` as completely scanned.

        :param scan_date: date formatted ``%Y%m%d``
        """
        self.progress_manager.set_scan_finished(scan_date)


if __name__ == '__main__':
    ta = TcpAvroScanner(57)
    while True:
        ta.get_new_files()
2.进度控制
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
import copy
from datetime import date

from cfgopr import CfgOpr
from utils import Utils
from netflowmeta_config import NOT_SCAN
from netflowmeta_config import FINISH_SCAN
from netflowmeta_config import BASE_SCAN_NODE
from netflowmeta_config import DATE_PROGRESS_FILE


class BaseManager(object):
    """Base class holding the root path that a manager works on."""

    def __init__(self, path):
        self.path = path


class DateManager(BaseManager):
    """Track per-date scan progress for date directories under ``path``.

    Progress is kept in memory as ``{date: {"range": ..., "offset": ...}}``
    and persisted per date in ``<path>/<date>/DATE_PROGRESS_FILE``. On disk
    ``range`` is stored through ``Utils.base10to32`` and decoded with
    ``Utils.base32to10`` on load.
    """

    def __init__(self, path, span=60):
        """
        :param path: directory containing the date sub-directories
        :param span: slot width in seconds used for new progress entries
        """
        super(DateManager, self).__init__(path)
        self.offset = span
        self.__progress = None
        self.init_progress()

    def init_progress(self):
        """Load all progress on first use, otherwise refresh it."""
        if not self.__progress:
            self.get_all_progress()
        else:
            self.flush_progress()

    def _list_date_dirs(self, pattern):
        """Return names of sub-directories of ``self.path`` matching ``pattern``."""
        return [
            tar_date for tar_date in os.listdir(self.path)
            if (re.match(pattern, tar_date)
                and os.path.isdir(os.path.join(self.path, tar_date)))
        ]

    def _new_entry(self):
        """Return a fresh progress entry for a date with nothing scanned."""
        return {
            'range': Utils.set_scan_val(NOT_SCAN, self.init_scan_node(self.offset)),
            'offset': self.offset,
        }

    def _persist(self, scan_date):
        """Write the progress entry of ``scan_date`` to its progress file."""
        # Work on a copy so the in-memory value stays in decoded form.
        save_data = copy.deepcopy(self.__progress[scan_date])
        save_data["range"] = Utils.base10to32(save_data["range"])
        progress_path = os.path.join(self.path, scan_date, DATE_PROGRESS_FILE)
        return CfgOpr.json_dump(save_data, progress_path)

    def get_all_progress(self, pattern=r'^\d{8}$'):
        """Load the progress of every date directory from disk.

        Dates without a usable progress file get a fresh entry, and the
        current date is always present in the result.

        :param pattern: regular expression a date directory name must match
        :return: True
        """
        all_progress = {}
        for tar_date in self._list_date_dirs(pattern):
            progress_file = os.path.join(self.path, tar_date, DATE_PROGRESS_FILE)
            _, dp_data = CfgOpr.json_load(progress_file)
            if dp_data:
                dp_data['range'] = Utils.base32to10(dp_data['range'])
            else:
                # Build a new dict instead of indexing into the falsy
                # result, which may be None.
                dp_data = self._new_entry()
            all_progress[tar_date] = dp_data

        cur_date = date.today().strftime("%Y%m%d")
        if cur_date not in all_progress:
            all_progress[cur_date] = self._new_entry()
        self.__progress = all_progress
        return True

    def flush_progress(self, pattern=r'^\d{8}$'):
        """Re-sync in-memory progress with the directories on disk.

        Entries whose directory no longer exists are dropped; directories
        not yet tracked (for example after a day change) get a fresh entry.

        :param pattern: regular expression a date directory name must match
        :return: True
        """
        new_progress = {}
        for exist_date in self._list_date_dirs(pattern):
            if exist_date in self.__progress:
                new_progress[exist_date] = self.__progress[exist_date]
            else:
                new_progress[exist_date] = self._new_entry()
        self.__progress = new_progress
        return True

    def set_scan_finished(self, scan_date):
        """Mark ``scan_date`` as finished and persist it.

        Sets the value at the date's sentinel node (see
        :meth:`init_scan_node`) to ``FINISH_SCAN``.

        :param scan_date: date formatted ``%Y%m%d``
        :return: result of ``CfgOpr.json_dump``
        """
        entry = self.__progress[scan_date]
        scan_node = self.init_scan_node(entry["offset"])
        entry["range"] = Utils.set_scan_val(entry["range"], scan_node, FINISH_SCAN)
        return self._persist(scan_date)

    def save_scan_info(self, scan_date, index):
        """Record slot ``index`` of ``scan_date`` as scanned and persist it.

        :param scan_date: date formatted ``%Y%m%d``
        :param index: slot index
        :return: result of ``CfgOpr.json_dump``
        """
        entry = self.__progress[scan_date]
        entry["range"] = Utils.set_scan_val(entry["range"], index)
        return self._persist(scan_date)

    def init_scan_node(self, span):
        """Return the number of slots in a day for width ``span``.

        This is ``BASE_SCAN_NODE / span`` rounded up; the value doubles as
        the index of the sentinel node used by :meth:`set_scan_finished`.

        :param span: slot width in seconds
        :return: slot count
        """
        span = int(span)
        return BASE_SCAN_NODE // span + (1 if BASE_SCAN_NODE % span else 0)

    def get(self, scan_date=None):
        """Return all progress, or the entry of one date.

        :param scan_date: date formatted ``%Y%m%d``; None returns the whole
            progress dict
        :return: progress dict, or the entry of ``scan_date`` (``{}`` when
            the date is unknown)
        """
        if scan_date is None:
            return self.__progress
        return self.__progress.get(scan_date, {})
3.配置
# NOTE(review): the other modules also import TCP_FLOW_APTH and
# DATE_PROGRESS_FILE from the config module; they are not defined in this
# excerpt -- confirm they exist in the real file.

# Upper bound for second-of-day values and numerator of the slot-count
# computation (86400 = seconds in one day).
BASE_SCAN_NODE = 86400

# Value written to a date's sentinel node once the date is fully scanned.
FINISH_SCAN = 0

# Value of a slot that has not been scanned yet; also the initial range
# value handed to Utils.set_scan_val for a new date.
NOT_SCAN = 0

# Number of consecutive idle rounds on the current day before the scanner
# tries to backtrack to an earlier date.
FREE_TIMES = 3

# Seconds passed to time.sleep() when there is nothing to scan or after an
# error.
SLEEP_TIME = 1