zoukankan      html  css  js  c++  java
  • 文件时间进度扫描控制:可回溯、支持空闲扫描、不受系统时间调整影响

    1.文件扫描获取

      1 #!/usr/bin/env python
      2 # -*- coding: utf-8 -*-
      3 import os
      4 import time
      5 import glob
      6 import logging
      7 from datetime import datetime
      8 from datetime import timedelta
      9 
     10 from utils import Utils
     11 from netflowmeta_config import NOT_SCAN
     12 from netflowmeta_config import FREE_TIMES
     13 from netflowmeta_config import SLEEP_TIME
     14 from netflowmeta_config import FINISH_SCAN
     15 from netflowmeta_config import TCP_FLOW_APTH
     16 from netflowmeta_config import BASE_SCAN_NODE
     17 
     18 from progress_manage import DateManager
     19 
     20 class TcpAvroScanner(object):
     21 
     22     def __init__(self, time_span=60, delay=20):
     23         """
     24         :param time_span:文件获取时间跨度,单位秒
     25         """
     26         self.today = None
     27         self.delay = delay
     28         self.offset = time_span
     29         self._ctrl_info = self.init_scan_control(time_span)
     30         self.progress_manager = DateManager(TCP_FLOW_APTH, self.offset)
     31 
     32 
     33 
     34     def get_new_files(self):
     35         """
     36         扫描获取新文件
     37         :return:
     38         """
     39         self.progress_manager.flush_progress()
     40         self.today = datetime.now()
     41         try:
     42             scan_date = self.schedule_date_ctl()
     43             status, index, offset = self.schedule_time_ctl(scan_date)
     44             if status:
     45                 start_time, end_time = self._get_scan_time_range(index, offset)
     46                 files_dict = self.select_file(scan_date, start_time, end_time)
     47                 self.save_progress(scan_date, index)
     48             else:
     49                 files_dict = {}
     50             return files_dict
     51         except Exception as e:
     52             logging.error('get new file list warn, msg: {}'.format(e.message))
     53             time.sleep(SLEEP_TIME)
     54             return {}
     55 
     56     def schedule_time_ctl(self, scan_date):
     57         """
     58         根据调度日期,去获取扫描点
     59         :param scan_date: 扫描日期
     60         :return: index 扫描点
     61         """
     62         today = self.today.strftime("%Y%m%d")
     63         if scan_date not in self.progress_manager.get():
     64             self._ctrl_info['counter'] = FREE_TIMES
     65             logging.info("There is no file in {}".format(scan_date))
     66             return False, 0, 0
     67         range_val = self.progress_manager.get(scan_date)["range"]
     68         cur_offset = self.progress_manager.get(scan_date)["offset"]
     69         if scan_date == today:
     70             scan_now = self.today - timedelta(seconds=self.delay)
     71             max_range = (scan_now.hour * 60 + scan_now.minute) * 60 // cur_offset
     72             for i in range(max_range):
     73                 if Utils.get_scan_val(range_val, i) == NOT_SCAN:
     74                     self._ctrl_info['counter'] = 0
     75                     return True, i, cur_offset
     76             self._ctrl_info['counter'] += 1
     77             return False, max_range - 1, cur_offset
     78         else:
     79             scan_node = self.progress_manager.init_scan_node(cur_offset)
     80             if Utils.get_scan_val(range_val, scan_node) != FINISH_SCAN:
     81                 for i in range(scan_node):
     82                     if Utils.get_scan_val(range_val, i) == NOT_SCAN:
     83                         return True, i, cur_offset
     84             self.set_scan_finished(scan_date)
     85             logging.info("Forbid scan date {}. There is no files to get.".format(scan_date))
     86             return False, scan_node, cur_offset
     87 
     88     def schedule_date_ctl(self):
     89         """
     90         日期调度,优先调度当天。
     91         空闲调度:当前系统时间连续调度 FREE_TIMES 次,回溯调度一次
     92         :return:date:%Y%m%d
     93         """
     94         if self._ctrl_info['counter'] >= FREE_TIMES:
     95             self._ctrl_info['counter'] = 0
     96             all_date = self.progress_manager.get().keys()
     97             if len(all_date) > 1:
     98                 all_date.sort(reverse=True)
     99                 today_format = self.today.strftime('%Y%m%d')
    100                 backtrack_index = all_date.index(today_format) + 1 if today_format in all_date else 0
    101                 for i in range(backtrack_index, len(all_date)):
    102                     scan_node = self.progress_manager.init_scan_node(
    103                         self.progress_manager.get(all_date[i])["offset"]
    104                     )
    105                     if Utils.get_scan_val(
    106                             self.progress_manager.get(all_date[i])["range"],
    107                             scan_node
    108                     ) != FINISH_SCAN:
    109                         # 不等于0 就可以扫描调度,否则排除当前日期
    110                         return all_date[i]
    111                 #无文件休眠
    112                 time.sleep(SLEEP_TIME)
    113         #非调度,时间为当天时
    114         # 此处与扫描时间保持一致 - delay秒,防止跨天进度保存bug
    115         return (self.today - timedelta(seconds=self.delay)).strftime("%Y%m%d")
    116 
    117     def _get_scan_time_range(self, index, offset):
    118         """
    119         获取扫描时间范围
    120         :return:
    121         """
    122         return self.change_index_to_time_range(index, offset)
    123 
    124     def change_index_to_time_range(self, index, offset):
    125         """
    126         根据扫描索引转化为扫描时间
    127         :param index: 扫描点
    128         :return: 起始时间,结束时间
    129         """
    130         start = index * offset
    131         end = (index + 1) * offset
    132         return [int(self.format_sec(start)), int(self.format_sec(end))]
    133 
    134     def format_sec(self, total):
    135         """
    136         格式化文件时间表达式
    137         :param total:
    138         :return:
    139         """
    140         total = total if total < BASE_SCAN_NODE else BASE_SCAN_NODE
    141         return "{}{}{}".format(
    142             str(total // 3600).zfill(2),
    143             str(total % 3600 // 60).zfill(2),
    144             str(total % 3600 % 60).zfill(2)
    145         )
    146 
    147     def select_file(self, scan_date, start_time, end_time):
    148         pattern = os.path.join(TCP_FLOW_APTH, scan_date, '*')
    149         file_list_of_dev = {}
    150         dev_path_list = [dp for dp in glob.glob(pattern) if os.path.isdir(dp)]
    151         for dev_path in dev_path_list:
    152             dev_name = "{}_{}".format(os.path.basename(dev_path),scan_date)
    153             file_list_of_dev[dev_name] = []
    154             file_pattern = os.path.join(dev_path, '*.avro')
    155             file_gen = (fl for fl in glob.glob(file_pattern))
    156             for fl in file_gen:
    157                 fl_time = int(os.path.basename(fl).split("_")[0])
    158                 if start_time <= fl_time < end_time:
    159                     file_list_of_dev[dev_name].append(fl)
    160         return file_list_of_dev
    161 
    162     def init_scan_control(self,span):
    163         progress = {}
    164         progress['counter'] = 0
    165         progress['offset'] = span
    166         return progress
    167 
    168     def save_progress(self, scan_date, index):
    169         """
    170         保存扫描进度
    171         :param scan_date: 日期
    172         :param index:扫描点
    173         :return:
    174         """
    175         self.progress_manager.save_scan_info(scan_date, index)
    176 
    177     def set_scan_finished(self, scan_date):
    178         """
    179         设定扫描完成
    180         :param scan_date:
    181         :return:
    182         """
    183         self.progress_manager.set_scan_finished(scan_date)
    184 
    185 
    if __name__ == '__main__':
        # Demo driver: poll forever using 57-second scan slots.
        scanner = TcpAvroScanner(time_span=57)
        while 1:
            scanner.get_new_files()
    190                         
    View Code

    2.进度控制

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import os
    import re
    import copy
    from datetime import date
    
    from cfgopr import CfgOpr
    
    from utils import Utils
    from netflowmeta_config import NOT_SCAN
    from netflowmeta_config import FINISH_SCAN
    from netflowmeta_config import BASE_SCAN_NODE
    from netflowmeta_config import DATE_PROGRESS_FILE
    
    
    class BaseManager(object):
        """Base class for progress managers rooted at a data directory."""

        def __init__(self, path):
            # Directory whose date-named sub-directories are tracked.
            self.path = path


    class DateManager(BaseManager):
        """Track per-date scan progress for the date directories under ``path``.

        Each date maps to ``{'range': <value>, 'offset': <slot seconds>}``.
        ``range`` is read and written through ``Utils.get_scan_val`` and
        ``Utils.set_scan_val``. It is persisted base-32 encoded in
        ``<path>/<date>/DATE_PROGRESS_FILE``.
        """

        def __init__(self, path, span=60):
            super(DateManager, self).__init__(path)
            self.offset = span
            self.__progress = None
            self.init_progress()

        def init_progress(self):
            """Load progress from disk on first use; otherwise re-sync it with disk."""
            if not self.__progress:
                self.get_all_progress()
            else:
                self.flush_progress()

        def get_all_progress(self, pattern=r'^\d{8}$'):
            """Load progress for every date directory under ``self.path``.

            Dates whose progress file yields no data get a fresh entry. The
            current date always gets an entry, even before its directory exists.

            :param pattern: regex a directory name must match to count as a
                            date (the default accepts eight digits, YYYYMMDD)
            :return: True
            """
            all_progress = {}
            for tar_date in self._list_date_dirs(pattern):
                progress_file = os.path.join(self.path, tar_date, DATE_PROGRESS_FILE)
                _, dp_data = CfgOpr.json_load(progress_file)
                if dp_data:
                    dp_data['range'] = Utils.base32to10(dp_data['range'])
                    all_progress[tar_date] = dp_data
                else:
                    # dp_data may be None here (loader behaviour not visible),
                    # so build a new dict rather than mutating it.
                    all_progress[tar_date] = self._new_progress_entry()
            cur_date = date.today().strftime("%Y%m%d")
            if cur_date not in all_progress:
                all_progress[cur_date] = self._new_progress_entry()
            self.__progress = all_progress
            return True

        def flush_progress(self, pattern=r'^\d{8}$'):
            """Re-sync progress with the date directories currently on disk.

            Dates whose directory was removed are dropped. Directories that
            appeared since the last sync (e.g. a new day) get fresh entries.

            :param pattern: regex a directory name must match to count as a date
            :return: True
            """
            all_exist_date = self._list_date_dirs(pattern)
            existing = set(all_exist_date)
            new_progress = {}
            for old_date, entry in (self.__progress or {}).items():
                if old_date in existing:
                    new_progress[old_date] = entry
            for exist_date in all_exist_date:
                if exist_date not in new_progress:
                    new_progress[exist_date] = self._new_progress_entry()
            self.__progress = new_progress
            return True

        def set_scan_finished(self, scan_date):
            """Mark ``scan_date`` as fully scanned and persist its progress.

            :param scan_date: date string ``%Y%m%d``
            :return: result of ``CfgOpr.json_dump``
            """
            entry = self.__progress[scan_date]
            scan_node = self.init_scan_node(entry["offset"])
            # Index ``scan_node`` (one past the last slot) holds the finished marker.
            entry["range"] = Utils.set_scan_val(entry["range"], scan_node, FINISH_SCAN)
            return self._dump_progress(scan_date)

        def save_scan_info(self, scan_date, index):
            """Record slot ``index`` of ``scan_date`` as scanned and persist it.

            :param scan_date: date string ``%Y%m%d``
            :param index: slot index
            :return: result of ``CfgOpr.json_dump``
            """
            entry = self.__progress[scan_date]
            entry["range"] = Utils.set_scan_val(entry["range"], index)
            return self._dump_progress(scan_date)

        def init_scan_node(self, span):
            """Return the number of ``span``-second slots in ``BASE_SCAN_NODE`` (ceiling).

            :param span: slot length in seconds
            """
            return BASE_SCAN_NODE // int(span) + (1 if BASE_SCAN_NODE % int(span) else 0)

        def get(self, scan_date=None):
            """Return all progress, or the entry for ``scan_date`` ({} if unknown)."""
            return self.__progress if scan_date is None else self.__progress.get(scan_date, {})

        def _list_date_dirs(self, pattern):
            """Return names of sub-directories of ``self.path`` that match ``pattern``."""
            return [
                name for name in os.listdir(self.path)
                if re.match(pattern, name)
                and os.path.isdir(os.path.join(self.path, name))
            ]

        def _new_progress_entry(self):
            """Build a fresh entry: range seeded from NOT_SCAN, slot length ``self.offset``."""
            return {
                'range': Utils.set_scan_val(NOT_SCAN, self.init_scan_node(self.offset)),
                'offset': self.offset,
            }

        def _dump_progress(self, scan_date):
            """Write ``scan_date``'s progress (range base-32 encoded) to its progress file."""
            save_data = copy.deepcopy(self.__progress[scan_date])
            save_data["range"] = Utils.base10to32(save_data["range"])
            progress_path = os.path.join(self.path, scan_date, DATE_PROGRESS_FILE)
            return CfgOpr.json_dump(save_data, progress_path)
    View Code

    3.配置

    # Seconds in one day: the upper bound of the in-day time axis.
    BASE_SCAN_NODE = 24 * 60 * 60

    # Per-slot value meaning "this slot has not been scanned yet".
    NOT_SCAN = 0
    # Value of the per-date marker (stored one past the last slot) once the
    # whole date is done. It may equal NOT_SCAN because the two are read at
    # different positions.
    FINISH_SCAN = NOT_SCAN

    # Idle rounds on the current day before one backtrack round is tried.
    FREE_TIMES = 3

    # Seconds to sleep when idle or after a scan error.
    SLEEP_TIME = 1
  • 相关阅读:
    scikit-learn随机森林调参小结
    用Spark学习FP Tree算法和PrefixSpan算法
    典型关联分析(CCA)原理总结
    scikit-learn Adaboost类库使用小结
    Google maps API开发(二)(转)
    php中setcookie函数用法详解(转)
    关于中英数字混排的字符串分割问题(转)
    字符串截取函数
    jQuery Masonry构建pinterest网站布局注意要点(转)
    【jQuery插件】用jQuery Masonry快速构建一个pinterest网站布局(转)
  • 原文地址:https://www.cnblogs.com/shiqi17/p/13417548.html
Copyright © 2011-2022 走看看