zoukankan      html  css  js  c++  java
  • Celery和Rabbitmq自学

    异步消息队列,也能用于定时和周期性任务。每次修改的task代码还要重启worker,这个有点麻烦

    所有带task()装饰器的可调用对象(usertask)都是celery.app.task.Task类的子类,也就是说task()装饰器会将usertask标识符变成Task子类的引用。

    另外,celery允许用自定义Task类,不过该类要继承于celery.app.task.Task,Task类在task状态转换动作时提供了接口,如任务执行失败时调用接口on_failure,

    这样就非常方便我们在自定义Task类中重定义。见http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes

    task()装饰器可以接收很多参数,比如序列化类、是否保存task结果、所使用的back_end等等,见

    http://docs.celeryproject.org/en/latest/userguide/tasks.html#list-of-options。比如,我们可以设置只在出错的情况下保存运行结果。即

    ignore_result=False且store_errors_even_if_ignored=True,不过,ignore_result可以在全局配置文件中设置CELERY_IGNORE_RESULT

    当发异步消息时调用usertask.delay()或usertask.apply_async(),它其实是将usertask的信息,如名称,入参,id等序列化后保存在broker中。

    如果不限制task的处理速度,那应该设置CELERY_DISABLE_RATE_LIMITS = True,这算是celery的优化部分

    task不要嵌套,如果希望多个task顺序执行(同步),那可以用回调函数,在celery中是用chain()方法实现,见

    http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks

    http://docs.celeryproject.org/en/latest/userguide/canvas.html

    celery在两个地方有retry,一个是在task的代码执行过程中出现异常时可以retry,不过这个需要用户在task代码中自己写,如果想

    retry那就要我们自己捕获异常,并抛出Retry exception,参见http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying

    另外一个地方是在调用delay()/apply_async()时,这个retry可能是指向rabbitmq发送消息时如果失败,可以重试如果想链式调用task,那要设置link参数,见

    http://docs.celeryproject.org/en/latest/userguide/canvas.html#callbacks

    http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives

    开发环境中,work的启动、结束、重启可以用celery multi xxx命令

    生产环境中,用http://docs.celeryproject.org/en/latest/tutorials/daemonizing.html#daemonizing

    celery中的一个worker其实是代表一个进程池,一个进程池是由一个父进程和多个子进程组成, 貌似父进程不干事,只用于分配task,子进程数默认是CPU核数

    一台主机上可以启多个worker,但这种方法貌似和一个worker中启多个子进程区别不大啊,两种方式都是多进程。

    在启动worker时,可以设置worker的很多参数,如是否允许自动伸缩pool的容量,最大和最小容量,见

    http://docs.celeryproject.org/en/latest/userguide/workers.html#autoscaling

    celery有很多设置,其中在配置文件celeryconfig中的设置是全局的,另外,我们还可以单独设置task(可以在task定义的地方,也可以在delay(), apply_async()中设置),

    worker启动时可以设置参数

    celery的broker中的task队列可以有多个,用不同的名字命名,还可以有不同的优先级。我们启动worker时可以设置该worker只处理指定队列的task

    celery中的log使用的是python的log模块,是线程安全的,而不是进程安全的。我们可以给一个worker中的每个process定义其logfile,见

    http://docs.celeryproject.org/en/latest/userguide/workers.html#variables-in-file-paths

    一个node是指一个worker

    celery的每个worker的每个进程可以一次从broker中取出多个task

    查看所有node中的活动的task: celery -A proj inspect active

    查看所有node中注册的所有task:celery -A proj inspect registered

    查看所有node的信息:celery -A proj inspect stats

    调用异步消息接口delay() ,apply_async()时,其实只是应用程序与broker通信而已,发消息给broker,消息中包含调用的task名称,参数等,broker收到后保存。这个过程中不会与celery worker打交道

    因此,这个时候即使没有启动celery worker也没有任何关系。当celery worker启动后,会主动连接broker,并从broker那里取数据来consume。同理,我们可以在任何平滑停止celery worker,而不用管

    broker中是否还有没consume的消息,当celery worker重启后,它会继续处理broker中的消息。这里的启动顺序貌似只能是先启动broker,再启动celery,因为启动celery时会与broker建立连接,broker

    是被动连接的。

    broker自己应该是有办法保证一个消息只能被一个worker取走,而不会同时被两个worker取走。

    有一个问题:rabbitmq有内存占用限制吗?如果有,那当达到这个限制时,新来的消息怎么处理?像reddis那样把旧的消息写到磁盘?还是直接抛弃旧的消息?或不再接收新消息?

    Rabbitmq有监控,默认的,Rabbitmq_management是关闭的,需要打开,执行sudo rabbitmq-plugins enable rabbitmq_management,打开这个plugin,

    然后重启Rabbitmq,在浏览器输入http://localhost:15672,用户名和密码都是guest,就可以看到Rabbitmq的监控信息了。

    Rabbitmq相当于一个邮箱,用于接收和发送消息,而且它里面有很多队列,发送消息时可以指定用哪个队列,一个消息中包含一个task

    (1) 发送者将消息发给rabbitmq的指定队列,消息中包含消息处理函数

    (2) rabbitmq接收并保存消息(无数量限制)

    (3) 接收者联系rabbitmq,从队列中取走消息(可以指定接收者只消费某个指定队列的消息)

    (4) 接收者取回消息后,调用消息中的处理函数处理消息

    其实这个过程中我们要考虑消息执行的可靠性。

    第一点:若消息被worker接收后,处理过程中worker死掉了怎么办?Rabbitmq考虑到了这点,在默认情况下(是一个配置项,我们可以修改),每个消息其实是被worker拷贝了一份取走的,

    当然被拷贝的消息会打上标记,当worker处理完消息后会给Rabbitmq一个确认(acknowledgment),Rabbitmq收到确认后才删除消息,这个过程没有限定超时时间,只有当worker与

    Rabbitmq之间的TCP连接断开后(Rabbitmq应该是会检测worker的心跳),Rabbitmq才会将没有确认的消息重新加入到队列。我们可以使用命令:rabbitmqctl list_queues name messages_ready

    messages_unacknowledged来查看有多少消息没有收到确认。它是一个默认项,我们关闭这个可靠性设置,这样,消息处理完不用再给Rabbitmq发acknowledgment了。在celery配置文件中

    对应于CELERY_ACKS_LATE项

    第二点:若Rabbitmq挂掉,消息也会丢失。因为要做消息的持久化,Rabbitmq这样做了,不过它的持久化并不绝对可靠,因为从接收到消息到持久化之间有时间间隔,如果想绝对可靠,见

    http://www.rabbitmq.com/tutorials/tutorial-two-python.html中的Note on message persistence部分。

    还有些可靠性是worker自己来保证的,如task执行过程中抛出异常,这种情况是否要重新执行该task,celery的处理如下:

    在celery中,task在被worker执行过程中如果抛出异常(Exception),celery会捕获并给Rabbitmq发acknowledgment,如果我们想让该task重新执行,也可以,那就要自己手动捕获异常,

    并抛出Retry exception,这样worker会再次执行该task,这个过程中不会与Rabbitmq交互。

    1、celery中的基本概念

     

    2、Rabbitmq中的基本概念

    publish:即生产者将消息发送给Rabbitmq的过程,更准确的说,是发送给Rabbitmq exchange的过程,在Rabbitmq的控制台网页中的overview子页的Message rates中有publish一项,它指

    的是指消息进入Rabbitmq的速率。

    3、celery中的配置项

    CELERYD_PREFETCH_MULTIPLIER

    乘子,worker的进程数*CELERYD_PREFETCH_MULTIPLIER=worker每次从Rabbitmq一次性取走多的消息数

    4、Rabbitmq配置项和查询命令

    首先几乎所有的控制命令都通过rabbitmqctl执行,很多东西在网页管理页可以看到

    rabbitmqctl list_queues

    查看queue相关的信息,后面可加参数name(名称),messages(消息数量), consumers(消息者数量,该值与worker数不相等,一般是worker数的三倍,貌似每个worker顺带两个额外的consumer,这两

    个consumer我们不用关心)

    rabbitmqctl list_exchanges

    查看所有exchange,如下,第一列是名称,第二列是exchang type。Rabbitmq共有四种类型:direct,fanout,headers,topic。

    direct是指将消息直接发到指定队列,fanout是指广播,发给绑定到该exchange的所有队列。我们一般是用direct。

    在celery中,当broker是Rabbitmq时,会调用Rabbitmq创建默认的exchange和默认的queue,它们的名称都是celery,并且该queue是没有绑定任何exchange的,给Rabbitmq发消息时会指定队列,exchange

    将消息发到该指定的队列。貌似基本上不会用到绑定功能

    C:Workherasrc>rabbitmqctl list_exchanges
    Listing exchanges ...
            direct
    amq.direct      direct
    amq.fanout      fanout
    amq.headers     headers
    amq.match       headers
    amq.rabbitmq.log        topic
    amq.rabbitmq.trace      topic
    amq.topic       topic
    celery  direct
    celery.pidbox   fanout
    celeryev        topic
    reply.celery.pidbox     direct
    ...done.

    C:Workherasrc>

    service rabbitmq-server start

    启动rabbitmq

  • 相关阅读:
    做程序员,我骄傲了吗?
    乐字节Java面向对象三大特性以及Java多态
    Java为什么有前途?什么人适合学Java?
    Java新手从入门到精通的学习建议
    Java变量与数据类型之二:Java常量与变量
    模块化、结构化的代码,程序员正讲述着人生
    乐字节Java变量与数据类型之一:Java编程规范,关键字与标识符
    乐字节Java学习课程-path环境变量的作用与配置
    我英语不好,能学会编程吗?
    为什么欧拉图要用栈存然后逆着输出
  • 原文地址:https://www.cnblogs.com/ajianbeyourself/p/3888758.html
Copyright © 2011-2022 走看看