zoukankan      html  css  js  c++  java
  • celery异步,延时任务, 周期任务

      celery中文译为芹菜,是一个分布式任务队列. 是异步的,所以能处理大量消息

      最新的celery不支持windows下使用了,所以在使用pycharm安装celery模块之后,需要再安装eventlet模块才能测试运行.

    一.异步任务

    启动客户端:

    s1,s2要在项目目录下,如果在文件夹中执行,terminal输入命令的时候要-A 项目文件夹的名字

    c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1", include="项目名.文件夹")

    Terminal中输入

    celery worker -A s1 -l info -P eventlet

      给定两个文件

      s1.py

    from celery import Celery
    import time
    c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1")
    
    @c.task
    def myfun1(a,b):
        return f"myfun1{a}{b}"
    
    @c.task
    def myfun2():
        return "myfun2"
    
    @c.task
    def myfun3():
        return "myfun3"
    
    celery worker -A s1 -l info -P eventlet

      s2.py

    from s1 import myfun1,myfun2,myfun3,c
    from celery.result import AsyncResult
    #多个生产者
    # for i in range(10):
    #     s=myfun1.delay()
    #     print(s)
    
    s=myfun1.delay(10,20)
    print(s.id)
    r=AsyncResult(id=s.id,app=c)
    
    #获取状态
    # print(r.status)
    # print(r.successful())
    
    #获取值
    # print(r.get())
    
    #只获取报错信息
    print(r.get(propagate=False))
    
    #获取具体出错的位置
    # print(r.traceback)

    二.延时任务/定时任务

      apply_async

    t=add.apply_async((1,2),countdown=5) #表示延迟5秒钟执行任务
    print(t)
    print(t.get())

      支持的参数

    countdown : 等待一段时间再执行.
    add.apply_async((2,3), countdown=5)
    eta : 定义任务的开始时间.这里的时间是UTC时间,这里有坑 add.apply_async((
    2,3), eta=now+tiedelta(second=10))
    expires : 设置超时时间. add.apply_async((
    2,3), expires=60)
    retry : 定时如果任务失败后, 是否重试. add.apply_async((
    2,3), retry=False)
    retry_policy : 重试策略.   max_retries : 最大重试次数, 默认为
    3 次.   interval_start : 重试等待的时间间隔秒数, 默认为 0 , 表示直接重试不等待.   interval_step : 每次重试让重试间隔增加的秒数, 可以是数字或浮点数, 默认为 0.2   interval_max : 重试间隔最大的秒数, 即 通过 interval_step 增大到多少秒之后, 就不在增加了, 可以是数字或者浮点数, 默认为 0.2 .

      s1.py

    from celery import Celery
    import time
    c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1")
    
    @c.task
    def myfun1(a,b):
        return f"myfun1{a}{b}"
    
    @c.task
    def myfun2():
        return "myfun2"
    
    @c.task
    def myfun3():
        return "myfun3"

      s2.py

    from s1 import myfun1,myfun2,myfun3,c
    from celery.result import AsyncResult
    from datetime import timedelta
    
    #指定多长时间以后执行
    # s=myfun1.apply_async((10,20),countdown=5)
    
    #第二种方式,使用utc时间
    s=myfun1.apply_async((10,20),eta="utc时间")
    print(s.id)
    # 延时
    # 重试

    三.周期任务

      启动: 在Terminal中

    celery beat -A s2 -l info

      s1.py

    from celery import Celery
    import time
    c=Celery("task",broker="redis://127.0.0.1:6379/2",backend="redis://127.0.0.1:6379/1")
    
    @c.task
    def myfun1(a,b):
        return f"myfun1{a}{b}"
    
    @c.task
    def myfun2():
        return "myfun2"
    
    @c.task
    def myfun3():
        return "myfun3"

      s2.py

    from s1 import c
    from celery.beat import crontab
    c.conf.beat_schedule = {
        "name": {
            "task": "s1.myfun1",
            "schedule": 3,
            "args": (10, 20)
        },
        "crontab": {
            "task": "s1.myfun1",
            "schedule": crontab(minute=44),
            "args": (10, 20)
        }
    }

     配置详解:

     from celery.schedules import crontab
    
     CELERYBEAT_SCHEDULE = {
         # Executes every Monday morning at 7:30 A.M
         'add-every-monday-morning': {
             'task': 'tasks.add',
             'schedule': crontab(hour=7, minute=30, day_of_week=1),
             'args': (16, 16),
        },
     }
  • 相关阅读:
    二、网络基础
    Ado.net
    LINQ
    C#[抽象类,接口]
    自定义类库,并引用
    c#重点[封装,继承,多肽]
    c#重点[集合类型]异常,数组,集合ArrayList,List<>,hashTable,hashtable泛型(Dictionary)
    c#重点[数据类型,构造方法,变量,变量,运算符,装箱,拆箱]
    .net reflector激活
    sqlsever备份,还原和导入导出方法
  • 原文地址:https://www.cnblogs.com/robertx/p/10841129.html
Copyright © 2011-2022 走看看