zoukankan      html  css  js  c++  java
  • python测试开发django157.celery异步与redis环境搭建 上海

    前言

    Celery 是一个分布式队列的管理工具, 可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列.
    使用于生产环境的消息代理有 RabbitMQ 和 Redis,还可以使用数据库,本篇介绍redis使用

    Redis 环境搭建

    Redis 是一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支持网络、可基于内存、分布式、可选持久性的键值对(Key-Value)存储数据库,并提供多种语言的 API
    Redis 与其他 key - value 缓存产品有以下三个特点:

    • Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。
    • Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset,hash等数据结构的存储。
    • Redis支持数据的备份,即master-slave模式的数据备份。

    使用 docker 安装Redis

    docker pull redis:latest
    

    运行容器

    docker run -itd --name redis-test -p 6379:6379 redis
    

    映射容器服务的 6379 端口到宿主机的 6379 端口。外部可以直接通过宿主机ip:6379 访问到 Redis 的服务。
    上面是没有设置密码的,设置密码用下面这句

    docker run -itd --name myredis   -p 6379:6379 redis --requirepass "123456" --restart=always --appendonly yes 
    

    django依赖包

    django使用的版本是v2.1.2
    安装celery版本

    pip install celery==3.1.26.post2
    

    安装django-celery包

    pip install django-celery==3.3.1
    

    安装Redis

    pip install redis==2.10.6
    

    Django 中使用 Celery

    要在 Django 项目中使用 Celery,您必须首先定义 Celery 库的一个实例(称为“应用程序”)

    如果你有一个现代的 Django 项目布局,比如:

    - proj/
      - manage.py
      - proj/
        - __init__.py
        - settings.py
        - urls.py
    

    那么推荐的方法是创建一个新的proj/proj/celery.py模块来定义 Celery 实例:

    import os
    
    from celery import Celery
    
    # Set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    
    app = Celery('proj')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django apps.
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):
        print(f'Request: {self.request!r}')
    

    其中debug_task是测试的任务,可以注掉

    # @app.task(bind=True)
    # def debug_task(self):
    #     print('Request: {0!r}'.format(self.request))
    

    上面一段只需改这句,'proj'是自己django项目的app名称

    app = Celery('proj')
    

    然后你需要在你的proj/proj/init.py 模块中导入这个应用程序。这确保在 Django 启动时加载应用程序,以便@shared_task装饰器(稍后提到)将使用它:

    proj/proj/init.py:

    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    __all__ = ('celery_app',)
    

    上面这段固定的,不用改

    tasks任务

    在app下新建tasks.py,必须要是tasks.py文件名称,django会自动查找到app下的该文件

    
    @shared_task
    def add(x, y):
        print("task----------1111111111111111111111")
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    

    tasks.py可以写任务函数add、mul,让它生效的最直接的方法就是添加app.task 或shared_task 这个装饰器

    添加setting配置

    setting.py添加配置

    #  celery 配置连接redis
    BROKER_URL = 'redis://ip:6379'
    CELERY_RESULT_BACKEND = 'redis://ip:6379'
    
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT=['json']
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_ENABLE_UTC = True
    

    创建视图

    views.py创建视图

    from .tasks import add, mul
    
    def task_demo(request):
        res = add.delay(10, 20)
        print(res.task_id)  # 返回task_id
        return JsonResponse({"code": 0, "res": res.task_id})
    

    启动worker

    前面pip已经安装过celery应用了,celery是一个独立的应用,可以启动worker

    celery -A MyDjango worker -l info
    

    其中MyDjango是你自己的django项目名称

    运行日志

     -------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
    ---- **** -----
    --- * ***  * -- Windows-10-10.0.17134-SP0
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         yoyo:0x1ea1a96e9b0
    - ** ---------- .> transport:   redis://localhost:6379//
    - ** ---------- .> results:     redis://localhost:6379/
    - *** --- * --- .> concurrency: 4 (prefork)
    -- ******* ----
    --- ***** ----- [queues]
     -------------- .> celery           exchange=celery(direct) key=celery
    
    
    [tasks]
      . yoyo.tasks.add
      . yoyo.tasks.mul
    
    [2021-10-18 22:45:03,155: INFO/MainProcess] Connected to redis://localhost:6379//
    [2021-10-18 22:45:03,347: INFO/MainProcess] mingle: searching for neighbors
    [2021-10-18 22:45:04,897: INFO/MainProcess] mingle: all alone
    [2021-10-18 22:45:05,406: WARNING/MainProcess] e:\python36\lib\site-packages\celery\fixups\django.py:265: 
    UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
      warnings.warn('Using settings.DEBUG leads to a memory leak, never '
    [2021-10-18 22:45:05,407: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.
    

    运行的时候,当我们看到"Connected to redis"说明已经连接成功了!

    连接过程中如果出现报错:redis celery:AttributeError: str object has no attribute items

    [2021-10-18 17:15:21,801: ERROR/MainProcess] Unrecoverable error: AttributeError("'str' object has no attribute 'items'",)
    Traceback (most recent call last):
      File "e:\python36\lib\site-packages\celery\worker\__init__.py", line 206, in start
        self.blueprint.start(self)
      File "e:\python36\lib\site-packages\celery\bootsteps.py", line 123, in start
        step.start(parent)
      File "e:\python36\lib\site-packages\celery\bootsteps.py", line 374, in start
        return self.obj.start()
      File "e:\python36\lib\site-packages\celery\worker\consumer.py", line 280, in start
        blueprint.start(self)
      File "e:\python36\lib\site-packages\celery\bootsteps.py", line 123, in start
        step.start(parent)
      File "e:\python36\lib\site-packages\celery\worker\consumer.py", line 884, in start
        c.loop(*c.loop_args())
      File "e:\python36\lib\site-packages\celery\worker\loops.py", line 103, in synloop
        connection.drain_events(timeout=2.0)
      File "e:\python36\lib\site-packages\kombu\connection.py", line 288, in drain_events
        return self.transport.drain_events(self.connection, **kwargs)
      File "e:\python36\lib\site-packages\kombu\transport\virtual\__init__.py", line 847, in drain_events
        self._callbacks[queue](message)
      File "e:\python36\lib\site-packages\kombu\transport\virtual\__init__.py", line 534, in _callback
        self.qos.append(message, message.delivery_tag)
      File "e:\python36\lib\site-packages\kombu\transport\redis.py", line 146, in append
        pipe.zadd(self.unacked_index_key, delivery_tag, time()) \
      File "e:\python36\lib\site-packages\redis\client.py", line 2320, in zadd
        for pair in iteritems(mapping):
      File "e:\python36\lib\site-packages\redis\_compat.py", line 109, in iteritems
        return iter(x.items())
    AttributeError: 'str' object has no attribute 'items'
    

    redis版本问题,报错版本redis=3.2.1,降低版本redis=2.10.6后,解决

    shell交互环境

    在django shell交互环境调试运行任务

    D:\202107django\MyDjango>python manage.py shell
    Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
    Type "help", "copyright", "credits" or "license" for more information.
    (InteractiveConsole)
    >>> from yoyo.tasks import add,mul
    >>> from celery.result import AsyncResult
    >>>
    >>> res = add.delay(11, 12)
    >>> res
    <AsyncResult: c5ff83a4-4840-4b36-8869-5ce6081904f1>
    >>> res.status
    'SUCCESS'
    >>>
    >>> res.backend
    <celery.backends.redis.RedisBackend object at 0x0000015E011C3128>
    >>>
    >>> res.task_id
    'c5ff83a4-4840-4b36-8869-5ce6081904f1'
    >>>
    >>>
    >>> get_task = AsyncResult(id=res.task_id)
    >>> get_task
    <AsyncResult: c5ff83a4-4840-4b36-8869-5ce6081904f1>
    >>> get_task.get()
    23
    >>>
    

    res.status是查看任务状态
    res.task_id 是获取任务的id
    根据任务的id查询任务的执行结果AsyncResult(id=res.task_id).get()获取

  • 相关阅读:
    P2533 [AHOI2012]信号塔
    P1452 Beauty Contest
    P3194 [HNOI2008]水平可见直线
    P2924 [USACO08DEC]大栅栏Largest Fence
    P2742 【模板】二维凸包 / [USACO5.1]圈奶牛Fencing the Cows
    P4208 [JSOI2008]最小生成树计数
    P4280 [AHOI2008]逆序对
    P3199 [HNOI2009]最小圈
    P3343 [ZJOI2015]地震后的幻想乡
    剪刀,石头,布,小游戏脚本
  • 原文地址:https://www.cnblogs.com/yoyoketang/p/15422804.html
Copyright © 2011-2022 走看看