zoukankan      html  css  js  c++  java
  • 异步任务利器Celery(二)在django项目中使用Celery

    Celery 4.0支持django1.8及以上的版本,低于1.8的项目使用Celery 3.1。

    一个django项目的组织如下:

    - proj/
      - manage.py
      - proj/
        - __init__.py
        - settings.py
        - urls.py
    

    首先建立proj/proj/celery.py文件:

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
    
    app = Celery('proj')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    

    然后要保证django项目启动时上述的app被载入,修改proj/proj/__init__.py文件:

    from __future__ import absolute_import, unicode_literals
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    __all__ = ['celery_app']
    

    现在就可以在INSTALLED_APPS中的app下建立tasks.py文件啦:

    - app1/
        - tasks.py
        - models.py
    - app2/
        - tasks.py
        - models.py
    

    比如:

    # Create your tasks here
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    
    
    @shared_task
    def add(x, y):
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
    

    在views中调用这些tasks即可异步运行。

    如果使用Redis作为broker,在settings.py中添加:

    CELERY_BROKER_URL = 'redis://localhost:6379/0'

    可以使用Django ORM/Cache作为储存backend。

    下载库:

    $ pip install django-celery-results
    

    设定settings.py:

    INSTALLED_APPS = (
        ...,
        'django_celery_results',
    )
    

    建立数据表:

    $ python manage.py migrate django_celery_results
    

    在settings.py中添加Celery设置:

    CELERY_RESULT_BACKEND = 'django-db'

    或者

    CELERY_RESULT_BACKEND = 'django-cache'

    区别在于使用django ORM还是使用django缓存系统。

    启动:

    $ celery -A proj worker -l info
    

    可以在python manage.py shell中调用:

    $ python manage.py shell
    Python 2.7.12 (default, Nov 19 2016, 06:48:10) 
    [GCC 5.4.0 20160609] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    (InteractiveConsole)
    >>> from app1.tasks import add
    >>> add.delay(3,4)
    <AsyncResult: a9abab6d-b7a9-47e6-8c09-ec284948449f>
    

    celery日志:

    [2017-09-14 00:09:41,432: INFO/ForkPoolWorker-1] Task urldata.tasks.add[38af760e-ed6c-48f8-b77c-d67bade8d6b8] succeeded in 0.00782653002534s: 7
    

    官方一个完整的例子:https://github.com/celery/celery/tree/master/examples/django/

    官方文档还有一个异步审查用户上传评论的例子。

    blog/models.py:

    from django.db import models
    from django.utils.translation import ugettext_lazy as _
    
    
    class Comment(models.Model):
        name = models.CharField(_('name'), max_length=64)
        email_address = models.EmailField(_('email address'))
        homepage = models.URLField(_('home page'),
                                   blank=True, verify_exists=False)
        comment = models.TextField(_('comment'))
        pub_date = models.DateTimeField(_('Published date'),
                                        editable=False, auto_add_now=True)
        is_spam = models.BooleanField(_('spam?'),
                                      default=False, editable=False)
    
        class Meta:
            verbose_name = _('comment')
            verbose_name_plural = _('comments')
    

    在views中先保存评论,同时调用celery异步审核。

    blog/views.py:

    from django import forms
    from django.http import HttpResponseRedirect
    from django.template.context import RequestContext
    from django.shortcuts import get_object_or_404, render_to_response
    
    from blog import tasks
    from blog.models import Comment
    
    
    class CommentForm(forms.ModelForm):
    
        class Meta:
            model = Comment
    
    
    def add_comment(request, slug, template_name='comments/create.html'):
        post = get_object_or_404(Entry, slug=slug)
        remote_addr = request.META.get('REMOTE_ADDR')
    
        if request.method == 'post':
            form = CommentForm(request.POST, request.FILES)
            if form.is_valid():
                comment = form.save()
                # Check spam asynchronously.
                tasks.spam_filter.delay(comment_id=comment.id,
                                        remote_addr=remote_addr)
                return HttpResponseRedirect(post.get_absolute_url())
        else:
            form = CommentForm()
    
        context = RequestContext(request, {'form': form})
        return render_to_response(template_name, context_instance=context)
    

    tasks如下:

    blog/tasks.py

    from celery import Celery
    
    from akismet import Akismet
    
    from django.core.exceptions import ImproperlyConfigured
    from django.contrib.sites.models import Site
    
    from blog.models import Comment
    
    
    app = Celery(broker='amqp://')
    
    
    @app.task
    def spam_filter(comment_id, remote_addr=None):
        logger = spam_filter.get_logger()
        logger.info('Running spam filter for comment %s', comment_id)
    
        comment = Comment.objects.get(pk=comment_id)
        current_domain = Site.objects.get_current().domain
        akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))
        if not akismet.verify_key():
            raise ImproperlyConfigured('Invalid AKISMET_KEY')
    
    
        is_spam = akismet.comment_check(user_ip=remote_addr,
                            comment_content=comment.comment,
                            comment_author=comment.name,
                            comment_author_email=comment.email_address)
        if is_spam:
            comment.is_spam = True
            comment.save()
    
        return is_spam
    

      

  • 相关阅读:
    【Oracle】EXPDP和IMPDP数据泵进行导出导入的方法
    【Oracle】无法删除当前连接的用户
    消除SVN锁定
    提取当前目录所有文件名
    【Weblogic】domain快速启动脚本
    Spring @Transactional注解不回滚不起作用无效
    协方差与相关系数
    利用深度学习解决直播支付风控[转]
    高质量API网关组件实现
    git使用初探
  • 原文地址:https://www.cnblogs.com/linxiyue/p/7518535.html
Copyright © 2011-2022 走看看