zoukankan      html  css  js  c++  java
  • celery异步任务 定时任务

    以前项目中用到过 celery ,但是没怎么记笔记,现在在记一下,方便以后用。

     
    Celery.png

    问:Celery 是什么?

    答:Celery 是一个由 Python 编写的简单、灵活、可靠的用来处理大量信息的分布式系统,它同时提供操作和维护分布式系统所需的工具。
    Celery 专注于实时任务处理,支持任务调度。(来源于网络)

    问:适用场景在哪里?

    答:如图示(来自:http://blog.csdn.net/xsj_blog/article/details/70181984

     
    image.png

    问:生产者和消费者模式定义是什么?

    答:
    (1)生产者->负责产生数据;
    (2)消费者->负责数据处理;
    (3)缓冲区->解耦生产者和消费者,减少依赖,主要是通过消息队列来进行两点之间的通讯处理。

    图示:


     
    image.png

    问:什么是任务队列?

    答:任务队列是一种在线程或机器间分发任务的机制。

    问:什么是消息队列?

    答:消息队列的输入是工作的一个单元,称为任务,独立的职程(Worker)进程持续监视队列中是否有需要处理的新任务。

    问:职程有什么作用?

    答:Celery 用消息通信,通常使用中间人(Broker)在客户端和职程间斡旋。这个过程从客户端向队列添加消息开始,之后中间人把消息派送给职程,职程对消息进行处理。如下图所示:


     
    image.png

    问:Celery的架构三部分是哪几个部分?

    答:Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    (1)消息中间件
    PS: Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成,包括,RabbitMQ,Redis,MongoDB等。

    (2)任务执行单元
    PS: Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。

    (3)任务结果存储
    PS: Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括Redis,MongoDB,Django ORM,AMQP等。

    (任务调度)Celery Beat:任务调度
    Celery Beat:任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。

     
    来自网络

    消息分发与任务调度的实现机制(来自:http://blog.csdn.net/xsj_blog/article/details/70181984

     
    image.png

    1:—>producer发出调用请求(message包含所调用任务的相关信息)
    2:—>celery服务启动时,会产生一个或多个交换机(exchanges),对应的交换机 接收请求message
    3:—>交换机根据message内容,将message分发到一个或多个符合条件的队列(queue)
    4:—>每个队列上都有一个或多个worker在监听,在监听到符合条件的message到达后,worker负责进行任务处理,任务处理完被确认后,队列中的message将被删除。

    注释:Exchange和Queue都是Rabbitmq中的概念

    Exchange:交换机,决定了消息路由规则;
    
    Queue:消息队列;
    
    Channel:进行消息读写的通道;
    
    Bind:绑定了Queue和Exchange,意即为符合什么样路由规则的消息,将会放置入哪一个[消息队列];
    

    调图流程图示:(来自https://www.cnblogs.com/forward-wang/p/5970806.html

     
    image.png

    实践步骤:

    相关依赖:
     
    image.png
    第1步:首先搭建bottle客户端端,进行任务委派:
    #!/usr/bin/evn python
    # coding=utf-8
    """
    Author = zyx
    @Create_Time: 2018/1/30 15:58
    @version: v1.0.0
    @File: main.py
    @文件功能描述:
    """
    
    from bottle import route, run
    
    @route('/')
    def index():
        return '访问了首页!'
    
    run(host='127.0.0.1', port=8080, debug=True, reloader=True)
    main.py

    启动wen服务应用访问:


     
    image.png
    第2步:编写对应Celery任务模块celery_test
     
    image.png
    第3步:编写对应Celery任务模块启动配置文件
    # coding:utf-8
    from datetime import timedelta
    from kombu import Exchange, Queue
    
    # 配置消息中间件Broker
    BROKER_URL = 'redis://127.0.0.1:6379/0'
    
    # 配置结果存贮Backend
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
    
    # 指定时区,默认是 UTC
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_ENABLE_UTC = True
    
    #限制所有的任务的刷新频率
    CELERY_ANNOTATIONS = {'*':{'rate_limit':'10/s'}}
    
    # # 不需要返回任务状态,即设置以下参数为True
    # 如果不需要某个任务的结果,应该确保Celery不去获取这些结果。这是通过装饰器@task(ignore_result=True)来做的。如果所有的任务结果都忽略了,就不必定义结果后台。这可以让性能大幅提高。
    CELERY_IGNORE_RESULT = True
    
    # 任务序列化和反序列化使用msgpack方案
    # CELERY_TASK_SERIALIZER = 'json'
    
    # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
    # CELERY_RESULT_SERIALIZER = 'json'
    
    # 指定任务模块
    CELERY_IMPORTS = (
        'celery_test.tasks',
    )
    
    
    # celery worker的并发数 也是命令行-c指定的数目,事实上实践发现并不是worker也多越好,保证任务不堆积,加上一定新增任务的预留就可以
    CELERYD_CONCURRENCY = 10
    CELERYD_FORCE_EXECV = True  # 非常重要,有些情况下可以防止死锁
    
    # celery worker 每次去redis取任务的数量,我这里预取了4个慢慢执行,因为任务有长有短没有预取太多
    CELERYD_PREFETCH_MULTIPLIER = 4
    
    #每个worker执行了多少次任务后就会死掉,建议数量大一些
    CELERYD_MAX_TASKS_PER_CHILD = 200
    
    # CELERY_ACCEPT_CONTENT = ['json']  # 指定接受的内容类型
    
    #celery任务执行结果的超时时间
    CELERY_TASK_RESULT_EXPIRES = 1200
    #单个任务的运行时间限制,否则会被杀死
    CELERYD_TASK_TIME_LIMIT = 60
    
    # # 默认的队列,如果一个消息不符合其他的队列就会放在默认队列里面
    # CELERY_DEFAULT_QUEUE = "default"
    #
    # CELERY_QUEUES = (
    #     Queue('default', Exchange('default'), routing_key='default'),# 这是上面指定的默认队列
    #     Queue('for_add', Exchange('for_task_add'), routing_key='for_task_add'), # 这是一个for_add队列 凡是for_task_add开头的routing key都会被放到这个队列
    #     Queue('for_send_email', Exchange('for_task_email'), routing_key='for_task_email'),
    # # 这是一个or_send_email'队列 凡是for_task_email开头的routing key都会被放到这个队列
    # )
    #
    # CELERY_ROUTES = {
    #     'celery_test.tasks.add': {'queue': 'for_add', 'routing_key': 'for_task_add'},
    #     'celery_test.tasks.easeye_send_mails': {'queue': 'for_send_email', 'routing_key': 'for_task_email'},
    # }
    #
    
    
    #定时任务
    CELERYBEAT_SCHEDULE = {
        'easeye_send_mail': {
            'task': 'celery_test.tasks.easeye_send_mails',
            'schedule': timedelta(seconds=30),
        },
        'add': {
            'task': 'celery_test.tasks.add',
            'schedule': timedelta(seconds=10),
            'args': (16, 16)
    
        }
    }
    setting.py
    第4步:编写对应Celery实例
    from  celery import Celery
    app=Celery('celery_test',include=['celery_test.tasks'])
    app.config_from_object('celery_test.setting')
    
    if __name__=='__main__':
        app.start()
    server.py
    第5步:编写对应任务
    # coding:utf-8
    from celery_test.server import app
    
    @app.task(bind=True)
    def add(self,x, y):         #自己放项目里 一定不要忘了self
        return x + y
    
    
    @app.task(bind=True)
    def send_mail(self,x, y):
        return x - y
    tasks.py
    第6步:修改main.py进行任务调用
    from bottle import route, run, redirect
    
    from celery_test import tasks
    
    
    # @route('/add')
    # def index():
    #     tasks.add.daley(888, 45)
    #     return '访问了add!'
    
    
    @route('/send_mail')
    def index():
        task = tasks.send_mail.delay(888, 45)
        print('访问了send_mail!')
        return redirect('/tasks_status/' + task.id)  # 重定向到首页(可以 )
    
    
    @route('/tasks_status/<task_id>')
    def index(task_id):
        # 获取异步任务结果
        task = tasks.send_mail.AsyncResult(task_id)
        # 等待处理
        if task.state == 'PENDING':
            response = {'state': task.state, 'current': 0, 'total': 1}
            print('PENDING:', response)
        elif task.state != 'FAILURE':
            response = {'state': task.state, 'current': task.info.get('current', 0), 'total': task.info.get('total', 1)}
            # 处理完成
            if 'result' in task.info:
                response['result'] = task.info['result']
            print('处理完成:', response)
        else:
            # 后台任务出错
            response = {'state': task.state, 'current': 1, 'total': 1}
            print('后台任务出错:', response)
    
    
    run(host='127.0.0.1', port=8080, debug=True, reloader=True)
    main.py
    第7步:启动指定的队列
    celery -A celery_test.server worker --loglevel=debug --pool=solo
     
    image.png

    启动成功如图示:


     
    image.png
    第8步:启动web服务调用对应的URL请求异步处理异步任务

    调用:

    http://127.0.0.1:8080/send_mail
     
    image.png

    即时查看任务处理情况:

    http://127.0.0.1:8080/tasks_status/0079d834-d918-4ad7-88dd-f23c5eeb09dc

    查看对应的celery的运行 情况:

     
    image.png

    问:监控Celery任务执行情况?

    答:Flower是基于web的监控和管理Celery的工具.
    相关文档:
    http://flower-docs-cn.readthedocs.io/zh/latest/
    安装pip install flower
    启动flower(flower默认的端口是5555.)
    celery flower --port=5555 --broker=redis://localhost:6379/0
    celery flower --broker=amqp://guest:guest@192.168.xx.xxx:5672//
    启动任务查看

    celery flower --port=5555 --broker=redis://localhost:6379/0
    image.png

    访问:127.0.0.1:5555


     
    image.png

    进行任务执行:http://127.0.0.1:8080/send_mail
    查看任务执行结果

     
    image.png

    PS其他命令

    ============================================================================
    前台启动
    
    启动指定的队列
     celery -A celery_test.server worker -l info -Q for_send_email
    
     celery -A celery_test.server worker -l info -Q for_add
    
    启动定时相关的任务队列
     celery -A celery_test.server beat
    
     celery -A celery_test.server worker -l info -Q for_send_email
    
     celery -A celery_test.server worker -l info -Q for_add
    
    
    
    
    ============================================================================
    后台启动
    celery multi start w1 -A proj -l info
    celery  multi restart w1 -A proj -l info
    
    # 异步关闭 立即返回
    celery multi stop w1 -A proj -l info
    # 等待关闭操作完成
    celery multi stopwait w1 -A proj -l info
    
    调用任务:
    add.apply_async((2, 2), queue='lopri', countdown=10)
    # 指定要发送到哪个队列 运行时间延迟countdown


  • 相关阅读:
    led呼吸灯
    定时器中断
    npgsql
    中断
    PAT (Advanced Level) 1128~1131:1128N皇后 1129 模拟推荐系统(set<Node>优化) 1130 中缀表达式
    PAT (Advanced Level) 1132~1135:1132 模拟 1133模拟(易超时!) 1134图 1135红黑树
    PAT (Advanced Level) 1136~1139:1136模拟 1137模拟 1138 前序中序求后序 1139模拟
    PAT (Advanced Level) 1140~1143:1140模拟 1141模拟 1142暴力 1143 BST+LCA
    PAT (Advanced Level) 1144~1147:1145Hash二次探查 1146拓扑排序 1147堆
    python实现http接口测试
  • 原文地址:https://www.cnblogs.com/zxs117/p/12066044.html
Copyright © 2011-2022 走看看