zoukankan      html  css  js  c++  java
  • Celery 之异步任务、定时任务、周期任务

    什么是Celery?
    Celery 是芹菜
    Celery 是基于Python实现的模块, 用于执行异步定时周期任务的
    其结构的组成是由
        1.用户任务 app
        2.管道 broker 用于存储任务 官方推荐 redis rabbitMQ  / backend 用于存储任务执行结果的
        3.员工 worker

    一 异步任务

     1 from celery import Celery
     2 import time
     3 
     4 my_task = Celery('tasks', broker='redis://127.0.0.1:6379', backend='redis://127.0.0.1:6379')
     5 
     6 
     7 @my_task.task
     8 def func(x, y):
     9     time.sleep(20)
    10     return x + y
    s1
    1 from s1 import func
    2 
    3 ret = func.delay(2, 4)
    4 
    5 sid = ret.id
    6 print(ret.id)
    s2
     1 from celery.result import AsyncResult
     2 from asyn.s1 import my_task
     3 
     4 async_task = AsyncResult(id="8789dd69-437a-47c2-a3a5-1cd5c3ba9c73", app=my_task)
     5 
     6 if async_task.successful():
     7     result = async_task.get()
     8     print(result)
     9 
    10 else:
    11     print('任务还未完成')
    s3

    ps: eventlet 是一个python的三方库 需要使用 pip安装 pip install eventlet

     celery worker -A s1 -l INFO -P eventlet  # 创建worker 执行代码

    二、周期任务

    创建两个任务

    task_one:

    1 from s6 import celery_task
    2 
    3 
    4 @celery_task.task
    5 def my_func1_task_one(a,b):
    6     return f"my_func1_task_one return{a}{b}"
    View Code

    task_two

    1 from s6 import celery_task
    2 
    3 
    4 @celery_task.task
    5 def my_func1_task_two(a, b):
    6     return f"my_func1_task_one return{a}{b}"
    View Code

    设置执行周期时间

     1 # 周期任务
     2 
     3 from celery import Celery
     4 from celery.schedules import crontab
     5 
     6 celery_task = Celery("task",
     7                      broker="redis://127.0.0.1:6379",
     8                      backend="redis://127.0.0.1:6379",
     9                      include=["task_one","task_two"])
    10 
    11 #我要要对beat任务生产做一个配置,这个配置的意思就是每10秒执行一次Celery_task.task_one任务参数是(10,10)
    12 celery_task.conf.beat_schedule ={
    13     "each10s_task": {
    14         "task":"task_one.my_func1_task_one",
    15         "schedule": 10,  # 每10秒钟执行一次
    16         "args": (10, 20)
    17     },
    18     "each5s_task": {
    19         "task": "task_two.my_func1_task_two",
    20         "schedule": 5,  # 每5秒
    21         "args": (50, 60)
    22     },
    23     # "each24hours_task": {
    24     #     "task": "Celery_task.task_one.one",
    25     #     "schedule": crontab(hour=24), # 每24小时执行一次
    26     #     "args": (10, 10)
    27     # }
    28 
    29 }
    30 
    31 #以上配置完成之后,还有一点非常重要
    32 # 不能直接创建Worker了,因为我们要执行周期任务,所以首先要先有一个任务的生产方
    33 # celery beat -A Celery_task
    34 # celery worker -A Celery_task -l INFO -P eventlet
    View Code

    执行指令:

    1、创建Worker的方式并没有发行变化,但是这里要注意的是,每间隔一定时间后需要生产出来任务给Worker去执行,这里需要一个生产者beat

    celery beat -A Celery_task  #创建生产者 beat 你的 schedule 写在哪里,就要从哪里启动

    2、celery worker -A Celery_task -l INFO -P eventlet

    友情链接

  • 相关阅读:
    数学+高精度 ZOJ 2313 Chinese Girls' Amusement
    最短路(Bellman_Ford) POJ 1860 Currency Exchange
    贪心 Gym 100502E Opening Ceremony
    概率 Gym 100502D Dice Game
    判断 Gym 100502K Train Passengers
    BFS POJ 3278 Catch That Cow
    DFS POJ 2362 Square
    DFS ZOJ 1002/HDOJ 1045 Fire Net
    组合数学(全排列)+DFS CSU 1563 Lexicography
    stack UVA 442 Matrix Chain Multiplication
  • 原文地址:https://www.cnblogs.com/liaopeng123/p/10395855.html
Copyright © 2011-2022 走看看