zoukankan      html  css  js  c++  java
  • 每天一模块-apscheduler

    1. 调度对象封装
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
     
    import sys,os,logging
    import asyncio
    import datetime
    from pytz import timezone
    from api_monitor.utils import scheduler_config
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.events import EVENT_ALL
     
     
     
     
    def default_func():
        """Do nothing; used as the callable of the placeholder 'default' job."""
        return None
     
    class APIScheduler():
        """Wrapper around an APScheduler ``AsyncIOScheduler``.

        The same class is used on two sides: a "server" that only writes jobs
        into the job store, and an "agent" that calls :meth:`star` to actually
        run them.
        """

        def __init__(self,executors=scheduler_config.executors,
                     jobstores=scheduler_config.jobstores,
                     job_defaults=scheduler_config.job_defaults):
            """
            Create the underlying scheduler on a fresh asyncio event loop.

            :param executors: mapping of executor alias -> executor instance
            :param jobstores: mapping of job store alias -> job store instance;
                each store gets its ``_scheduler`` attribute set here
            :param job_defaults: default job options passed to the scheduler
            """
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

            self._sched = AsyncIOScheduler(executors=executors,
                                           jobstores=jobstores,
                                           job_defaults=job_defaults,
                                           timezone=timezone('Asia/Shanghai'))
            self.default_id = 'default'
            # Mark the scheduler as running (state 1) by default: only in that
            # state does add_job() persist the job to the job store right away.
            self._sched.state = 1
            # The listener is not registered by default; to enable it use
            # self._sched.add_listener(self.listener, EVENT_ALL).
            self._sched._logger = scheduler_config.logger()

            # Work around jobs loaded from Redis having no scheduler attribute:
            # the stores are never started here, so attach the scheduler by hand.
            for store in jobstores.values():
                store._scheduler = self._sched

        def listener(self,event):
            """
            Print whether an event reports a failure.

            :param event: an APScheduler event; events without an ``exception``
                attribute are treated as successful
            """
            # Not every event type carries ``exception``, so read it defensively.
            if getattr(event, 'exception', None):
                print('CHUCUO')
            else:
                print('正常。。。')

        def add_job(self,**kwargs):
            """
            Add a job to the job store.

            :param kwargs: forwarded unchanged to the scheduler's ``add_job``:
             func - callable (or a textual reference to one) to run at the given time
             trigger (str|apscheduler.triggers.base.BaseTrigger) - trigger that determines when func is called
             args (list|tuple) - list of positional arguments to call func with
             kwargs (dict) - dict of keyword arguments to call func with
             id (str) - explicit identifier for the job (for modifying it later)
             name (str) - textual description of the job
             misfire_grace_time (int) - seconds after the designated runtime that the job is still allowed to be run
             coalesce (bool) - run once instead of many times if the scheduler determines that the job should be run more than once in succession
             max_instances (int) - maximum number of concurrently running instances allowed for this job
             next_run_time (datetime) - when to first run the job, regardless of the trigger (pass None to add the job as paused)
             jobstore (str) - alias of the job store to store the job in
             executor (str) - alias of the executor to run the job with
             replace_existing (bool) - True to replace an existing job with the same id
            :return: whatever the scheduler's ``add_job`` returns (the new job)
            """
            # The job is only written to the store immediately while the
            # scheduler state is 1 (see __init__).
            return self._sched.add_job(**kwargs)

        def _add_default_job(self,job_id='default',jobstore='default'):
            """
            Register the no-op placeholder job that fires every second.

            :param job_id: id of the placeholder job
            :param jobstore: alias of the job store to put it in
            """
            self.add_job(
                func=default_func, trigger='interval', seconds=1,
                jobstore=jobstore,next_run_time=datetime.datetime.now(),
                id=job_id, replace_existing=True, misfire_grace_time=3, coalesce=True,
                max_instances=1
            )

        def remove_job(self,job_id,jobstore='default'):
            """
            Remove the job with the given id.

            :param job_id: id of the job to remove
            :param jobstore: alias of the job store holding the job
            """
            self._sched.remove_job(job_id=job_id,jobstore=jobstore)

        def remove_all_jobs(self,jobstore='default',default_jobid='default'):
            """
            Remove every job in ``jobstore`` except the placeholder job.

            :param jobstore: alias of the job store to clean
            :param default_jobid: id of the job that must be kept
            """
            # Snapshot the ids first so removal does not interleave with listing.
            job_ids = [job.id for job in self._sched.get_jobs(jobstore=jobstore)
                       if job.id != default_jobid]
            for job_id in job_ids:
                try:
                    self.remove_job(job_id,jobstore)
                except AttributeError:
                    # Best effort: report the failure with its traceback and
                    # carry on with the remaining jobs.
                    logging.exception('failed to remove job %s', job_id)

        def pause_job(self,job_id,jobstore='default'):
            """
            Pause the job with the given id.

            :param job_id: id of the job to pause
            :param jobstore: alias of the job store holding the job
            """
            self._sched.pause_job(job_id=job_id,jobstore=jobstore)

        def resume_job(self,job_id,jobstore='default'):
            """
            Resume the job with the given id.

            :param job_id: id of the job to resume
            :param jobstore: alias of the job store holding the job
            """
            self._sched.resume_job(job_id=job_id,jobstore=jobstore)

        def star(self,add_default_job=True,jobstore='default'):
            """
            Start processing jobs and block on the event loop forever.

            :param add_default_job: when true, register the placeholder job first
            :param jobstore: alias of the job store for the placeholder job
            """
            try:
                if add_default_job:
                    self._add_default_job(jobstore=jobstore)
                # Undo the "running" flag faked in __init__ so that the real
                # start() below is not rejected as already running.
                self._sched.state = 0
                self._sched.start()
                asyncio.get_event_loop().run_forever()
            finally:
                # Shutting down a scheduler that never started raises and would
                # hide the original error, so only shut down when running.
                if self._sched.running:
                    self.shutdown()

        def shutdown(self,wait=True):
            """
            Shut the scheduler down.

            :param wait: passed to the scheduler's ``shutdown``; true waits for
                running jobs to finish
            """
            self._sched.shutdown(wait=wait)
            
     2. 调度任务存储配置
     #!/usr/bin/env python
    # -*- coding: utf-8 -*-
     
    import logging
    from apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor
    from apscheduler.jobstores.redis import RedisJobStore
     
    # Executor aliases offered to the scheduler: a 200-thread pool as the
    # default and a 5-process pool under the alias 'processpool'.
    executors = dict(
        default=ThreadPoolExecutor(200),
        processpool=ProcessPoolExecutor(5),
    )
     
    # Job store aliases: jobs are kept in a local Redis (db 0) under the
    # 'apscheduler.jobs' / 'apscheduler.run_times' keys.
    jobstores = dict(
        default=RedisJobStore(
            db=0,
            jobs_key='apscheduler.jobs',
            run_times_key='apscheduler.run_times',
            host='127.0.0.1',
            port='6379',
            password='',
        ),
    )
     
    # Defaults applied to jobs that do not set these options themselves.
    job_defaults = dict(coalesce=True, max_instances=3)
     
    def logger(log_file='/tmp/scheduler.log'):
        """Configure root logging to append to ``log_file`` and return the ``logging`` module.

        :param log_file: path of the file that log records are appended to
        :return: the ``logging`` module itself (not a ``Logger`` instance)
        """
        settings = {
            'level': logging.INFO,
            'format': '%(asctime)s %(levelname)s %(message)s',
            'datefmt': '%Y-%m-%d %H:%M:%S',
            'filename': log_file,
            'filemode': 'a',
        }
        logging.basicConfig(**settings)
        return logging
        
    3. 任务函数
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
     
    import pycurl
    from urllib import parse
    from io import StringIO,BytesIO
    import json
    import os,sys
    import pymysql
    import time
     
     
     
    # Connection settings for the MySQL database that stores probe history
    # (placeholders; replace with real values).
    db_host = "xxxxxx"
    db_user = "xxxx"
    db_passwd = "xxxxxx"
    db_database = "xxxxxx"
    # Pass the settings by keyword: PyMySQL 1.0+ no longer accepts positional
    # arguments to connect().
    conn = pymysql.connect(host=db_host, user=db_user,
                           password=db_passwd, database=db_database)
    # Module-level cursor shared by every call to curl() below.
    cursor = conn.cursor()
     
    def _save_history(api_info):
        """Insert one probe result into api_monitor_apimonitorhistory (best effort)."""
        columns = (
            'http_code', 'totle_response_time', 'dns_time', 'connect_time',
            'redriect_time', 'ssl_time', 'size_download', 'speed_down',
            'content', 'err_message', 'api_monitor_id', 'api_monitor_node_id',
            'create_time',
        )
        row = dict(api_info)
        row['create_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        # Let the driver quote the values: the response body is untrusted and
        # may contain quotes, and err_message may be either 0 or a string.
        sql = 'insert into api_monitor_apimonitorhistory ({}) values ({})'.format(
            ', '.join(columns), ', '.join(['%s'] * len(columns)))
        try:
            cursor.execute(sql, [row.get(name) for name in columns])
            conn.commit()
        except Exception as e:
            # Deliberately best effort: a failed insert must not kill the job.
            print(e)
            conn.rollback()

    def curl(url,params,method,header=None,save_data=True,time_out=6,**kwargs):
        """
        Probe an HTTP endpoint with pycurl and optionally store the timings.

        :param url: URL to request
        :param params: request parameters; url-encoded into the query string
            for GET, JSON-encoded into the body for any other method
        :param method: HTTP method name; only 'get' (any case) is treated as GET
        :param header: list of raw header lines, or None for no extra headers
        :param save_data: when true, insert the result into
            api_monitor_apimonitorhistory through the module-level connection
        :param time_out: total request timeout in seconds
        :param kwargs: must contain 'api_id' and 'node_id'
        :return: dict with the collected metrics, response body and err_message
            (0 on success, the pycurl error text otherwise)
        :raises KeyError: if 'api_id' or 'node_id' is missing from kwargs
        """
        # NOTE(review): the module-level conn/cursor are shared by every call;
        # confirm that is safe with the executor this job runs on.
        api_info = {
            'api_monitor_id': kwargs['api_id'],
            'api_monitor_node_id': kwargs['node_id'],
        }

        buffer = BytesIO()
        c = pycurl.Curl()
        try:
            c.setopt(pycurl.HTTPHEADER, list(header) if header else [])
            c.setopt(pycurl.WRITEDATA, buffer)
            # Total request timeout.
            c.setopt(pycurl.TIMEOUT, time_out)
            if method.lower() == 'get':
                if params:
                    # Extend an existing query string instead of adding a second '?'.
                    separator = '&' if '?' in url else '?'
                    url = url + separator + parse.urlencode(params)
            else:
                c.setopt(pycurl.POSTFIELDS, json.dumps(params))
            c.setopt(pycurl.URL,url)
            try:
                c.perform()
                api_info['err_message'] = 0
            except pycurl.error as e:
                api_info['err_message'] = str(e)

            # Metrics are collected after failures too, so that errors are
            # recorded alongside whatever timings curl managed to gather.
            api_info.update(
                {
                    'http_code': c.getinfo(pycurl.HTTP_CODE),
                    'totle_response_time': c.getinfo(pycurl.TOTAL_TIME),
                    'dns_time': c.getinfo(pycurl.NAMELOOKUP_TIME),
                    'connect_time': c.getinfo(pycurl.CONNECT_TIME),
                    'redriect_time': c.getinfo(pycurl.REDIRECT_TIME),
                    'ssl_time': c.getinfo(pycurl.APPCONNECT_TIME),
                    'size_download': c.getinfo(pycurl.SIZE_DOWNLOAD),
                    'speed_down': c.getinfo(pycurl.SPEED_DOWNLOAD),
                    # A body that is not valid UTF-8 must not abort the probe.
                    'content': buffer.getvalue().decode('utf-8', errors='replace'),
                }
            )
        finally:
            # Always release the curl handle, even if something above raised.
            c.close()

        if save_data:
            _save_history(api_info)
        return api_info
     
     4. 执行调度任务
         server端添加任务到redis
         job_obj = scheduler.APIScheduler()  # 调用job对象
         for node_id in api_monitor_node_id_list:
            api_conf = {
                "url":monitor_url,
                "params": params_data,
                "method": request_method,
                "header": http_header,
                "api_id": str(api_obj_id),
                "node_id": node_id
            }
     
         job_obj.add_job(id=str(node_id), func=tasks.curl, kwargs=api_conf,
                        trigger='interval',
                        seconds=5,next_run_time=datetime.datetime.now(),
                        replace_existing=True, misfire_grace_time=3, coalesce=True,
                        max_instances=2,
                        jobstore='default') # 加入任务到redis, 必须设置scheduler state=1才能加入redis
                        
         agent端监听任务
         from api_monitor.utils import scheduler
     
        job_obj = scheduler.APIScheduler()
        job_obj.star() # 启动监听
        #job_obj.remove_job(job_id="default")  # 根据任务id删除任务
        
        
    参考链接: https://zhuanlan.zhihu.com/p/44185271
    flask中的应用:https://www.cnblogs.com/zydev/p/13865535.html
    
    可以将 flask-apscheduler 当作一个定时任务平台,平时把需要执行的任务添加到其中即可。
    
    https://www.cnblogs.com/shenh/p/13366583.html
    
    参考1:https://www.sohu.com/a/407444741_658944
    
    参考2:https://www.cnblogs.com/yueerwanwan0204/p/5480870.html
    
    参考3:https://www.jb51.net/article/184003.htm
    
    人生苦短,我用python!
  • 相关阅读:
    C#之枚举
    C#之判断字母大小、字母转ACII码
    C#之BF算法
    md5如何实现encodePassword加密方法
    基本配置及安全级别security-level
    js中“原生”map
    web.xml讲解
    java application指的是什么
    .conf、.bak是什么格式
    Maven系列--web.xml 配置详解
  • 原文地址:https://www.cnblogs.com/sunxiuwen/p/14656833.html
Copyright © 2011-2022 走看看