  • Celery configuration guide

    1. Version

    node2:/celery/djtest/djtest# celery --version
    3.1.25 (Cipater)
    
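    2. Purpose

    Projects often contain operations that take a long time to finish (in production, longer than the nginx timeout), or tasks that have to run at regular intervals. Celery is a good fit for both cases.

    Celery is an asynchronous task queue (job queue) based on distributed message passing.

    Celery communicates through messages, and a broker mediates between the clients and the workers.

    To start a task, the client puts a message on a queue, and the broker delivers it to the appropriate worker.

    This guide uses Redis as the message broker and the Django database as the result store.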
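The client, broker and worker roles described above can be illustrated with nothing but the standard library. The following is a minimal sketch, assuming Python 3; `broker`, `results`, `send_task` and `worker` are hypothetical names for illustration and are not Celery's API or implementation.

```python
import queue
import threading

# Hypothetical stand-ins: `broker` plays the role of the Redis queue and
# `results` the role of the result store; neither is Celery's real API.
broker = queue.Queue()
results = {}


def worker():
    """Consume messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, func, args = message
        results[task_id] = func(*args)


def send_task(task_id, func, *args):
    """Client side: enqueue a message and return immediately."""
    broker.put((task_id, func, args))
    return task_id


def add(x, y):
    return x + y


worker_thread = threading.Thread(target=worker)
worker_thread.start()
task_id = send_task("task-1", add, 2, 3)
broker.put(None)        # ask the worker to stop after draining the queue
worker_thread.join()
print(results[task_id])
```

The client returns as soon as the message is queued; the result only appears in the store once the worker has processed it, which is the property that keeps slow operations out of the request cycle.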
    
    
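    3. Installation

    Redis:
    Windows:
    https://github.com/MSOpenTech/redis/releases/
    Linux:
    yum install redis
    (the package is usually called redis on CentOS/RHEL, where it typically comes from EPEL, and redis-server on Debian/Ubuntu)
    Note (Windows): run it from cmd; for some reason it does not work in PowerCmd.

    Python packages:
    pip install celery
    pip install celery-with-redis
    pip install django-celery

    Create the project and start the development server:
    django-admin startproject djtest
    python manage.py runserver 0.0.0.0:10501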
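Before going further it can help to confirm that Redis is actually reachable. The sketch below sends a plain `PING` over a TCP socket and checks for the `+PONG` reply; the helper name `redis_ping` and the default host and port are assumptions for illustration, and a server that requires a password will answer with an error and be reported as unreachable.

```python
import socket


def redis_ping(host="127.0.0.1", port=6379, timeout=2.0):
    """Return True if a server at host:port answers PING with +PONG."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(b"PING\r\n")       # Redis inline command
            reply = conn.recv(64)
    except OSError:
        # Connection refused, timeout, unreachable host, ...
        return False
    return reply.startswith(b"+PONG")


if __name__ == "__main__":
    print("redis reachable:", redis_ping())
```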
    
    4. Django code (whthas_home is the project, portal is the app)

    Here whthas_home corresponds to /celery/djtest/djtest, so in this walkthrough the project package is djtest.
    
    node2:/celery/djtest/djtest#cat __init__.py
    #!/bin/python
    from __future__ import absolute_import
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app

    In settings.py, add djcelery (and, once it has been created, the portal app) to INSTALLED_APPS:

    INSTALLED_APPS = [
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'suit',
        'django.contrib.admin',
        'DjangoUeditor',
        'portal',
        'djcelery',
    ]
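The original does not show the Celery-related settings themselves. A minimal sketch of what they might look like follows, using Celery 3.1 / django-celery setting names. The values are assumptions: the worker banner later in this guide reports redis://127.0.0.1:6379/0 for both the transport and the results, so the sketch uses Redis for both and shows the Django database backend as a commented alternative.

```python
# Hypothetical additions to djtest/settings.py (Celery 3.1 / django-celery
# setting names); the values are assumptions based on the worker banner.

# Redis database 0 on localhost as the message broker.
BROKER_URL = 'redis://127.0.0.1:6379/0'

# Result backend as reported by the worker banner.
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
# To keep results in the Django database instead (celery_taskmeta table):
# CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'

# Let periodic tasks be managed from the Django admin via django-celery.
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
```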
    
    
    Add the file celery.py to the project package:
    
    node2:/celery/djtest/djtest#cat celery.py
    from __future__ import absolute_import
    
    import os
    from celery import Celery
    from django.conf import settings
    
    # The project package in this walkthrough is djtest (whthas_home in the original project).
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djtest.settings')
    
    app = Celery('portal')
    
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    	
    node2:/celery/djtest#python manage.py migrate
    Operations to perform:
      Apply all migrations: admin, auth, contenttypes, djcelery, sessions
    Running migrations:
      Applying djcelery.0001_initial... OK
    node2:/celery/djtest#
    
    
    mysql> show tables;
    +----------------------------+
    | Tables_in_djtest           |
    +----------------------------+
    | auth_group                 |
    | auth_group_permissions     |
    | auth_permission            |
    | auth_user                  |
    | auth_user_groups           |
    | auth_user_user_permissions |
    | celery_taskmeta            |
    | celery_tasksetmeta         |
    | django_admin_log           |
    | django_content_type        |
    | django_migrations          |
    | django_session             |
    | djcelery_crontabschedule   |
    | djcelery_intervalschedule  |
    | djcelery_periodictask      |
    | djcelery_periodictasks     |
    | djcelery_taskstate         |
    | djcelery_workerstate       |
    +----------------------------+
    
    
    node2:/celery/djtest#./manage.py startapp portal
    node2:/celery/djtest#ls
    db.sqlite3  djtest  manage.py  portal
    
    Important:
    create the app first, and only then edit djtest/settings.py (node2:/celery/djtest# vim djtest/settings.py) to add the portal app to INSTALLED_APPS.


    Add the file portal/tasks.py:
    
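    from celery import task
    from time import sleep


    @task()
    def Task_A(message):
        # Report progress through a custom PROGRESS state; the meta dict
        # becomes available to callers as AsyncResult.info.
        Task_A.update_state(state='PROGRESS', meta={'progress': 0})
        sleep(10)
        Task_A.update_state(state='PROGRESS', meta={'progress': 30})
        sleep(10)
        return message


    def get_task_status(task_id):
        """Return the state and a progress percentage for the given task id."""
        task = Task_A.AsyncResult(task_id)

        status = task.state
        progress = 0  # default for PENDING, FAILURE and any other state

        if status == 'SUCCESS':
            progress = 100
        elif status == 'PROGRESS':
            # While the task runs, info holds the meta dict set by update_state.
            progress = task.info['progress']

        return {'status': status, 'progress': progress}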
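The state-to-progress mapping used by get_task_status can be exercised without a running worker by factoring it into a pure function. This is only a sketch: `FakeResult` and `summarize` are hypothetical names, and `FakeResult` models just the `state` and `info` attributes that the function above reads from a result object.

```python
from collections import namedtuple

# Hypothetical stand-in for celery.result.AsyncResult: only the two
# attributes read by get_task_status are modelled.
FakeResult = namedtuple("FakeResult", ["state", "info"])


def summarize(result):
    """Map a result object's state to the dict built by get_task_status."""
    progress = 0
    if result.state == "SUCCESS":
        progress = 100
    elif result.state == "PROGRESS" and isinstance(result.info, dict):
        # Guard against a missing or malformed meta payload.
        progress = result.info.get("progress", 0)
    return {"status": result.state, "progress": progress}


print(summarize(FakeResult("PENDING", None)))
print(summarize(FakeResult("PROGRESS", {"progress": 30})))
print(summarize(FakeResult("SUCCESS", "heel2")))
```

The extra `isinstance` check is a defensive choice of this sketch, not something the original code does.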
    	
    3. Testing

    Start the worker:
    
    node2:/celery/djtest#python manage.py celeryd -l info
    /usr/local/python27/lib/python2.7/site-packages/celery/platforms.py:812: RuntimeWarning: You are running the worker with superuser privileges, which is
    absolutely not recommended!
    
    Please specify a different user using the -u option.
    
    User information: uid=0 euid=0 gid=0 egid=0
    
      uid=uid, euid=euid, gid=gid, egid=egid,
     
     -------------- celery@node2 v3.1.25 (Cipater)
    ---- **** ----- 
    --- * ***  * -- Linux-2.6.32-431.el6.x86_64-x86_64-with-centos-6.5-Final
    -- * - **** --- 
    - ** ---------- [config]
    - ** ---------- .> app:         portal:0x1a90490
    - ** ---------- .> transport:   redis://127.0.0.1:6379/0
    - ** ---------- .> results:     redis://127.0.0.1:6379/0
    - *** --- * --- .> concurrency: 1 (prefork)
    -- ******* ---- 
    --- ***** ----- [queues]
     -------------- .> celery           exchange=celery(direct) key=celery
                    
    
    [tasks]
      . djtest.celery.debug_task
      . portal.tasks.Task_A
    
    [2017-12-05 20:27:29,837: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
    [2017-12-05 20:27:29,848: INFO/MainProcess] mingle: searching for neighbors
    [2017-12-05 20:27:30,857: INFO/MainProcess] mingle: all alone
    [2017-12-05 20:27:30,866: WARNING/MainProcess] /usr/local/python27/lib/python2.7/site-packages/celery/fixups/django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
      warnings.warn('Using settings.DEBUG leads to a memory leak, never '
    [2017-12-05 20:27:30,866: WARNING/MainProcess] celery@node2 ready.
    
    
    Submit a task from the Django shell:
    
    node2:/celery/djtest#python manage.py shell
    Python 2.7.3 (default, Mar 30 2017, 20:15:12) 
    [GCC 4.4.7 20120313 (Red Hat 4.4.7-17)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    (InteractiveConsole)
    >>> from portal.tasks import *
    >>> t=Task_A.delay("heel2")

    The worker log shows the task being received and, about 20 seconds later, completed:
    [2017-12-05 20:28:51,801: INFO/MainProcess] Received task: portal.tasks.Task_A[d6d3d300-1b5f-481a-be42-d21c1af99fd6]
    [2017-12-05 20:29:11,828: INFO/MainProcess] Task portal.tasks.Task_A[d6d3d300-1b5f-481a-be42-d21c1af99fd6] succeeded in 20.024760922s: u'heel2'
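While a task like this runs, a caller usually wants to poll its status until it finishes. The following is a minimal sketch of such a polling loop; `wait_for_task` is a hypothetical helper, and the status function is passed in so that the example can run against a fake instead of a real result backend.

```python
import time


def wait_for_task(get_status, task_id, interval=1.0, timeout=60.0):
    """Poll get_status(task_id) until SUCCESS/FAILURE or until timeout.

    Returns the last status dict seen.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status(task_id)
        if status["status"] in ("SUCCESS", "FAILURE"):
            return status
        if time.monotonic() >= deadline:
            return status
        time.sleep(interval)


# Demonstration with a hypothetical fake that replays a fixed sequence of
# states instead of querying a real result backend.
_states = iter([
    {"status": "PENDING", "progress": 0},
    {"status": "PROGRESS", "progress": 30},
    {"status": "SUCCESS", "progress": 100},
])


def fake_get_status(task_id):
    return next(_states)


final = wait_for_task(fake_get_status, "demo-id", interval=0.01)
print(final)
```

In the Django shell session above, the equivalent call would presumably be wait_for_task(get_task_status, t.id), with get_task_status imported from portal.tasks.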
    
    
    4. Defining tasks in the Django admin
    
    
    
    
    
    
    
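    5. Running the task
    Start the worker (the broker itself is Redis): python manage.py celeryd -l info

    Because this is a periodic task, the beat scheduler has to be started as well: python manage.py celery beat
    The worker side then shows the task being picked up.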
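This guide defines the periodic task through the Django admin. For a fixed schedule, Celery 3.1 also accepts a static CELERYBEAT_SCHEDULE dictionary in the settings. The sketch below shows what such an entry might look like; the entry name, the 30-second interval and the argument are assumptions chosen for illustration, and how a static schedule interacts with the admin-managed one is not covered here.

```python
from datetime import timedelta

# Hypothetical entry for djtest/settings.py; the entry name, interval and
# argument are illustrative only.
CELERYBEAT_SCHEDULE = {
    'task-a-every-30-seconds': {
        'task': 'portal.tasks.Task_A',        # task name as listed by the worker
        'schedule': timedelta(seconds=30),    # run every 30 seconds
        'args': ('hello',),                   # positional arguments for Task_A
    },
}
```

Since Task_A sleeps for about 20 seconds, an interval shorter than that would queue runs faster than a single worker process can finish them.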

  • Original article: https://www.cnblogs.com/hzcya1995/p/13349388.html