zoukankan      html  css  js  c++  java
  • Celery分布式任务队列

    Celery任务处理:
    task 任务 -- 一个Python函数, 内书写异步代码, 使之可在项目中随用随调
    broker 中间人 -- 项目中调用异步代码后, 作为任务发布给中间人, 中间人进行分配调度,将任务分配入所属队列
    queue 队列 -- 任务放进队列中,先进先出, 交给处理者执行.
    worker 处理者 -- 每个处理者为一个独立进程, 处理者和队列关系可以根据项目情况为一对一,一对多,多对一关系.

    使用流程:


    0.选择并配置broker环境.


    1.创建异步列表
    创建用以执行异步代码的py文件

    project/celery_task/tasks.py


    导入celery后实例化对象并写入调用路径 以及 broker参数

    from celery import Celery
    
    # 实例化对象并传参. 此处使用redis作为broker..应当注意broker参数的书写规范
    app = Celery('celery_task.tasks', broker='redis://127.0.0.1:6379/8')



    2.更改代码

    分离出项目中需要异步执行的代码
    在tasks中def每段需要异步执行的代码

    # 使用实例化对象的task方法装饰需要异步执行的代码
    @app.task
    def PrintSum(a, b):
        time.sleep(5)
        a += b
        print('a和b的和为:' + str(a))
        time.sleep(5)

    ...


    改写原代码
    导入tasks里对应的模块,调用delay方法将任务并传参, 使任务加入queue队列

    from celery_tasks.tasks import PrintSum
    ...
    a = 1
    b = 2
    PrintSum.delay(a, b)



    3.环境配置
    安装配置worker的必要运行环境,Django,celery,以及所分配任务 必要的环境依赖.

    复制代码到worker环境内(与该任务相关文件)
    worker环境内的tasks内需增加Django初始化配置

    import os
    import django
    
    # 系统环境变量-设置默认(Django的设置模块, 以及自定设置)  原wsgi接口协议文件中书写. !参数应用双引号.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dailyfresh.settings")
    
    # 手动进行django初始化
    django.setup()


    4.运行
    启动worker

    # 使用celery模块调用APP, 路径为...作为处理者

    celery -A celery_tasks.tasks worker


    # 使用celery模块调用APP, 路径为...作为处理者(设置消息级别为:通知级)

    celery -A celery_tasks.tasks worker -l info

    ##

    后期对任务内容修改, 也同样需要修改worker端代码

  • 相关阅读:
    Microsoft Office MIME Types
    启动mongodb
    学习Hbase API的一个视频
    报错:Hive Runtime Error while processing row
    821. Shortest Distance to a Character
    1171. Remove Zero Sum Consecutive Nodes from Linked List
    190. Reverse Bits
    Rust--如何实现内存安全的?
    Rust -- as_ref与borrow的区别
    653. Two Sum IV
  • 原文地址:https://www.cnblogs.com/jrri/p/11531172.html
Copyright © 2011-2022 走看看