一、celery简介
1:celery是什么
Celery是一个python开发的异步分布式任务调度模块。
2:celery是使用场景
异步任务:将耗时操作任务提交给Celery去异步执行,比如发送短信/邮件、消息推送、音视频处理等等
定时任务:定时执行某件事情,比如每天数据统计
3:celery特点
简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的。
高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务。
快速:一个单进程的celery每分钟可处理上百万个任务。
灵活: 几乎celery的各个组件都可以被扩展及自定制。
4:工作原理
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等
任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等
二、Hello,Celery
1:版本介绍
在Django项目的环境下配合celery和redis使用异步调用
Django==3.0.4
celery==3.1.26.post2
django-celery==3.3.1
django-redis==4.7.0
redis==2.10.6
安装相应的软件包
pip install redis==2.10.6 -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install django-redis==4.7.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
2:场景介绍(异步)
比如,想泡壶茶喝。当时的情况是:开水没有;水壶要洗,茶壶,茶杯要洗;火已生了,茶叶也有了。怎么办?
办法甲:洗好水壶,灌上凉水,放在火上;在等待水开的时间里,洗茶壶、洗茶杯、拿茶叶;等水开了,泡茶喝。
办法乙:先做好一些准备工作,洗水壶,灌水烧水;坐待水开了,洗茶壶茶杯,拿茶叶;一切就绪,泡茶喝。
2.1 代码架构
2.2 首先要保证redis是可以正常使用的,参考斑马斑马-05-白云之上-Redis初识
from redis import StrictRedis from django_redis import get_redis_connection sr = StrictRedis("39.99.213.203", port=6379, db=1) def set_redis(): res = sr.set("name", "zhangsan") print(res) def get_redis(): res = sr.get("name") print(res) if __name__ == '__main__': # 设置值 set_redis() # 获取值 get_redis()
2.3 设置url和views还有茶方法,至此计划B可以运行
from django.contrib import admin from django.urls import path from course.views import planA,planB urlpatterns = [ path('admin/', admin.site.urls), path('planA/',planA,name='planA'), path('planB/', planB, name='planB') ]
from django.shortcuts import render from django.http import JsonResponse from course.tasks import CourseTask from datetime import datetime import time from course import 茶 # Create your views here. def planA(request): start_time = time.time() start_ = time.strftime("%H:%M:%S", time.localtime()) print('A计划开始', start_time) pc = 茶.泡茶() pc.洗水壶() # 1 CourseTask.apply_async(('洗茶壶','洗茶杯', '拿茶叶'), queue='work_queue') pc.烧水() # 15 end_time = time.time() end_ = time.strftime("%H:%M:%S", time.localtime()) spendtime = str((end_time - start_time)) print('B计划结束', end_time) dict_content = {} dict_content.setdefault("beginTime", start_) dict_content.setdefault("endTime", end_) dict_content.setdefault("spendTime", spendtime) return JsonResponse(dict_content) def planB(request): start_time = time.time() start_=time.strftime("%H:%M:%S", time.localtime()) print('B计划开始', start_time) pc = 茶.泡茶() pc.洗水壶() #1 pc.烧水() #15 pc.洗茶壶() #1 pc.洗茶杯() #1 pc.拿茶叶() #2 end_time = time.time() end_=time.strftime("%H:%M:%S", time.localtime()) spendtime = str((end_time - start_time)) print('B计划结束', end_time) dict_content={} dict_content.setdefault("beginTime",start_) dict_content.setdefault("endTime",end_) dict_content.setdefault("spendTime",spendtime) return JsonResponse(dict_content)
import time class 泡茶(): def 洗水壶(self): time.sleep(1) def 烧水(self): time.sleep(15) def 洗茶壶(self): print("洗茶壶") time.sleep(1) def 洗茶杯(self): print("洗茶杯") time.sleep(1) def 拿茶叶(self): print("拿茶叶") time.sleep(2)
2.4 创建任务和配置文件
# 创建任务 import time from celery.task import Task from course import 茶 class CourseTask(Task): name = 'course-task' def run(self, *args, **kwargs): pc = 茶.泡茶() actions = list(args) print(actions) for action in actions: if action == "洗茶壶": pc.洗茶壶() continue if action == "洗茶杯": pc.洗茶杯() continue if action == "拿茶叶": pc.拿茶叶() continue
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
# 'course.apps.App01Config',
'course',
'djcelery',
]
from .celeryconfig import *
#定义中间件:使用的对象
BROKER_BACKEND='redis'
#定义中间件:数据库地址
BROKER_URL='redis://39.99.213.203:6379/1'
#定义中间件:结果存储地址
CELERY_RESULT_BACKEND='redis://39.99.213.203:6379/2'
""" Django settings for celeryDemo project. Generated by 'django-admin startproject' using Django 3.0.4. For more information on this file, see https://docs.djangoproject.com/en/3.0/topics/settings/ For the full list of settings and their values, see https://docs.djangoproject.com/en/3.0/ref/settings/ """ import os # Build paths inside the project like this: os.path.join(BASE_DIR, ...) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Quick-start development settings - unsuitable for production # See https://docs.djangoproject.com/en/3.0/howto/deployment/checklist/ # SECURITY WARNING: keep the secret key used in production secret! SECRET_KEY = '^ry*-g$fr)j1vz-^w+zw2wz@7$^&h_3)f3_r_swyenqb+pfum_' # SECURITY WARNING: don't run with debug turned on in production! DEBUG = True ALLOWED_HOSTS = [] # Application definition INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', # 'course.apps.App01Config', 'course', 'djcelery', ] from .celeryconfig import * #定义中间件:使用的对象 BROKER_BACKEND='redis' #定义中间件:数据库地址 BROKER_URL='redis://39.99.213.203:6379/1' #定义中间件:结果存储地址 CELERY_RESULT_BACKEND='redis://39.99.213.203:6379/2' MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.middleware.common.CommonMiddleware', 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware', ] ROOT_URLCONF = 'celeryDemo.urls' TEMPLATES = [ { 'BACKEND': 'django.template.backends.django.DjangoTemplates', 'DIRS': [os.path.join(BASE_DIR, 'templates')] , 'APP_DIRS': True, 'OPTIONS': { 'context_processors': [ 'django.template.context_processors.debug', 'django.template.context_processors.request', 'django.contrib.auth.context_processors.auth', 'django.contrib.messages.context_processors.messages', ], }, }, ] WSGI_APPLICATION = 'celeryDemo.wsgi.application' # Database # https://docs.djangoproject.com/en/3.0/ref/settings/#databases DATABASES = { 'default': { 'ENGINE': 'django.db.backends.sqlite3', 'NAME': os.path.join(BASE_DIR, 'db.sqlite3'), } } # Password validation # https://docs.djangoproject.com/en/3.0/ref/settings/#auth-password-validators AUTH_PASSWORD_VALIDATORS = [ { 'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', }, { 'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', }, { 'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', }, { 'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', }, ] # Internationalization # https://docs.djangoproject.com/en/3.0/topics/i18n/ LANGUAGE_CODE = 'en-us' TIME_ZONE = 'UTC' USE_I18N = True USE_L10N = True USE_TZ = True # Static files (CSS, JavaScript, Images) # https://docs.djangoproject.com/en/3.0/howto/static-files/ STATIC_URL = '/static/'
import djcelery from datetime import timedelta djcelery.setup_loader() CELERY_QUEUES = { 'beat_tasks': { 'exchange': 'beat_tasks', 'exchange_type': 'direct', 'binding_key': 'beat_tasks' }, 'work_queue': { 'exchange': 'work_queue', 'exchange_type': 'direct', 'binding_key': 'work_queue' } } CELERY_DEFAULT_QUEUE = 'work_queue' CELERY_IMPORTS=( 'course.tasks' ) # 有些情况下可以防止死锁 CELERYD_FORCE_EXECV = True # 设置并发的worker数量 CELERYD_CONCURRENCY = 4 # 允许重试 CELERY_ACKS_LATE = True # 每个worker最多执行100个任务被销毁,可以防止内存泄漏 CELERYD_MAX_TASKS_PER_CHILD = 100 # 单个任务的最大运行时间,超过就杀死 CELERYD_TASK_TIME_LEMIT = 12 * 30 # 定时任务 CELERYBEAT_SCHEDULE = { 'task1': { 'task': 'course-task', 'schedule': timedelta(seconds=5), # 每5秒执行一次 'options': { 'queue': 'beat_tasks' # 当前定时任务是跑在beat_tasks队列上的 } } }
3:测试
python manage.py runserver
python manage.py celery worker -l info (工人)启动worker节点来处理任务
python manage.py celery beat -l info (领导)启动定时任务,当到时间时,把任务放入broker中,broker检测到了,让worker去工作。
python manage.py celery flower 错误日志监控,启动时必须先启动worker
三、Hello,Celery 2
1:文件读写的demo
url中添加:path('add_test/', add_test, name='add_test'),
def add_test(request): from course.tasks import add from django.http import HttpResponse ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=5) task_time = utc_ctime + time_delay result = add.apply_async(args=[4, 3], eta=task_time) print(result.id)#如果定了CELERY_RESULT_BACKEND 此时会把结果插入到redis数据库中 return HttpResponse('ok')
@task def add(a, b): with open('a.text', 'a', encoding='utf-8') as f: f.write('a') print(a + b)
以下是完整内容
"""celeryDemo URL Configuration The `urlpatterns` list routes URLs to views. For more information please see: https://docs.djangoproject.com/en/3.0/topics/http/urls/ Examples: Function views 1. Add an import: from my_app import views 2. Add a URL to urlpatterns: path('', views.home, name='home') Class-based views 1. Add an import: from other_app.views import Home 2. Add a URL to urlpatterns: path('', Home.as_view(), name='home') Including another URLconf 1. Import the include() function: from django.urls import include, path 2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) """ from django.contrib import admin from django.urls import path from course.views import planA,planB,add_test urlpatterns = [ path('admin/', admin.site.urls), path('planA/',planA,name='planA'), path('planB/', planB, name='planB'), path('add_test/', add_test, name='add_test'), ]
# 创建任务 import time from celery import task from celery.task import Task from course import 茶 class CourseTask(Task): name = 'course-task' def run(self, *args, **kwargs): pc = 茶.泡茶() actions = list(args) print(actions) for action in actions: if action == "洗茶壶": pc.洗茶壶() continue if action == "洗茶杯": pc.洗茶杯() continue if action == "拿茶叶": pc.拿茶叶() continue @task def add(a, b): with open('a.text', 'a', encoding='utf-8') as f: f.write('a') print(a + b)
from django.shortcuts import render from django.http import JsonResponse from course.tasks import CourseTask from datetime import datetime import time from course import 茶 # Create your views here. def planA(request): start_time = time.time() start_ = time.strftime("%H:%M:%S", time.localtime()) print('A计划开始', start_time) pc = 茶.泡茶() pc.洗水壶() # 1 CourseTask.apply_async(('洗茶壶', '洗茶杯', '拿茶叶'), queue='work_queue') pc.烧水() # 15 end_time = time.time() end_ = time.strftime("%H:%M:%S", time.localtime()) spendtime = str((end_time - start_time)) print('B计划结束', end_time) dict_content = {} dict_content.setdefault("beginTime", start_) dict_content.setdefault("endTime", end_) dict_content.setdefault("spendTime", spendtime) return JsonResponse(dict_content) def planB(request): start_time = time.time() start_ = time.strftime("%H:%M:%S", time.localtime()) print('B计划开始', start_time) pc = 茶.泡茶() pc.洗水壶() # 1 pc.烧水() # 15 pc.洗茶壶() # 1 pc.洗茶杯() # 1 pc.拿茶叶() # 2 end_time = time.time() end_ = time.strftime("%H:%M:%S", time.localtime()) spendtime = str((end_time - start_time)) print('B计划结束', end_time) dict_content = {} dict_content.setdefault("beginTime", start_) dict_content.setdefault("endTime", end_) dict_content.setdefault("spendTime", spendtime) return JsonResponse(dict_content) def add_test(request): from course.tasks import add from django.http import HttpResponse ctime = datetime.now() # 默认用utc时间 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta time_delay = timedelta(seconds=5) task_time = utc_ctime + time_delay result = add.apply_async(args=[4, 3], eta=task_time) print(result.id)#如果定了CELERY_RESULT_BACKEND 此时会把结果插入到redis数据库中 return HttpResponse('ok')
2:测试
五、Web监控管理服务-Flower
1:简介
Flower是Celery的一个实时监控和管理Web界面工具,目前仍在活跃的开发之中,但已经是一个很重要的可用工具了
2:使用
把大象装冰箱分三步,第一步:打开冰箱门,第二步:把大象放进去,第三步:关上冰箱门。
使用Flower也分三步,第一步:安装,第二步:启动,第三步:访问。
2.1 安装:使用pip安装Flower:
pip3 install flower -i https://pypi.tuna.tsinghua.edu.cn/simple/ --trusted-host pypi.tuna.tsinghua.edu.cn
2.2 运行 flower命令启动web-server:
python manage.py celery flower
2.3 web页面访问:
127.0.0.1:5555/dashboard
六、日志打印
1:日志设置
from celery.utils.log import get_task_logger
logger=get_task_logger(__name__)
logger.debug(actions)
# 创建任务 import time from celery import task from celery.task import Task from course import 茶 from celery.utils.log import get_task_logger logger=get_task_logger(__name__) class CourseTask(Task): name = 'course-task' def run(self, *args, **kwargs): pc = 茶.泡茶() actions = list(args) logger.debug(actions) for action in actions: if action == "洗茶壶": pc.洗茶壶() continue if action == "洗茶杯": pc.洗茶杯() continue if action == "拿茶叶": pc.拿茶叶() continue @task def add(a, b): with open('a.text', 'a', encoding='utf-8') as f: f.write('a') print(a + b)
2:启动
python manage.py celery worker -l debug -f E:StudycodePythonceleryDemoaaa.log
3:查看日志