zoukankan      html  css  js  c++  java
  • python定时任务APScheduler

    老大想在企业微信中添加一个机器人提醒大家下班写日报

    1、首先是你要添加一个机器人
    获取到webhook这个有什么用,看官方文档https://work.weixin.qq.com/help?doc_id=13376
    2、目前来看机器人只能推送消息(比如推送天气、提醒)
    3、我想定时推送消息,查到了APScheduler这个框架
    import requests
    
    import os
    import datetime, time
    from apscheduler.triggers.combining import AndTrigger, OrTrigger
    from apscheduler.triggers.interval import IntervalTrigger
    from apscheduler.triggers.cron import CronTrigger
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.schedulers.background import BackgroundScheduler
    
    scheduler = BlockingScheduler()
    headers = {'Content-Type': 'application/json'}
    
    data = {
        "msgtype": "text",
        "text": {
            "content": ""
        }
    }
    
    
    def send_weixin_shouquan3(data):
        webhook_url = "OOOOOO"
        res = requests.post(webhook_url, json=data, headers=headers)
        print(res.text)
    
    
    def get_tianqi():
        wheather_url = 'https://tianqiapi.com/api?XXXX'
        res1 = requests.get(wheather_url)
        print(res1.json())
        data1 = res1.json()
        datetime1 = '大家好,今天是%s %s' % (data1.get('date'), data1.get('week'))
        wheath = '西安今日天气: %s 天...不管什么天气都要记得多喝热水喔0.0' % (data1.get('wea'))
        data = {
            "msgtype": "text",
            "text": {
                "content": datetime1 + '
    ' + wheath
            }
        }
        send_weixin_shouquan3(data)
    
    
    def send_ribao():
        print('日报')
        data = {
            "msgtype": "text",
            "text": {
                "content": '下班前记得填写日报@所有人'
            }
        }
        send_weixin_shouquan3(data)
    
    
    def scheduled_job():
        print('This job is run every weekday at 5pm.')
    
    
    if __name__ == '__main__':
        sched = BlockingScheduler()
        sched.add_job(send_ribao, 'cron', day_of_week='mon-fri', hour='18', minute='00', second='00')  #周一到周五下午6点发送消息
        sched.add_job(get_tianqi, 'cron', day_of_week='mon-fri', hour='7')  # 早上七点
        print('before the start funciton')
        print('開始執行')
        sched.start()


    4、官方文档https://apscheduler.readthedocs.io/en/latest/modules/triggers/combining.html#module-apscheduler.triggers.combining
    组合使用有bug,记得看github上的issue
  • 相关阅读:
    整合Grafana
    Prometheus环境搭建
    RocketMQ单机部署
    记二进制搭建k8s集群完成后,部署时容器一直在创建中的问题
    接口重复提交解决方案
    记一次生产环境nginx图片上传不了的问题
    怎么进行中间件的学习
    MongoDB学习笔记之文档
    MongoDB学习笔记
    根据端口杀掉指定进程
  • 原文地址:https://www.cnblogs.com/tarzen213/p/13504524.html
Copyright © 2011-2022 走看看