zoukankan      html  css  js  c++  java
  • Celery---一个懂得异步任务,延时任务,周期任务的芹菜

    Celery是什么?

    celey是芹菜

    celery是基于Python实现的模块,用于执行异步延时周期任务的

    其结构组成是由

      1.用户任务 app

      2.管道任务broker用于存储任务 官方推荐redis rabbitMQ /backend 用于存储任务执行结果的

      3.员工 worker

    Celery的简单示例

    from celery import Celery
    import time
    
    #创建一个Celery实例,这就是我们用户的应用app
    my_task = Celery("tasks", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379")
    
    # 为应用创建任务,func1
    @my_task.task
    def func1(x, y):
        time.sleep(15)
        return x + y
    s1.py
    from s1 import func1
    
    # 将任务交给Celery的Worker执行
    res = func1.delay(2,4)
    
    #返回任务ID
    print(res.id)
    
    s2.py
    s2.py
    from celery.result import AsyncResult
    from s1 import my_task
    
    # 异步获取任务返回值
    async_task = AsyncResult(id="31ec65e8-3995-4ee1-b3a8-1528400afd5a",app=my_task)
    
    # 判断异步任务是否执行成功
    if async_task.successful():
        #获取异步任务的返回值
        result = async_task.get()
        print(result)
    else:
        print("任务还未执行完成")
    s3.py

    Celery的启动

    根据操作系统的不同,启动方式也存在差异:
    Linux - celery worker -A s1 -l INFO 
    Windows:这里需要注意的是celery 4.0 已经不再对Windows操作系统提供支持了,也就是在windows环境下出现问题除非自己解决,否贼官方是不会给你解决的
    Windows - celery worker -A s1 -l INFO -P eventlet
    ps: eventlet 是一个python的三方库 需要使用 pip安装 pip install eventlet

    项目目录

    在实际项目中我们应用Celery是有规则的

    要满足这样的条件才可以哦,目录Celery_task这个名字可以随意起,但是一定要注意在这个目录下一定要有一个celery.py这个文件

    from celery import Celery
    
    celery_task = Celery("task",
                         broker="redis://127.0.0.1:6379",
                         backend="redis://127.0.0.1:6379",
                         include=["Celery_task.task_one","Celery_task.task_two"])
    # include 这个参数适用于寻找目录中所有的task
    celery.py
    from .celery import celery_task
    import time
    
    @celery_task.task
    def one(x,y):
        time.sleep(5)
        return f"task_one {x+y}"
    task_one
    from .celery import celery_task
    import time
    
    @celery_task.task
    def two(x,y):
        time.sleep(5)
        return f"task_two {x+y}"
    task_two

    这样Celery项目目录结构就已经做好了然后再 my_celery中调用

    from Celery_task.task_one import one
    from Celery_task.task_two import two
     
    one.delay(10,10)
    two.delay(20,20)
    View Code

    PS:启动Worker的时候无需再使用文件启动,直接启动你的Celery_task目录就行了
    celery worker -A Celery_task -l INFO -P eventlet
    这样celery就可以自动的去检索当前目录下所有的task了,通过Include这个参数逐一去寻找

    Celery延时任务

     1 from Celery_task.task_one import one
     2 from Celery_task.task_two import two
     3 
     4 # one.delay(10,10)
     5 # two.delay(20,20)
     6 
     7 # 定时任务我们不在使用delay这个方法了,delay是立即交给task 去执行
     8 # 现在我们使用apply_async定时执行
     9 
    10 #首先我们要先给task一个执行任务的时间
    11 import datetime,time
    12 # 获取当前时间 此时间为东八区时间
    13 ctime = time.time()
    14 # 将当前的东八区时间改为 UTC时间 注意这里一定是UTC时间,没有其他说法
    15 utc_time = datetime.datetime.utcfromtimestamp(ctime)
    16 # 为当前时间增加 10 秒
    17 add_time = datetime.timedelta(seconds=10)
    18 action_time = utc_time + add_time
    19 
    20 # action_time 就是当前时间未来10秒之后的时间
    21 #现在我们使用apply_async定时执行
    22 res = one.apply_async(args=(10,10),eta=action_time)
    23 print(res.id)
    24 #这样原本延迟5秒执行的One函数现在就要在10秒钟以后执行了
    my_celery

    定时任务只能被执行一次,那如果我想每隔10秒都去执行一次这个任务怎么办呢? 周期任务来了

    Celery周期任务

     1 from celery import Celery
     2 from celery.schedules import crontab
     3 
     4 celery_task = Celery("task",
     5                      broker="redis://127.0.0.1:6379",
     6                      backend="redis://127.0.0.1:6379",
     7                      include=["Celery_task.task_one","Celery_task.task_two"])
     8 
     9 #我要要对beat任务生产做一个配置,这个配置的意思就是每10秒执行一次Celery_task.task_one任务参数是(10,10)
    10 celery_task.conf.beat_schedule={
    11     "each10s_task":{
    12         "task":"Celery_task.task_one.one",
    13         "schedule":10, # 每10秒钟执行一次
    14         "args":(10,10)
    15     },
    16     "each1m_task": {
    17         "task": "Celery_task.task_one.one",
    18         "schedule": crontab(minute=1), # 每一分钟执行一次
    19         "args": (10, 10)
    20     },
    21     "each24hours_task": {
    22         "task": "Celery_task.task_one.one",
    23         "schedule": crontab(hour=24), # 每24小时执行一次
    24         "args": (10, 10)
    25     }
    26 
    27 }
    28 
    29 #以上配置完成之后,还有一点非常重要
    30 # 不能直接创建Worker了,因为我们要执行周期任务,所以首先要先有一个任务的生产方
    31 # celery beat -A Celery_task
    32 # celery worker -A Celery_task -l INFO -P eventlet
    celery.py

    创建Worker的方式并没有发行变化,但是这里要注意的是,每间隔一定时间后需要生产出来任务给Worker去执行,这里需要一个生产者beat

    celery beat -A Celery_task  #创建生产者 beat 你的 schedule 写在哪里,就要从哪里启动

     celery worker -A Celery_task -l INFO -P eventlet

     创建worker之后,每10秒就会由beat创建一个任务给Worker去执行

    到此为止 Celery的应用就已经完事儿了,Bye

  • 相关阅读:
    wget(转)
    852. Peak Index in a Mountain Array
    617. Merge Two Binary Trees
    814. Binary Tree Pruning
    657. Judge Route Circle
    861. Score After Flipping Matrix
    832. Flipping an Image
    461. Hamming Distance
    654. Maximum Binary Tree
    804. Unique Morse Code Words
  • 原文地址:https://www.cnblogs.com/xinjie123/p/11010387.html
Copyright © 2011-2022 走看看