zoukankan      html  css  js  c++  java
  • Celery的实践指南

    http://www.cnblogs.com/ToDoToTry/p/5453149.html

    Celery的实践指南

     
    Celery的实践指南
    celery原理:
    celery实际上是实现了一个典型的生产者-消费者模型的消息处理/任务调度统,消费者(worker)和生产者(client)都可以有任意个,他们通过消息系统(broker)来通信。
    典型的场景为:
    1. 客户端启动一个进程(生产者),当用户的某些操作耗时较长或者比较频繁时,考虑接入本消息系统,发送一个task任务给broker。
    2. 后台启动一个worker进程(消费者),当发现broker中保存有某个任务到了该执行的时间,他就会拿过来,根据task类型和参数执行。
     
    实践中的典型场景:
    1. 简单的定时任务:
      1. 替换crontab的celery写法:

        1. from celery import Celery
          from celery.schedules import crontab

          app = Celery("tasks", backend="redis://localhost", broker="redis://localhost")

          app.conf.update(CELERYBEAT_SCHEDULE = {
              "add": {
                  "task": "celery_demo.add",
                  "schedule": crontab(minute="*"),
                  "args": (16, 16)
              },
          })

          @app.task
          def add(x, y):
              return x + y

      2. 运行celery的worker,让他作为consumer运行,自动从broker上获得任务并执行。
        1. `celery -A celery_demo worker`
      3. 运行celery的client,让其根据schedule,自动生产出task msg,并发布到broker上。
        1. `celery -A celery_demo beat`
      4. 安装并运行flower,方便监控task的运行状态
        1. `celery flower -A celery_demo`
        2. 或者设置登录密码 `
          celery flower -A celery_demo --basic_auth=user1:password1,user2:password2
    2. 多同步任务-链式任务-
    3. 失败自动重试的task
      1. 失败重试方法: 将task代码函数参数增加self,同时绑定bind。
      2. demo代码:
        1. @app.task(bind=True, default_retry_delay=300, max_retries=5)
          def my_task_A(self):
              try:
                  print("doing stuff here...")
              except SomeNetworkException as e:
                  print("maybe do some clenup here....")
                  self.retry(e)
      3. 自动重试后,是否将任务重新入queue后排队,还是等待指定的时间?可以通过self.retry()参数来指定。
    4. 派发到不同Queue队列的task
      1. 一个task自动映射到多个queue中的方法, 通过配置task和queue的routing_key命名模式。
        1. 比如:把queue的exchange和routing_key配置成通用模式:
        2. 再定义task的routing_key的名称:
      2. 可用的不同exchange策略:
        1. direct:直接根据定义routing_key
        2. topic:exchange会根据通配符来将一个消息推送到多个queue。
        3. fanout:将消息拆分,分别推送到不同queue,通常用于超大任务,耗时任务。
      3. 参考:http://celery.readthedocs.org/en/latest/userguide/routing.html#routers
    5. 高级配置
      1. result是否保存
      2. 失败邮件通知:
      3. 关闭rate limit:
    6. auto_reload方法(*nix系统):
      1. celery通过监控源代码目录的改动,自动地进行reload
      2. 使用方法:1.依赖inotify(Linux) 2. kqueue(OS X / BSD)
      3. 安装依赖:
        $ pip install pyinotify
      4. (可选) 指定fsNotify的依赖:
        $ env CELERYD_FSNOTIFY=stat celery worker -l info --autoreload
      5. 启动: celery -A appname worker --autoreload
    7. auto-scale方法:
      1. 启用auto-scale
      2. 临时增加worker进程数量(增加consumer):
        $ celery -A proj control add_consumer foo -d worker1.local
      3. 临时减少worker进程数量(减少consumer):
    8. 将scheduled task的配置从app.conf变成DB的方法:
      1. 需要在启动时指定custom schedule 类名,比如默认的是: celery.beat.PersistentScheduler 。
        1.  celery -A proj beat -S djcelery.schedulers.DatabaseScheduler
    9. 启动停止worker的方法:
      1. 启动 as daemon : http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing
        1. root用户可以使用celeryd
        2. 非特权用户:celery multi start worker1 -A appName  —autoreload  --pidfile="HOME/run/celery/HOME/run/celery/HOME/log/celery/%n.log"
        3. 或者 celery worker —detach
      2. 停止
      3. ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
    10. 与Flask集成的方法
      1. 集成后flask将充当producer来创建并发送task给broker,在celery启动的独立worker进程将从broker中获得task并执行,同时将结果返回。
      2. flask中异步地获得task结果的方法:add.delay(x,y),有时需要对参数进行命名后传递 或者 add.apply_async(args=(x,y), countdown=30)
      3. flask获得
    11. 与flask集成后的启动问题
      1. 由于celery的默认routing_key是根据生产者在代码中的import级别来设定的,所以worker端在启动时应该注意其启动目录应该在项目顶级目录上,否者会出现KeyError。
    12. 性能提升: eventlet 和 greenlet
     
     
     
     
  • 相关阅读:
    AI 数值计算
    AI 主成分分析(PCA)
    AI 线性代数
    AI 奇异值分解(SVD)
    AI 协同过滤
    AI 卷积神经网络
    AI 随机梯度下降(SGD)
    Ecshop里添加多个h1标题
    Ecshop之ajax修改表里的状态(函数化处理)
    url地址形式的传参格式拼接
  • 原文地址:https://www.cnblogs.com/xiaojikuaipao/p/6194749.html
Copyright © 2011-2022 走看看