zoukankan      html  css  js  c++  java
  • 定时任务调度-Celery

    确保任务不重叠解决方法:

    from celery import task
    from celery.five import monotonic
    from celery.utils.log import get_task_logger
    from contextlib import contextmanager
    from django.core.cache import cache
    from hashlib import md5
    from djangofeeds.models import Feed
    
    logger = get_task_logger(__name__)
    
    LOCK_EXPIRE = 60 * 10  # Lock expires in 10 minutes
    
    @contextmanager
    def memcache_lock(lock_id, oid):
        timeout_at = monotonic() + LOCK_EXPIRE - 3
        # cache.add fails if the key already exists
        status = cache.add(lock_id, oid, LOCK_EXPIRE)
        try:
            yield status
        finally:
            # memcache delete is very slow, but we have to use it to take
            # advantage of using add() for atomic locking
            if monotonic() < timeout_at and status:
                # don't release the lock if we exceeded the timeout
                # to lessen the chance of releasing an expired lock
                # owned by someone else
                # also don't release the lock if we didn't acquire it
                cache.delete(lock_id)
    
    @task(bind=True)
    def import_feed(self, feed_url):
        # The cache key consists of the task name and the MD5 digest
        # of the feed URL.
        feed_url_hexdigest = md5(feed_url).hexdigest()
        lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)
        logger.debug('Importing feed: %s', feed_url)
        with memcache_lock(lock_id, self.app.oid) as acquired:
            if acquired:
                return Feed.objects.import_feed(feed_url).url
        logger.debug(
            'Feed %s is already being imported by another worker', feed_url)
    

    celery 特性:

      Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。由于在工作的平台中用到Celery系统(用于发送邮件、发送短信、发送上线等任务),记录一下学习的知识。

    使用rabbitmq做celery的broker和redis做celery的broker的特性

             使用RabbitMQ作为Celery Broker的优点:
             Highly customizable routing(高度定制路由)
             Persistent queues(一致性队列)

    使用redis作为celery brocker的优点:
            high speed due to in memory datastore(速度极快的内存数据库)
            can double up as both key-value datastore and job queue(可以保证key-value 数据存储及job序列)

    celery-安装

      pip3 install celery(4.0版本celery beat不支持热加载)

    celery-flower监控安装

      pip3 install flower

    django  celery 安装

      pip3 install django-celery

    celery 原理介绍

          某个方法的消息请求celery执行,首先celery根据绑定的规则把任务消息放到制定的路由队列中去,此队列对应的worker节点取出执行。

    说明:
          为什么要定义多个worker?每个worker都会新建一个进程,充分利用服务器资源,提高执行效率。
          同一个服务器可以启动多个worker节点?可以,启动参数里面写上不同的–hostname即可。
          celery默认会创建一个celery任务队列,没有任何绑定的任务将会发送到此消息队列中。

    celery 多woker实验

        celery加redis的多节点配置实例,由于资源限制只找了两台机器做测试

    10.10.42.33
    10.10.190.234
    

      我们把redis服务放在10.10.190.234那台服务器上
      我们把flower服务也启动在10.10.42.33那台服务器上
      代码中定义的队列有queue_add、queue_sum (还有个默认队列celery)
      33、234服务器用于启动worker节点
      33服务器上启动处理celery和queue_add队列的worker节点
      234服务器上启动处理celery和queue_sum队列的worker节点

    配置文件展示

      celeryconfig配置文件:
    cat celeryconfig.py
    #!/usr/bin/python
    #coding:utf-8
    from kombu import Queue
    CELERY_TIMEZONE = 'Asia/Shanghai'
    ####################################
    # 一般配置 #
    ####################################
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT=['json']
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_ENABLE_UTC = True
    # List of modules to import when celery starts.
    CELERY_IMPORTS = ('tasks', )
    CELERYD_MAX_TASKS_PER_CHILD = 40 #  每个worker执行了多少任务就会死掉
    BROKER_POOL_LIMIT = 10 #默认celery与broker连接池连接数
    CELERY_DEFAULT_QUEUE='default'
    CELERY_DEFAULT_ROUTING_KEY='task.default'
    CELERY_RESULT_BACKEND='redis://:fafafa@10.10.190.234:6379/0'  
    BROKER_URL='redis://:fafafa@10.19.190.234:6379/0'  
    #默认队列
    CELERY_DEFAULT_QUEUE = 'celery' #定义默认队列
    CELERY_DEFAULT_ROUTING_KEY = 'celery' #定义默认路由
    CELERYD_LOG_FILE="./logs/celery.log"
    CELERY_QUEUES = (
        Queue("queue_add", routing_key='queue_add'),
        Queue('queue_reduce', routing_key='queue_sum'),
        Queue('celery', routing_key='celery'),
        )
    CELERY_ROUTES = {
        'task.add':{'queue':'queue_add', 'routing_key':'queue_add'},
        'task.reduce':{'queue':'queue_reduce', 'routing_key':'queue_sum'},
    } 
    查看任务配置文件:

      cat task.py

    import os
    import sys
    import datetime
    BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    sys.path.append(BASE_DIR)
    from celery import Celery
    from celery import chain, group, chord, Task
    from celeryservice import celeryconfig
    app = Celery()
    app.config_from_object(celeryconfig)
    __all__ = ['add', 'reduce','sum_all', 'other']
    ####################################
    # task定义 #
    ####################################
    @app.task
    def add(x, y):
        return x + y
    @app.task
    def reduce(x, y):
        return x - y
    @app.task
    def sum(values):
        return sum([int(value) for value in values])
    @app.task
    def other(x, y):
        return x * y
    

      

    flower任务配置文件

      cat flower.py

    #!/usr/bin/env python
    #coding:utf-8
    broker_api = 'redis://:afafafafa@10.10.190.234:6379/0'
    logging = 'DEBUG'
    address = '0.0.0.0'
    port = 5555
    basic_auth = ['zero:zero']  #外部访问密码
    persistent=True  #持久化celery tasks(如果为false的话,重启flower之后,监控的task就消失了)
    db="./flower/flower_db"

    启动服务:

      在33上启动服务

    celery worker -A  task --loglevel=info --queues=celery,queue_add --hostname=celery_worker33  >/dev/null 2>&1 &
    

      在234上启动服务

    celery worker -A  task --loglevel=info --queues=celery,queue_add --hostname=celery_worker33  >/dev/null 2>&1 &
    

    服务验证:

      在任一台有celeryservice项目代码的服务器上,运行add、reduce、sum、other任务(测试可简单使用add.delay(1,2)等)
      add只会在33上运行,
      sum任务,可能会在33或234服务器的worker节点运行
      reduce任务,只会在234上运行。
      other任务可能会在33或者234上运行。

    关于使用过程中的优化

    使用celery的错误处理机制

      如下内容来自于网站,还没实践,存档用。
      大多数任务并没有使用错误处理,如果任务失败,那就失败了。在一些情况下这很不错,但是作者见到的多数失败任务都是去调用第三方API然后出现了网络错误,
      或者资源不可用这些错误,而对于这些错误,最简单的方式就是重试一下,也许就是第三方API临时服务或者网络出现问题,没准马上就好了,那么为什么不试着加个重试测试一下呢?

    @app.task(bind=True, default_retry_delay=300, max_retries=5)
    def my_task_A():
        try:
            print("doing stuff here...")
        except SomeNetworkException as e:
            print("maybe do some clenup here....")
            self.retry(e)
    

      

     定时任务遇到的问题:

      

      通过flower 查看 跑多线程报错, 需要减少线程数.

     celery配置文件的一些详细解释:

    # -*- coding:utf-8 -*-                                                                                                                                                  
    from datetime import timedelta
    from settings import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_DB_NUM
     
     
    # 某个程序中出现的队列,在broker中不存在,则立刻创建它
    CELERY_CREATE_MISSING_QUEUES = True
     
    CELERY_IMPORTS = ("async_task.tasks", "async_task.notify")
     
    # 使用redis 作为任务队列
    BROKER_URL = 'redis://:' + REDIS_PASSWORD + '@' + REDIS_HOST + ':' + str(REDIS_PORT) + '/' + str(REDIS_DB_NUM)
     
    #CELERY_RESULT_BACKEND = 'redis://:' + REDIS_PASSWORD + '@' + REDIS_HOST + ':' + str(REDIS_PORT) + '/10'
     
    CELERYD_CONCURRENCY = 20  # 并发worker数
     
    CELERY_TIMEZONE = 'Asia/Shanghai'
     
    CELERYD_FORCE_EXECV = True    # 非常重要,有些情况下可以防止死锁
     
    CELERYD_PREFETCH_MULTIPLIER = 1
     
    CELERYD_MAX_TASKS_PER_CHILD = 100    # 每个worker最多执行万100个任务就会被销毁,可防止内存泄露
    # CELERYD_TASK_TIME_LIMIT = 60    # 单个任务的运行时间不超过此值,否则会被SIGKILL 信号杀死 
    # BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 90}
    # 任务发出后,经过一段时间还未收到acknowledge , 就将任务重新交给其他worker执行
    CELERY_DISABLE_RATE_LIMITS = True   
     
    # 定时任务
    CELERYBEAT_SCHEDULE = {
        'msg_notify': {
            'task': 'async_task.notify.msg_notify',
            'schedule': timedelta(seconds=10),
            #'args': (redis_db),
            'options' : {'queue':'my_period_task'}
        },
        'report_result': {
            'task': 'async_task.tasks.report_result',
            'schedule': timedelta(seconds=10),
          #'args': (redis_db),
            'options' : {'queue':'my_period_task'}
        },
        #'report_retry': {
        #    'task': 'async_task.tasks.report_retry',
        #    'schedule': timedelta(seconds=60),
        #    'options' : {'queue':'my_period_task'}
        #},
     
    }
    ################################################
    # 启动worker的命令
    # *** 定时器 ***
    # nohup celery beat -s /var/log/boas/celerybeat-schedule  --logfile=/var/log/boas/celerybeat.log  -l info &
    # *** worker ***
    # nohup celery worker -f /var/log/boas/boas_celery.log -l INFO &
    

    celery整体架构图:

      

  • 相关阅读:
    Convert CString to std::string
    VC 使用预编译头
    [转]Windows下使用doxygen阅读和分析C/C++代码
    [SCOI2016]背单词
    Linux配置日志服务器
    网络学习day02_OSI七层模型及数据的传输过程
    网络学习day04_VLSM、子网划分
    XSS闯关游戏准备阶段及XSS构造方法
    网络学习day03_IP地址概述与应用
    网络学习day01_计算机网络与分层思想
  • 原文地址:https://www.cnblogs.com/liujiliang/p/9253606.html
Copyright © 2011-2022 走看看