zoukankan      html  css  js  c++  java
  • python之celery

      中文文档: https://www.celerycn.io/

      Celery是由Python开发的一个简单、灵活、可靠的处理大量任务的分发系统,可以实时处理任务,也可以定时异步处理任务。
    每次分发任务后得到一个ID,然后根据这个ID查询任务执行情况。

    安装

      

    pip install celery eventlet  # windows系统需要eventlet模块

    基础使用

    下面我们来快速上手celery
    编辑s1.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #!/usr/bin/env python3

    from celery import Celery

    cel = Celery('xxx',
    broker="redis://192.168.1.40",
    backend='redis://192.168.1.40')

    @cel.task
    def f1(x,y):
    return x+y

    然后把s1这个work工作起来,进入命令终端,如果在linux系统,不用添加参数-P eventlet

    1
    E:proxxx_dir> celery worker -A s1 -l info  -P eventlet

    编辑s2.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    #!/usr/bin/env python3

    import datetime
    from s1 import f1

    # 立即执行
    result = f1.delay(4,6)
    print(result.id)

    # 定时执行
    ctime = datetime.datetime.now()
    # ctime = datetime.datetime(year=2019,month=2,day=21,hour=14,minute=8)
    utc_time = datetime.datetime.utcfromtimestamp(ctime.timestamp())
    s10 = datetime.timedelta(seconds=10)
    ctime_x = utc_time + s10
    result = f1.apply_async(args=[1,3],eta=ctime_x)
    print(result.id)

    编辑s3.py

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #!/usr/bin/env python3

    from celery.result import AsyncResult
    from demo1.s1 import cel

    async = AsyncResult(id="f43bce52-9503-475e-9d19-4a46ed910a8e",app=cel)

    if async.successful():
    ret = async.get() # 获取值
    #async.forget() # 删除值
    print(ret)
    elif async.failed():
    print('执行失败')
    elif async.status == 'PENDING':
    print('任务等待中被执行')
    elif async.status == 'RETRY':
    print('任务异常后正在重试')
    elif async.status == 'STARTED':
    print('任务已经开始被执行')
    else:
    print("任务执行失败")


    async.revoke() # 取消一个任务,当一个任务正在执行,不能取消
    async.revoke(terminate=True) # 终止一个任务,当一个任务正在执行,可以被终止

    二、多目录结构

    经过上面快速上手的学习,了解了celery的基本使用,那么重组一下代,形成项目中多目录结构看看相互之间如何调用?

    创建一个celery_tasks的目录,里面一般保存2类文件,其中一个文件名称必须为celery,另一类就是定义task任务的文件,可以有多个。

    定义celery_tasks/celery.py文件,如果有多个task任务文件,可以用includ列表包含进来

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    #!/usr/bin/env python3

    from celery import Celery
    # from celery.schedules import crontab

    cel = Celery('xxxxxx',
    broker='redis://192.168.1.40:6379',
    backend='redis://192.168.1.40:6379',
    include=['celery_tasks.task1',)
    #include=['celery_tasks.task1','celery_tasks.task2'])

    # 时区
    cel.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    cel.conf.enable_utc = False

    在多目录结构中,跑celery work时不用指定到文件,指定目录即可

    1
    E:proxxx_dir> celery worker -A celery_tasks -l info  -P eventlet

    定义celery_tasks/task1.py

    1
    2
    3
    4
    5
    6
    7
    #!/usr/bin/env python3

    from .celery import cel

    @cel.task
    def f1(x,y):
    return x+y

    有了celery.py文件和task任务文件,我们就可以在任意地方调用任务了。

    比如定义test/exec1.py文件来执行任务

    1
    2
    3
    4
    5
    6
    #!/usr/bin/env python3

    from celery_tasks.task1 import f1

    result = f1.delay(4,6)
    print(result.id)

    定义test/exec2.py文件来获取任务执行结果,需要提供任务ID

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    #!/usr/bin/env python3

    from celery_tasks.celery import cel
    from celery.result import AsyncResult

    async = AsyncResult(id="be6bb021-da48-46a9-b1bc-94b987f7c8a7",app=cel)

    if async.successful():
    print(async.get())
    else:
    print("任务执行失败")

    三、Flask中的例用

    有了上面celery的认识,我们来简单写点代码,看一下在Flask框架中celery是如何使用的?

    定义Flask项目启动文件app.py

    写线上代码时是要把任务保存在数据库中的,这里仅作示例就保存在了HISTORY全局变量中了.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    #!/usr/bin/env python3

    from flask import Flask,request,render_template,redirect

    from celery_tasks.task2 import deploy

    app = Flask(__name__)

    HISTORY = []

    @app.route('/index',methods=["GET","POST"])
    def index():
    if request.method == "GET":
    return render_template('index.html',history=HISTORY)


    @app.route('/publish',methods=["GET","POST"])
    def publish():
    if request.method == "GET":
    return render_template('publish.html')
    else:
    version = request.form.get("version")
    hosts = request.form.getlist("hosts")
    print(version,hosts)

    import datetime
    ctime = datetime.datetime.now()
    utc_time = datetime.datetime.utcfromtimestamp(ctime.timestamp())
    ctime_10 = utc_time + datetime.timedelta(seconds=10)
    result = deploy.apply_async(args=[version,hosts],eta=ctime_10)
    HISTORY.append({"version":version,"hosts":hosts,"task_id":result.id})
    print(HISTORY)
    return redirect("/index")

    from celery.result import AsyncResult
    from celery_tasks.celery import cel

    @app.route('/check_result',methods=["GET","POST"])
    def check_result():
    task_id = request.args.get("task_id")
    async = AsyncResult(id=task_id,app=cel)
    if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 将结果删除
    return "执行成功"
    elif async.failed():
    return '执行失败'
    elif async.status == 'PENDING':
    return '任务等待中被执行'
    elif async.status == 'RETRY':
    return '任务异常后正在重试'
    elif async.status == 'STARTED':
    return '任务已经开始被执行'
    else:
    return "unkown status"

    @app.route('/cancel', methods=["GET", "POST"])
    def cancel():
    task_id = request.args.get("task_id")
    async =AsyncResult(id=task_id,app=cel)
    async.revoke(terminate=True)
    for i in HISTORY:
    if task_id in i.values():
    HISTORY.remove(i)
    return redirect("/index")

    if __name__ == '__main__':
    app.run()

    定义其中用到的templates/index.html

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="UTF-8">
    <title>Title</title>
    </head>
    <body>
    <h1>发布系统</h1>
    <a href="/publish">添加发布任务</a>
    <table>
    {% for row in history %}
    <tr>
    <td>{{ row.task_id }}</td>
    <td>{{ row.version }}</td>
    {% for host in row.hosts %}
    <td>
    <span>{{ host }}</span>
    </td>
    {% endfor %}
    <td><a href="/check_result?task_id={{ row.task_id }}">查看</a></td>
    <td><a href="/cancel?task_id={{ row.task_id }}">取消</a></td>
    </tr>
    {% endfor %}
    </table>

    </body>
    </html>

    定义其中用到的templates/publish.html

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="UTF-8">
    <title>Title</title>
    </head>
    <body>
    <form action="" method="post">
    <p><input type="text" name="version" placeholder="请输入要发布的版本"></p>
    <p>
    <select name="hosts" multiple="multiple">
    <option value="c1.com">c1.com</option>
    <option value="c2.com">c2.com</option>
    <option value="c3.com">c3.com</option>
    </select>
    </p>
    <input type="submit" value="提交">
    </form>
    </body>
    </html>

    定义其中的celery_tasks.task2.py文件,这里的deploy是真正定义任务的地方.

    1
    2
    3
    4
    5
    6
    7
    8
    #!/usr/bin/env python3

    from .celery import cel

    @cel.task
    def deploy(version,hosts):
    print(version, hosts) # 定义想要执行的任务代码
    return 'deploy ok'

    同样别望了先把work跑起来,再启动Flask

    1
    E:proxxx_dir> celery worker -A celery_tasks -l info  -P eventlet
  • 相关阅读:
    最长回文子串 V2(Manacher算法)
    用例建模Use Case Modeling
    分析一套源代码的代码规范和风格并讨论如何改进优化代码
    结合工程实践选题调研分析同类软件产品
    如何提高程序员的键盘使用效率
    antd移动端onClick事件点击无效
    webpack打包问题
    centos下部署项目问题
    javascript return 跟 break区别
    VUE清除组件内部定时器
  • 原文地址:https://www.cnblogs.com/xingxia/p/python_celery.html
Copyright © 2011-2022 走看看