zoukankan      html  css  js  c++  java
  • celery 配置

    
    1 创建project:
    node2:/scan#django-admin.py startproject picha 
    node2:/scan#ls
    picha
    
    
    创建应用:
    
    
    node2:/scan/picha#django-admin.py startapp demo
    node2:/scan/picha#ls
    demo  manage.py  picha
    
    node2:/scan/picha#tree
    .
    ├── demo
    │   ├── admin.py
    │   ├── apps.py
    │   ├── __init__.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tests.py
    │   └── views.py
    ├── manage.py
    └── picha
        ├── __init__.py
        ├── settings.py
        ├── urls.py
        └── wsgi.py
    
    3 directories, 12 files
    
    
    二 下面在settings文件中配置celery相关的配置:
    
    # CELERY STUFF
    import djcelery
    djcelery.setup_loader()
    BROKER_URL = 'redis://localhost:6379'
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' # 定时任务
    CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379'
    CELERY_ACCEPT_CONTENT = ['application/json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    
    
    INSTALLED_APPS = (
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'demo',
        'djcelery',
    )
    
    
    然后修改市区:
    
    TIME_ZONE = 'Asia/Shanghai'
    时区不对,计划任务是不会按时间执行的!
    
    
    另外,我们还需要在创建一个celery.py文件,他会自动发现我们app下面的task!
    
    #! /usr/bin/env python
    # coding: utf-8
    
    from __future__ import absolute_import
    import os
    from celery import Celery
    from django.conf import settings
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'picha.settings')
    app = Celery('picha')
    
    # Using a string here means the worker will not have to
    # pickle the object when using Windows.
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    
    
    现在我们在demo的app下面创建测试用的task!
    
    from __future__ import absolute_import
    from celery import shared_task,task
    
    
    @shared_task()
    def add(x,y):
        # return x + y
        print x + y
    
    @shared_task()
    def mul(x,y):
        print "%d * %d = %d" %(x,y,x*y)
        return x*y
    
    @shared_task()
    def sub(x,y):
        print "%d - %d = %d"%(x,y,x-y)
        return x - y
    
    @task(ignore_result=True,max_retries=1,default_retry_delay=10)
    def just_print():
        print "Print from celery task"
    
    @task()
    def Task_A(message):
        Task_A.update_state(state='PROGRESS', meta={'progress': 0})
        sleep(10)
        Task_A.update_state(state='PROGRESS', meta={'progress': 30})
        sleep(10)
        return message
    
    
    
    
    
    python manage.py migrate
    
    node2:/scan/picha#python manage.py migrate
    Operations to perform:
      Apply all migrations: admin, auth, contenttypes, djcelery, sessions
    Running migrations:
      Applying contenttypes.0001_initial... OK
      Applying auth.0001_initial... OK
      Applying admin.0001_initial... OK
      Applying admin.0002_logentry_remove_auto_add... OK
      Applying contenttypes.0002_remove_content_type_name... OK
      Applying auth.0002_alter_permission_name_max_length... OK
      Applying auth.0003_alter_user_email_max_length... OK
      Applying auth.0004_alter_user_username_opts... OK
      Applying auth.0005_alter_user_last_login_null... OK
      Applying auth.0006_require_contenttypes_0002... OK
      Applying auth.0007_alter_validators_add_error_messages... OK
      Applying auth.0008_alter_user_username_max_length... OK
      Applying djcelery.0001_initial... OK
      Applying sessions.0001_initial... OK
    node2:/scan/picha#
    
    
    启动celery:
    
    
    node2:/celery/djtest#python manage.py celeryd -l info
    
    
    测试:
     
    node2:/celery/djtest#python manage.py shell
    Python 2.7.3 (default, Mar 30 2017, 20:15:12) 
    [GCC 4.4.7 20120313 (Red Hat 4.4.7-17)] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    (InteractiveConsole)
    >>> from demo.tasks import *
    >>> t=Task_A.delay("heel2")
     
     
    [2017-12-05 20:28:51,801: INFO/MainProcess] Received task: portal.tasks.Task_A[d6d3d300-1b5f-481a-be42-d21c1af99fd6]
    [2017-12-05 20:29:11,828: INFO/MainProcess] Task portal.tasks.Task_A[d6d3d300-1b5f-481a-be42-d21c1af99fd6] succeeded in 20.024760922s: u'heel2'
     
    
    
    [2018-09-16 22:06:55,006: WARNING/MainProcess] celery@node2 ready.
    [2018-09-16 22:10:13,534: ERROR/MainProcess] Received unregistered task of type u'demo.tasks.Task_A'.
    The message has been ignored and discarded.
    
    Did you remember to import the module containing this task?
    Or maybe you are using relative imports?
    Please see http://bit.ly/gLye1c for more information.
    
    The full contents of the message body was:
    {u'utc': True, u'chord': None, u'args': [u'heel2'], u'retries': 0, u'expires': None, u'task': u'demo.tasks.Task_A', u'callbacks': None, u'errbacks': None, u'timelimit': [None, None], u'taskset': None, u'kwargs': {}, u'eta': None, u'id': u'0d25a203-f4e4-4696-8638-39d4d9eb8c8f'} (261b)
    Traceback (most recent call last):
      File "/usr/local/python27/lib/python2.7/site-packages/celery/worker/consumer.py", line 465, in on_task_received
        strategies[type_](message, body,
    KeyError: u'demo.tasks.Task_A'
    
    调整tasks.py 需要重启服务
    
    
    node2:/scan/picha#^C
    node2:/scan/picha#python manage.py celeryd -l info
    /usr/local/python27/lib/python2.7/site-packages/celery/platforms.py:812: RuntimeWarning: You are running the worker with superuser privileges, which is
    absolutely not recommended!
    
    Please specify a different user using the -u option.
    
    User information: uid=0 euid=0 gid=0 egid=0
    
      uid=uid, euid=euid, gid=gid, egid=egid,
     
     -------------- celery@node2 v3.1.26.post2 (Cipater)
    ---- **** ----- 
    --- * ***  * -- Linux-2.6.32-431.el6.x86_64-x86_64-with-centos-6.5-Final
    -- * - **** --- 
    - ** ---------- [config]
    - ** ---------- .> app:         default:0x2ddee10 (djcelery.loaders.DjangoLoader)
    - ** ---------- .> transport:   redis://localhost:6379//
    - ** ---------- .> results:     redis://localhost:6379/
    - *** --- * --- .> concurrency: 1 (prefork)
    -- ******* ---- 
    --- ***** ----- [queues]
     -------------- .> celery           exchange=celery(direct) key=celery
                    
    
    [tasks]
      . demo.tasks.Task_A
      . demo.tasks.add
      . demo.tasks.just_print
      . demo.tasks.mul
      . demo.tasks.sub
    
    [2018-09-16 22:13:24,162: INFO/MainProcess] Connected to redis://localhost:6379//
    [2018-09-16 22:13:24,173: INFO/MainProcess] mingle: searching for neighbors
    [2018-09-16 22:13:25,181: INFO/MainProcess] mingle: all alone
    [2018-09-16 22:13:25,192: WARNING/MainProcess] /usr/local/python27/lib/python2.7/site-packages/djcelery/loaders.py:133: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
      warn('Using settings.DEBUG leads to a memory leak, never '
    [2018-09-16 22:13:25,192: WARNING/MainProcess] celery@node2 ready.
    
  • 相关阅读:
    SQL语法:查询此表有另外一个表没有的数据
    .NET平台开源项目速览-最快的对象映射组件Tiny Mapper之项目实践
    win7 64 安装Oracle 11G 、使用PLSQL进行连接 标准实践
    json 筛选数据 $.grep过滤数据
    bootstrap table 行号 显示行号 添加行号 bootstrap-table 行号
    ajax 请求二进制流 图片 文件 XMLHttpRequest 请求并处理二进制流数据 之最佳实践
    JS中判断JSON数据是否存在某字段的方法 JavaScript中判断json中是否有某个字段
    json 数字key json 数字作为主键
    ajax 跨域 headers JavaScript ajax 跨域请求 +设置headers 实践
    扩展:gridview 空数据时显示表头
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13349024.html
Copyright © 2011-2022 走看看