zoukankan      html  css  js  c++  java
  • Python定时任务

    import threading
    import time
    import datetime

    def exec_update():
        time.sleep(5)

    def event_func():
        now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(now_time)
        exec_update()
        t = threading.Timer(1, event_func)
        t.setDaemon(True)
        t.start()

    if __name__ == '__main__':
        t = threading.Timer(0, event_func)
        t.setDaemon(True)
        t.start()
        while 1:
            pass

    ########################################################################################################

    """

    定时任务,某个时间点执行

    """
    import os
    import time
    import json
    import sys
    import sched
    import signal
    import datetime
    import threading
    import subprocess
    from subprocess import DEVNULL
    import service.db_conn as db_conn

    #具体任务
    def exec_update():

        print("update...")


    #定时器
    def schedule_update():
        t = threading.Timer(0, event_func)
        t.setDaemon(True)
        t.start()

    #执行函数
    def event_func():
        now_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        print(now_time)
        exec_update()
        interval_time = delay_time()
        t = threading.Timer(interval_time, event_func)
        t.setDaemon(True)
        t.start()

    #取时间点
    def delay_time():
        # now time
        now_time = datetime.datetime.now()
        # tomorrow time
        next_time = now_time + datetime.timedelta(days=+1)
        next_year = next_time.date().year
        next_month = next_time.date().month
        next_day = next_time.date().day
        # get tomorrow 00:00
        next_time = datetime.datetime.strptime(str(next_year)+"-"+str(next_month)+"-"+str(next_day)+" 00:00:00", "%Y-%m-%d %H:%M:%S")

        # get secondes
        delay_time = (next_time - now_time).total_seconds()
        return delay_time

    def quit_sys(signum, frame):
        sys.exit()

    #接收Ctrl+C信号
    if __name__ == "__main__":
        try:
            signal.signal(signal.SIGINT, quit_sys)
            signal.signal(signal.SIGTERM, quit_sys)
            schedule_update()
            print("Hit Ctrl-C to quit. ")
            while 1:
                time.sleep(1)
        except  Exception as e:
            print(e)

  • 相关阅读:
    DataSnap(MIDAS)三层架构中,常用事件及其触发顺序
    如何让中间层MIDAS/DATASNAP支持大量的并发用户并且控制连接数量
    插件之注册插件和注册插件中的模块
    TField.ProviderFlags
    datasnap生命期LifeCycle
    datasnap服务器支持的参数类型
    TDSAuthenticationManager的用法
    泛型实现的对象池
    服务器端如何防止DDOS
    获得客户端的信息
  • 原文地址:https://www.cnblogs.com/frisk/p/9463196.html
Copyright © 2011-2022 走看看