zoukankan      html  css  js  c++  java
  • celery使用的小记录

    一篇还不错的入门说明: http://www.bjhee.com/celery.html,
    官方文档: http://docs.jinkan.org/docs/celery/getting-started/brokers/redis.html

    这里不多说重复内容,只讲一下结构上的理解:

    celery不仅是队列,它是一个并行分布式框架
    你不需要考虑怎么分布式,怎么写daemon进程,只需要关注你的单个任务要怎么处理。当你把单个任务处理的代码放入celery框架之后,它就自带了并行分布式、后台持续运行的特点。

    celery使用redis,rabbitMQ等作为broker,它的两个角色:任务发送方(生产者)把任务放入队列,worker(消费者)从队列中取出任务进行处理。这两个角色并不要求在同一台机器上,且可以有多台机器上的多个生产者,也可以有多台机器上的多个worker。

    celery的生产者和消费者怎么知道该使用哪个函数执行任务呢?这其实是使用了反射机制,以官方文档getting started例子来说,即:
    当生产者使用

    from my_tasks import add
    ret = add.delay(3, 4)
    

    把这个任务放入队列的时候,同时放入了my_tasks.add这个函数名,消费者取出任务的同时会取出这个my_tasks.add并调用自己的已经导入的这个函数进行处理。
    所以在生产者、消费者的导入上需要注意,否则消费者会报找不到函数的。

    (注意:在文件夹folder中使用from .my_tasks import add导入的add将会被当成folder.my_tasks.add!!)

    Python系列之反射、面向对象 https://www.cnblogs.com/yyyg/p/5554111.html


    生产者和消费者都需要导入一个定义了任务的同名文件(如:my_tasks.py),注意是同名文件,而不必是同一文件。(当生产者和消费者不在同一台机器上,自然无法是同一文件...)
    消费者所导入的my_tasks文件中,当然需要包含详细的处理过程,但是生产者所导入的my_tasks文件,作用却和C/C++中的头文件差不多。


    重新构造AsyncResult对象
    上面调用ret = add.delay(3, 4)时,返回的 ret 是一个AsyncResult对象,可以使用 ret.ready() 查看任务是否执行完毕,使用ret.get(timeout=xx)获取执行结果。(需要配置backend)
    当任务很多的时候,我是不是可以把这些任务的id写入文本文件或者数据库,等过一段时间再来查询某个任务的状态呢?
    当然可以,ret这个AsyncResult对象可以使用ret.id这个字符串重新构造:

    ret
    <AsyncResult: b733c852-4161-449e-930a-a395702b1203>
    ret.id
    'b733c852-4161-449e-930a-a395702b1203'
    
    # 构造:
    import tasks  # 为了导入backend配置,不必重复导入
    from celery.result import AsyncResult
    r=AsyncResult('b733c852-4161-449e-930a-a395702b1203')
    r
    <AsyncResult: b733c852-4161-449e-930a-a395702b1203>
    r.ready()
    True  # 任务已执行完毕
    r.get(timeout=3)
    。。。。
    


    Python3.7安装celery,运行时可能报错:

    File "/Users/li/.venv/venv-myprojet/lib/python3.7/site-packages/celery/backends/redis.py", line 22
        from . import async, base
                          ^
    SyntaxError: invalid syntax
    

    原因是Python3.7的async重命名了

    [Rename `async` to `asynchronous` (async is a reserved keyword in Python 3.7) #4879](https://github.com/celery/celery/pull/4879)
    

    开发人员已经处理了这个issue,合并了master,快速的解决方案是通过github安装celery,命令如下:

    pip install --upgrade https://github.com/celery/celery/tarball/master
    
  • 相关阅读:
    (一)Kafka0.8.2官方文档中文版系列入门指南
    Hbase TTL(Time To Live)详解
    java源码学习详解Object类
    设计模式详细解读简单工厂方法模式
    (二)Kafka0.8.2官方文档中文版系列API
    Scala对象相等性判断
    scala中跳出循环的3种方法
    wpf 中借助 Grid 实现随着 Form 大小变化而按比例自动改变宽度或高度。
    static and cache
    约定编程之 Dictionary 的 String 类型的 Key
  • 原文地址:https://www.cnblogs.com/dylanchu/p/10230555.html
Copyright © 2011-2022 走看看