zoukankan      html  css  js  c++  java
  • python 解析 crontab

    
    

    1. 使用croniter

    官方教程是:https://github.com/taichino/croniter

    示例一:获得下次crontab执行的时间

      

    from croniter import croniter
    from datetime import datetime

    # Take the reference time once so that the printed "now" and the base
    # handed to croniter are the very same instant.
    now = datetime.now()
    print(now)

    # NOTE(review): this expression has six fields; croniter is expected to
    # read the extra trailing field as seconds -- confirm that is intended.
    cron = croniter('01 */5 * * * *', now)
    # get_next(datetime) returns the next scheduled run as a datetime object.
    print(cron.get_next(datetime))
    
    
    输出结果是:
    
    2021-03-12 13:52:10.627000
    2021-03-12 15:01:00

    示例二:计算从当前时间起、未来一段时间内的 crontab 执行时间

      直接上代码

      

    from croniter import croniter_range
    from datetime import datetime, timedelta

    # One reference instant for both the window start and the window end.
    start = datetime.now()
    # Despite its name, ``tomorrow`` lies two days ahead.  Dropping the
    # microseconds directly replaces the former strftime/strptime round-trip.
    tomorrow = (start + timedelta(days=2)).replace(microsecond=0)
    print("tomorrow %s" % tomorrow)
    print(type(tomorrow))
    # croniter_range yields every run time of the expression inside the window.
    for run_time in croniter_range(start, tomorrow, "01 */2 * * *"):
        print(run_time)

      输出结果:

      

    C:\Python27\python2.exe F:/SpeedCrawlerEnd/crawler_algorithm/rwdd/apscheduler_test.py
    tomorrow 2021-03-14 13:57:17
    <type 'datetime.datetime'>
    2021-03-12 14:01:00
    2021-03-12 16:01:00
    2021-03-12 18:01:00
    2021-03-12 20:01:00
    2021-03-12 22:01:00
    2021-03-13 00:01:00
    2021-03-13 02:01:00
    2021-03-13 04:01:00
    2021-03-13 06:01:00
    2021-03-13 08:01:00
    2021-03-13 10:01:00
    2021-03-13 12:01:00
    2021-03-13 14:01:00
    2021-03-13 16:01:00
    2021-03-13 18:01:00
    2021-03-13 20:01:00
    2021-03-13 22:01:00
    2021-03-14 00:01:00
    2021-03-14 02:01:00
    2021-03-14 04:01:00
    2021-03-14 06:01:00
    2021-03-14 08:01:00
    2021-03-14 10:01:00
    2021-03-14 12:01:00

    示例三:直接上代码

       

    代码一:
    
    
    from datetime import datetime
    import time
    import croniter
    
    
    def run_get_next_time(sched):
        """Print the next ten run times of the cron expression *sched*.

        The run times are printed as one list of ``"%Y-%m-%d %H:%M:%S"``
        strings.  A single croniter iterator is created and advanced ten
        times; building a new iterator from the same base time on every
        pass (as before) repeats the same timestamp ten times.
        """
        schedule = croniter.croniter(sched, datetime.now())
        print([schedule.get_next(datetime).strftime("%Y-%m-%d %H:%M:%S")
               for _ in range(10)])
    
    
    
    代码二:
    
    
    def run_get_next_time(sched):
        """Print the next ten run times of the cron expression *sched*.

        For every run time the datetime object is printed, followed by its
        type.  One croniter iterator is advanced ten times, which produces
        the same sequence as feeding each result back in as the new base.
        """
        schedule = croniter.croniter(sched, datetime.now())
        for _ in range(10):
            run_at = schedule.get_next(datetime)
            print(run_at)
            print(type(run_at))


    # Example expression: minute 10 of every second hour.
    sched = "10 */2 * * * "

    2. 使用 GitHub 上开源的 crontab_parser

    直接上代码:新建一个 Python 文件,文件名为 crontab_parser.py,文件内容如下

        

    # -*- coding: utf-8 -*-
    
    #
    # import croniter
    # from datetime import datetime
    #
    # import croniter
    # import datetime
    #
    #
    # def run_get_next_time(sched):
    #     cron = croniter.croniter(sched, datetime.datetime.now())
    #     print "cron", cron.get_next(ret_type=10)
    #
    #     return cron.get_next(ret_type=10).strftime("%Y-%m-%d %H:%M")
    #
    #
    # # for page in range(10):
    # #     print(run_get_next_time("10 */2 * * * "))
    #
    #
    # from crontab import CronTab
    # from datetime import datetime
    #
    # # define the crontab for 25 minutes past the hour every hour
    # entry = CronTab('25 */2 * * *')
    # # find the delay from when this was run (around 11:13AM)
    # print entry.next(default_utc=False)
    # import time
    #
    # time.sleep(60)
    
    import re
    import datetime
    
    
    class SimpleCrontabEntry(object):
        """Crontab-like schedule parser.

        Only the five time fields of a crontab entry (minute, hour, day of
        month, month, weekday) are handled.  The ``@hourly``-style shortcuts
        listed in ``self.special`` are expanded before parsing.

        After construction ``self.fields`` maps each field name to a sorted
        list of the integers that field allows.
        """

        # "<first>-<last>/<step>" and "<first>-<last>"; "*" has already been
        # rewritten to an explicit range when these are applied.
        _STEP_RE = re.compile(r"^(\d+-\d+)/(\d+)$")
        _RANGE_RE = re.compile(r"^(\d+)-(\d+)$")

        # Upper bound for the day-by-day search in next_run().  The widest gap
        # between two valid dates is the eight years separating consecutive
        # 29 Februaries around a non-leap century year.
        _MAX_LOOKAHEAD_DAYS = 8 * 366

        def __init__(self, entry, expiration=0):
            """Parse *entry*; *expiration* is a number of minutes.

            Raises ValueError when *entry* is not a valid crontab time spec.
            """
            self.__setup_timespec()
            self.set_value(entry)
            self.set_expiration(expiration)

        def set_expiration(self, val):
            """Store *val* (minutes) as a timedelta in ``self.expiration``."""
            self.expiration = datetime.timedelta(minutes=val)

        def set_value(self, entry):
            """Parse and validate the crontab time spec *entry*.

            The raw text is kept in ``self.data``.  Raises ValueError when the
            spec does not have exactly five fields or a field is malformed.
            """
            self.data = entry
            # Expand shortcuts such as "@daily".  "@reboot" maps to an empty
            # string and therefore still fails the five-field check below.
            spec = self.special.get(entry.strip()) or entry
            fields = re.findall(r"\S+", spec)
            if len(fields) != 5:
                raise ValueError("Crontab entry needs 5 fields")
            self.fields = {
                "minute": fields[0],
                "hour": fields[1],
                "day": fields[2],
                "month": fields[3],
                "weekday": fields[4],
            }
            if not self._is_valid():
                raise ValueError("Bad Entry")

        def __setup_timespec(self):
            """Create the lookup tables used while parsing (from gnome-schedule)."""
            self.special = {
                "@reboot": '',
                "@hourly": '0 * * * *',
                "@daily": '0 0 * * *',
                "@weekly": '0 0 * * 0',
                "@monthly": '0 0 1 * *',
                "@yearly": '0 0 1 1 *'
            }

            self.timeranges = {
                "minute": range(0, 60),
                "hour": range(0, 24),
                "day": range(1, 32),
                "month": range(1, 13),
                "weekday": range(0, 8)
            }

            self.timenames = {
                "minute": "Minute",
                "hour": "Hour",
                "day": "Day of Month",
                "month": "Month",
                "weekday": "Weekday"
            }

            self.monthnames = {
                "1": "Jan",
                "2": "Feb",
                "3": "Mar",
                "4": "Apr",
                "5": "May",
                "6": "Jun",
                "7": "Jul",
                "8": "Aug",
                "9": "Sep",
                "10": "Oct",
                "11": "Nov",
                "12": "Dec"
            }

            self.downames = {
                "0": "Sun",
                "1": "Mon",
                "2": "Tue",
                "3": "Wed",
                "4": "Thu",
                "5": "Fri",
                "6": "Sat",
                "7": "Sun"
            }

        def checkfield(self, expr, type):
            """Validate one crontab time expression and store its values.

            *expr* is the raw text of the field and *type* is the field name
            ("minute", "hour", "day", "month" or "weekday").  Month and weekday
            alias names are replaced by their numbers, every ``*`` becomes an
            explicit "first-last" range, and each comma separated part is then
            checked:

            1. a step width, when present, must be at least 1 and in range;
            2. both ends of a range must be in range and in ascending order;
            3. anything else must be a number that is in range.

            On success ``self.fields[type]`` is replaced by the sorted list of
            distinct integers the expression allows.  On failure a ValueError
            carrying ``(problem kind, field label, explanation)`` is raised.
            """
            timerange = self.timeranges[type]
            label = self.timenames[type]
            lowest, highest = min(timerange), max(timerange)
            bounds = "Must be between %(min)s and %(max)s" % {"min": lowest,
                                                              "max": highest}

            # Replace alias names only when they are not glued to another word
            # character and not preceded by a slash; otherwise "JanJan" or
            # "1Feb" would validate, and step widths must not be aliases.
            if type == "month":
                aliases = self.monthnames
            elif type == "weekday":
                aliases = self.downames
            else:
                aliases = {}
            for number, alias in aliases.items():
                expr = re.sub(r"(?<![\w/])" + alias + r"(?!\w)", number, expr)

            expr = expr.replace("*", "%s-%s" % (lowest, highest))

            allowed = []
            for part in expr.split(","):
                step = 1
                stepped = self._STEP_RE.match(part)
                if stepped is not None:
                    part = stepped.group(1)
                    step = int(stepped.group(2))
                    # A zero step would make range() itself raise further down.
                    if step < 1 or step not in timerange:
                        raise ValueError(
                            "stepwidth", label,
                            "Must be between %(min)s and %(max)s"
                            % {"min": max(1, lowest), "max": highest})

                span = self._RANGE_RE.match(part)
                if span is not None:
                    first, last = int(span.group(1)), int(span.group(2))
                    if first not in timerange or last not in timerange:
                        raise ValueError("range", label, bounds)
                    if first > last:
                        # A reversed range would silently select nothing.
                        raise ValueError(
                            "range", label,
                            "%s must not be greater than %s" % (first, last))
                    allowed.extend(range(first, last + 1, step))
                elif not part.isdigit():
                    raise ValueError("fixed", label,
                                     "%s is not a number" % (part))
                elif int(part) not in timerange:
                    raise ValueError("fixed", label, bounds)
                else:
                    allowed.append(int(part))

            # Overlapping parts such as "1-5,3" must not produce duplicates.
            self.fields[type] = sorted(set(allowed))

        def _is_valid(self):
            """Run checkfield() on every field of the entry.

            Prints a description of the first problem found.
            Returns True when all fields are well formed, False otherwise.
            """
            try:
                # Iterate over a snapshot: checkfield() rewrites self.fields.
                for name, expr in list(self.fields.items()):
                    self.checkfield(expr, name)
            except ValueError as err:
                if len(err.args) == 3:
                    print("PROBLEM TYPE: %s, ON FIELD: %s -> %s " % err.args)
                else:
                    print("PROBLEM: %s" % (err,))
                return False
            return True

        def _day_matches(self, day):
            """Tell whether the calendar day of *day* is selected.

            *day* is a date or datetime.  When both the day-of-month and the
            weekday field are restricted, matching either one is enough;
            otherwise the restricted field (if any) decides.
            """
            # Sunday may be written as 0 or 7; isoweekday() reports it as 7.
            weekdays = set(d or 7 for d in self.fields['weekday'])
            on_day = day.day in self.fields['day']
            on_weekday = day.isoweekday() in weekdays
            day_restricted = (len(self.fields['day'])
                              != len(self.timeranges['day']))
            weekday_restricted = len(weekdays) != 7
            if day_restricted and weekday_restricted:
                return on_day or on_weekday
            # An unrestricted field always matches, so "and" is safe here.
            return on_day and on_weekday

        def _first_time_on(self, day, earliest):
            """Return the first run time on *day* not before *earliest*.

            *day* is a date that is not before ``earliest.date()``.  Returns a
            naive datetime, or None when every run time on that day is too
            early.
            """
            if day > earliest.date():
                floor = (0, 0)
            else:
                floor = (earliest.hour, earliest.minute)
            for hour in self.fields['hour']:
                for minute in self.fields['minute']:
                    if (hour, minute) >= floor:
                        return datetime.datetime(day.year, day.month, day.day,
                                                 hour, minute)
            return None

        def matches(self, time=None):
            """Check whether *time* matches the cron pattern.

            *time* defaults to the current time, evaluated on every call.
            Seconds and microseconds are ignored.
            """
            if time is None:
                time = datetime.datetime.now()
            return (time.month in self.fields['month'] and
                    self._day_matches(time) and
                    time.hour in self.fields['hour'] and
                    time.minute in self.fields['minute'])

        def next_run(self, time=None):
            """Calculate when the next execution after *time* will be.

            *time* defaults to the current time, evaluated on every call.  The
            result is the first matching minute strictly after the minute that
            contains *time*, returned as a naive datetime with zero seconds.

            Raises ValueError when no valid date matches within the look-ahead
            window, e.g. for an entry that asks for 31 February.
            """
            if time is None:
                time = datetime.datetime.now()
            earliest = datetime.datetime(time.year, time.month, time.day,
                                         time.hour, time.minute)
            earliest += datetime.timedelta(minutes=1)

            day = earliest.date()
            one_day = datetime.timedelta(days=1)
            for _ in range(self._MAX_LOOKAHEAD_DAYS):
                if day.month in self.fields['month'] and self._day_matches(day):
                    found = self._first_time_on(day, earliest)
                    if found is not None:
                        return found
                day += one_day
            raise ValueError("Crontab entry %r never matches a valid date"
                             % (self.data,))

    测试代码如下:

        

    from datetime import datetime

    from crontab_parser import SimpleCrontabEntry

    cron = SimpleCrontabEntry('03 */2 * * *')
    datetime_time = datetime.now()
    # Show the starting point first, then ten consecutive run times; each
    # result is fed back in as the base for the next lookup.
    print("datetime_time %s" % datetime_time)
    for _ in range(10):
        datetime_time = cron.next_run(datetime_time)
        print(datetime_time)
    

      

    输出结果如下:

      

    datetime_time 2021-03-12 14:00:39.545000
    2021-03-12 14:03:00
    2021-03-12 16:03:00
    2021-03-12 18:03:00
    2021-03-12 20:03:00
    2021-03-12 22:03:00
    2021-03-13 00:03:00
    2021-03-13 02:03:00
    2021-03-13 04:03:00
    2021-03-13 06:03:00
    2021-03-13 08:03:00
    

      

    计算crontab有什么作用,用处大了去了

     
    如果觉得对您有帮助,麻烦您点一下推荐,谢谢!



    好记忆不如烂笔头
  • 相关阅读:
    Google 开源的 Python 命令行库:深入 fire(二)
    开启 Django 博客的 RSS 功能
    MongoDB 分片键的选择与案例
    Log4Net写入到数据库配置过程中的一些小问题备忘
    《WCF服务编程第三版》知识点摘录
    Android调用基于.net的WebService
    心跳包实现的另一种机制
    无法加载一个或多个请求的类型。有关更多信息,请检索 LoaderExceptions 属性。
    解决SaveChanges会Hold住之前的错误的问题
    memcached工作原理与优化建议
  • 原文地址:https://www.cnblogs.com/xuchunlin/p/14523531.html
Copyright © 2011-2022 走看看