zoukankan      html  css  js  c++  java
  • Python Celery与RabbitMQ结合操作

     0、RabbitMQ安装请参考另外一篇博客

    https://www.cnblogs.com/ygbh/p/13461525.html

    1、安装celery模式

    # Celery + RabbitMQ
    pip install "celery[librabbitmq]"
    
    # Celery + RabbitMQ + Redis
    pip install "celery[librabbitmq,redis,auth,msgpack]"

    提示:

    如果是在Window系统开发或运行测试需要进行如下操作,否则会报错:ValueError: not enough values to unpack (expected 3, got 0)
    
    解决方法:
    # 安装协程模块
    pip  install eventlet

     2、编写第一个celery框架的程序

    from celery import Celery
    
    # 中间件,这里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
    broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'
    
    # 后端储存,这里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
    backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'
    
    # 实例化一个celery对象
    app = Celery('tasks', broker=broker_url, backend=backend_url)
    
    # 设置配置参数的文件,创建文件celeryconfig.py来配置参数
    app.config_from_object('celeryconfig')
    
    
    @app.task
    def add(x, y):
        """
            求和的函数
        :param x:
        :param y:
        :return:
        """
        return x + y
    tasks.py
    # 中间件
    broker_url = 'pyamqp://'
    
    # 运行结果
    result_backend = 'rpc://'
    
    # 任务序列化
    task_serializer = 'json'
    
    # 结果序列化
    result_serializer = 'json'
    
    # 接收的数据类型
    accept_content = ['json']
    
    # 设置时区
    timezone = 'Asia/Shanghai'
    
    # UTC时区换算关闭
    enable_utc = False
    celeryconfig.py

     测试运行

    运行一个求和的任务

    celery -A tasks worker --loglevel=info --pool=eventlet
    
     -------------- celery@xxx-20200417GVK v4.4.7 (cliffs)
    --- ***** -----
    -- ******* ---- Windows-10-10.0.18362-SP0 2020-08-20 16:42:50
    - *** --- * ---
    - ** ---------- [config]
    - ** ---------- .> app:         tasks:0x2b6850cb7b8
    - ** ---------- .> transport:   amqp://development:**@192.168.2.129:5672//development_host
    - ** ---------- .> results:     rpc://
    - *** --- * --- .> concurrency: 4 (eventlet)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    
    
    [tasks]
      . tasks.add
    
    [2020-08-20 16:42:50,153: INFO/MainProcess] Connected to amqp://development:**@192.168.2.129:5672//development_host
    [2020-08-20 16:42:50,165: INFO/MainProcess] mingle: searching for neighbors
    [2020-08-20 16:42:51,307: INFO/MainProcess] mingle: all alone
    [2020-08-20 16:42:51,327: INFO/MainProcess] pidbox: Connected to amqp://development:**@192.168.2.129:5672//development_host.
    [2020-08-20 16:42:51,335: INFO/MainProcess] celery@xxx-20200417GVK ready.

     在程序目录打开python解释器

    代码目录celert_dir>python
    Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    
    #导入求和模块
    >>> from tasks import add
    
    #给task队列的add任务传入两个数字进行求和
    >>> result=add.delay(1,3)
    
    #检查求和函数是否执行完成
    >>> result.ready()
    True
    
    #获取运算结果,1秒超时处理
    >>> result.get(timeout=1)
    4
    
    #如果有异常的时候,可以通过traceback获取异常信息
    result.traceback

    3、编写一个项目程序

    目录如下:

    proj/
    ├── celery.py
    ├── __init__.py
    └── tasks.py

    代码

    from celery import Celery
    
    # 中间件,这里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
    broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'
    
    # 后端储存,这里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
    backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'
    
    # 实例化一个celery对象
    app = Celery('tasks', broker=broker_url, backend=backend_url, include=['proj.tasks'])
    
    app.conf.update(result_expires=3600, )
    
    if __name__ == '__main__':
        app.start()
    celery.py
    from __future__ import absolute_import, unicode_literals
    from .celery import app
    
    @app.task
    def add(x, y):
        return x + y
    
    @app.task
    def mul(x, y):
        return x * y
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    tasks.py

    程序的运行方式

    (1)、前台运行方式

    celery worker -A proj -l info               

    (2)、后台运行方式

    #同时开启两个处理任务
    celery multi start work_task1 -A proj -l info                 
    celery multi start work_task2 -A proj -l info 
    
    #关闭任务
    celery multi stop work_task1 -A proj -l info                 
    celery multi stop work_task2 -A proj -l info 
    
    
    #重启任务
    celery multi restart work_task1 -A proj -l info                 
    celery multi restart work_task2 -A proj -l info 
    
    #等待运行完成后,关闭任务
    celery multi stopwait work_task1 -A proj -l info                 
    celery multi stopwait work_task2 -A proj -l info 

     注意:这种启动方式会报错,不知道是什么问题,该问题已解决!

    从GititHub发现,只是有人提出来,没有得到根本的解决方式,于是自己看了官方文档,并对源码做出修改。

    [2020-08-21 14:50:49,689: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@localhost:5672//:
    Couldn't log in: server connection error 403, message: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile..
    Trying again in 6.00 seconds... (1/100)

    解决方式:

    vi $PYTHON_HOME/lib/site-packages/celery/bin/base.py

        def setup_app_from_commandline(self, argv):
            preload_options, remaining_options = self.parse_preload_options(argv)
            quiet = preload_options.get('quiet')
            if quiet is not None:
                self.quiet = quiet
            try:
                self.no_color = preload_options['no_color']
            except KeyError:
                pass
            workdir = preload_options.get('workdir')
            if workdir:
                os.chdir(workdir)
            app = (preload_options.get('app') or
                   os.environ.get('CELERY_APP') or
                   self.app)
    
    	# Add Code【增加如下代码】##
            ########################
            if app:
                os.environ['CELERY_APP']=app
             ########################
            preload_loader = preload_options.get('loader')
            if preload_loader:
                # Default app takes loader from this env (Issue #1066).
                os.environ['CELERY_LOADER'] = preload_loader
            loader = (preload_loader,
                      os.environ.get('CELERY_LOADER') or
                      'default')
            broker = preload_options.get('broker', None)

     4、编写Celery的main函数启动

    代码

    from celery import Celery
    
    # 中间件,这里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
    broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'
    
    # 后端储存,这里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
    backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'
    
    # 实例化一个celery对象
    app = Celery('tasks', broker=broker_url, backend=backend_url)
    
    @app.task
    def add(x, y):
        return x + y
    
    if __name__ == '__main__':
        app.worker_main()
    tasks.py

     运行效果

    5、Celery的配置文件的使用

    5.1、实时的配置方法

    from celery import Celery
    
    # 中间件,这里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
    broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'
    
    # 后端储存,这里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
    backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'
    
    # 实例化一个celery对象
    app = Celery('tasks', broker=broker_url, backend=backend_url)
    
    
    @app.task
    def add(x, y):
        return x + y
    
    
    if __name__ == '__main__':
        app.conf.update(
            enable_utc=False,
            timezone='Asia/Shanghai'
        )
        print('enable_utc', app.conf.timezone)
        print('timezone', app.conf.enable_utc)
    update_config_1.py

    5.2、使用模块名的配置参数

    # 中间件
    broker_url = 'pyamqp://'
    
    # 运行结果
    result_backend = 'rpc://'
    
    # 任务序列化
    task_serializer = 'json'
    
    # 结果序列化
    result_serializer = 'json'
    
    # 接收的数据类型
    accept_content = ['json']
    
    # 设置时区
    timezone = 'Asia/Shanghai'
    
    # UTC时区换算关闭
    enable_utc = False
    celeryconfig.py
    from celery import Celery
    
    # 中间件,这里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
    broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'
    
    # 后端储存,这里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
    backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'
    
    # 实例化一个celery对象
    app = Celery('tasks', broker=broker_url, backend=backend_url)
    # 设置配置参数的文件,创建文件celeryconfig.py来配置参数
    app.config_from_object('celeryconfig')
    
    @app.task
    def add(x, y):
        return x + y
    
    if __name__ == '__main__':
        print('enable_utc', app.conf.timezone)
        print('timezone', app.conf.enable_utc)
    tasks.py

    5.3、使用模块的导入方法【不推荐 】 

    # 中间件
    broker_url = 'pyamqp://'
    
    # 运行结果
    result_backend = 'rpc://'
    
    # 任务序列化
    task_serializer = 'json'
    
    # 结果序列化
    result_serializer = 'json'
    
    # 接收的数据类型
    accept_content = ['json']
    
    # 设置时区
    timezone = 'Asia/Shanghai'
    
    # UTC时区换算关闭
    enable_utc = False
    celeryconfig.py
    from celery import Celery
    import celeryconfig
    
    # 中间件,这里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
    broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'
    
    # 后端储存,这里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
    backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'
    
    # 实例化一个celery对象
    app = Celery('tasks', broker=broker_url, backend=backend_url)
    # 设置配置参数的文件,创建文件celeryconfig.py来配置参数
    app.config_from_object(celeryconfig)
    
    @app.task
    def add(x, y):
        return x + y
    
    if __name__ == '__main__':
        print('enable_utc', app.conf.timezone)
        print('timezone', app.conf.enable_utc)
    tasks.py

    5.4、使用类的方式进行配置方法

    from celery import Celery
    
    class Config:
        # 中间件
        broker_url = 'pyamqp://'
    
        # 运行结果
        result_backend = 'rpc://'
    
        # 任务序列化
        task_serializer = 'json'
    
        # 结果序列化
        result_serializer = 'json'
    
        # 接收的数据类型
        accept_content = ['json']
    
        # 设置时区
        timezone = 'Asia/Shanghai'
    
        # UTC时区换算关闭
        enable_utc = False
    
    
    # 中间件,这里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
    broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'
    
    # 后端储存,这里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
    backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'
    
    # 实例化一个celery对象
    app = Celery('tasks', broker=broker_url, backend=backend_url)
    # 设置配置参数的文件,创建文件celeryconfig.py来配置参数
    app.config_from_object(Config)
    
    @app.task
    def add(x, y):
        return x + y
    
    if __name__ == '__main__':
        print('enable_utc', app.conf.timezone)
        print('timezone', app.conf.enable_utc)
    tasks.py

    5.5、使用环境变量的配置方法 

    from celery import Celery
    import os
    
    # 中间件,这里使用RabbitMQ,pyamqp://Uername:Password@IP:Port//v_host
    broker_url = 'pyamqp://development:root@192.168.2.129:5672//development_host'
    
    # 后端储存,这里使用RabbitMQ,rpc://Uername:Password@IP:Port//v_host
    backend_url = 'rpc://development:root@192.168.2.129:5672//development_host'
    
    # 实例化一个celery对象
    app = Celery('tasks', broker=broker_url, backend=backend_url)
    
    os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')
    app.config_from_envvar('CELERY_CONFIG_MODULE')
    
    @app.task
    def add(x, y):
        return x + y
    
    if __name__ == '__main__':
        print('enable_utc', app.conf.timezone)
        print('timezone', app.conf.enable_utc)
    tasks.py
  • 相关阅读:
    Java 使用Calendar类输出指定年份和月份的日历
    ioc aop
    多线程下单例模式:懒加载(延迟加载)和即时加载
    Java生产环境下性能监控与调优详解
    springboot + zipkin + mysql
    springboot + zipkin(brave-okhttp实现)
    springboot启动方式
    OpenResty实现限流的几种方式
    RocketMQ核心技术精讲与高并发抗压实战
    codis 使用
  • 原文地址:https://www.cnblogs.com/ygbh/p/13530885.html
Copyright © 2011-2022 走看看