zoukankan      html  css  js  c++  java
  • celery消费任务和任务定时的操作

    • 开发环境

      ubuntu 18.04 + python3.6.9 / windows10 + python 3.6.9
      
    • 安装依赖

      # 安装django==2.2.17
      pip install django==2.2.17
      # 安装celery == 4.4.0
      pip install celery==4.4.0
      # 安装 redis == 3.5.3
      pip install redis==3.5.3
      # 安装 flower == 0.9.7
      pip install flower==0.9.7
      
    • 创建 django 项目和应用

      #  创建项目(mysite)
      django-admin startproject mysite
      # 切换到mysite中
      cd mysite
      # 创建应用 (user)
      python manage.py startapp user
      
    • mysite 目录下 新建 celery_task

    • celery_task 中新建 celery.pyconfig.py

      # config.py
      from __future__ import absolute_import  # 拒绝隐式引入,因为celery.py的名字和celery的包名冲突,需要使用这条语句让程序正确地运行
      from celery.schedules import crontab
      from nest_e100.settings import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD
      
      broker_url = "redis://:{}@{}:{}/98".format(REDIS_PASSWORD, REDIS_HOST, REDIS_PORT)  # 使用redis存储任务队列
      result_backend = "redis://:{}@{}:{}/99".format(REDIS_PASSWORD, REDIS_HOST, REDIS_PORT)  # 使用redis存储结果
      
      task_serializer = 'json'
      result_serializer = 'json'
      accept_content = ['json']
      timezone = "Asia/Shanghai"  # 时区设置
      worker_hijack_root_logger = False  # celery默认开启自己的日志,可关闭自定义日志,不关闭自定义日志输出为空
      result_expires = 60 * 60 * 24  # 存储结果过期时间(默认1天)
      
      # 导入任务所在文件
      imports = [
          "celery_task.crontab.company_task",  # 导入py文件
      ]
      
      # 需要执行任务的配置
      beat_schedule = {
          "crontab_company_task": {
              "task": "celery_task.crontab.company_task.check_company_task",  # 执行的函数
              "schedule": crontab(minute="*/1"), # 每分钟执行一次
              "args": ()  # # 任务函数参数
          },
      }
      
      # celery.py
      from celery import Celery
      import django, os, sys
      # 获取根目录
      base_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "nest_e100")
      sys.path.append(base_dir)
      os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'nest_e100.settings')
      # 实例化django项目中的每一个app实例
      django.setup()
      # 创建celery应用对象
      app = Celery("celery_demo")
      
      # 导入celery的配置信息
      app.config_from_object("celery_task.config")
      
    • 新建定时任务和任务消费

      # 在 celery_task/crontab/company_task.py 做公司定时任务
      from celery_task.celery import app
      from .data_access.cache_manager import RedisUtil
      from celery_task.tasks.company_task import e100_company_task
      from celery_task.common.mytime import cal_count_down_time
      from .utils.logger import logger
      
      # 实例化redis
      redis_util_cache = RedisUtil("openapi")
      
      # 检查商户详情信息
      def check_company_info():
          data = redis_util_cache.get_key_values("company*")
          if not data:
              logger.warning(">>>>>>>>>>>>>>>>>> redis中没有公司信息,请检查...")
              return False
          for i in data:
              key = i.get("key", "")
              if key:
                  # 计算该key 剩余的时间的秒数
                  time_left_timestamp = redis_util_cache.ttl(key)
                  # 计算时间差是否在规定范围内,如果在,返回True,则添加队列中,否则返回False,不做任何处理
                  flag = cal_count_down_time(time_left_timestamp)
                  # 添加队列中
                  if flag: e100_company_task.delay({"com_id": key[8:]})
          print(">>>>>>>> crontab check company detail info is end ..")
      
      
      @app.task
      def check_company_task():
          check_company_info()
      
      
      if __name__ == '__main__':
          check_company_task()
      
      
      # 在 celery_task/task/company_task.py 做公司任务消费
      from celery_task.celery import app
      from .data_access.cache_manager import RedisUtil
      from .utils.account_manager import request_to_openplatform_manage
      from .utils.external_url_request import external_system_url
      from django.conf import settings
      import json
      # 实例化redis
      openapi_redis_cache = RedisUtil("openapi")
      
      # 更新商户详情信息
      @app.task
      def e100_company_task(data):
          # 请求开发平台获取数据
          response = request_to_openplatform_manage(data, external_system_url.get_openapi_company_details_info)
          # 获取的数据
          ret = response.data.get("data", "")
          # 将数据放到cache中, key:  company_公司Id
          openapi_redis_cache.set("company_%s" % (data.get("com_id")), json.dumps(ret),
                                  expire_time=settings.TOKEN_CACHE_TIME)
      
      
    • 启动任务

      # 启动django项目
      python manage.py runserver
      # 启动celery 任务, 如果是windows启动,需要在后面添加 --pool=solo
      celery -A celery_task.celery worker -l info 
      # 启动celery定时
      celery -A celery_task.celery beat
      # 启动flower
      celery -A celery_task flower
      
    • 访问 http://127.0.0.1:5555

    • 最终的项目结构为:

  • 相关阅读:
    关于网络字节序(network byte order)和主机字节序(host byte order)
    关于垃圾回收,我来解释下为什么LocalConnection可以实现垃圾回收
    解决Form中ExternalInterface的Bug问题
    AS3里var aa:String是null还是""?
    IE并发连接限制(as)
    tar
    mysql默认端口号3306
    flex经验
    这个游戏不错
    nginx介绍
  • 原文地址:https://www.cnblogs.com/wuxiaoshi/p/14913612.html
Copyright © 2011-2022 走看看