前戏
已经安装好了celery,redis模块,还安装好了redis服务。
新建两个py文件,一个为task,一个为demo,内容如下。
task.py
import time def test(name): time.sleep(5) print(f'hello {name}') return f'{name}'
demo.py
from task import test if __name__ == '__main__': test('jack')
这两个文件内容相信大家都能看懂(看不懂也没关系)
执行demo.py会输出hello jack
使用celery实现异步
声明:本人的redis服务和代码都在远程服务器上,本地的pycharm也上连接的远程服务器
修改task.py
import time from celery import Celery # 实例化Celery app = Celery('celery_test', broker='redis://:redis666@127.0.0.1:6379', backend='redis://:redis666@127.0.0.1:6379') # redis666为redis的密码,前面要加: 后面要加@ @app.task def test(name): time.sleep(5) print(f'hello {name}') return 'wahaha'
其中的broker是存放缓存任务的。backend是存放缓存任务结果的。这里是存放在redis里。
修改demo.py
from task import test if __name__ == '__main__': # 将任务存放在broker里 test.delay('jack') # 这样调用
在服务器上启动celery worker,我的是在虚拟环境里安装的celery,所以要进入到虚拟环境在执行
celery -A task worker -l INFO
也可以加上-c参数 celery worker -A task -c 6 -l INFO
其中 -A task 表示只运行task里的任务。-c 启动的worker数量,这里为6。-l 日志级别,这里为INFO级别。
执行demo.py,查看celery的work
celery的配置文件抽取
目录结构如下
apps下的__init__.py内容
from celery import Celery app = Celery('test_task') app.config_from_object('apps.celery_conf')
celery_conf.py的内容
BROKER_URL = 'redis://:redis666@127.0.0.1:6379/1' CELERY_RESULT_BACKEND = 'redis://:redis666@127.0.0.1:6379/2' # 需要导入task1和task2 CELERY_IMPORTS = ( 'apps.task1', 'apps.task2' )
task1.py的内容
from apps import app @app.task def add(x,y): return x+y
task2.py的内容
from apps import app @app.task def subs(x,y): return x-y
demo.py的内容
from apps.task1 import add from apps.task2 import subs if __name__ == '__main__': add.delay(3,5) subs.delay(8,6)
启动celery worker
运行demo.py
celery的常用配置
上面我们创建里celery_conf.py文件,用来存放celery的配置。其他的配置我们也可以写在里面,常用的配置如下
BROKER_URL = 'redis://:redis666@127.0.0.1:6379/1' CELERY_RESULT_BACKEND = 'redis://:redis666@127.0.0.1:6379/2' # 需要导入task1和task2 CELERY_IMPORTS = ( 'apps.task1', 'apps.task2' ) # 时区设置。默认UTC CELERY_TIMEZONE = 'Asia/Shanghai' # 是否使用本地的时区,False时将使用本地的时区 CELERY_ENABLE_UTC = False # 重写task的属性,限制tasks模块下的add函数,每秒钟只能执行10次 CELERY_ANNOTATIONS = {'tasks.add':{'rate_limit':'10/s'}} # 连接错误情况下是否重试发布任务消息,默认为True CELERY_TASK_PUBLISH_RETRY = False # 并发的worker数量,也是命令行-c指定的数目 # 事实上并不是worker数量越多越好,保证任务不堆积,加上一些新增任务的预留就可以了 CELERYD_CONCURRENCY = 20 # 每次worker去任务队列中取的任务数量 CELERY_PREFETH_MULTIPLIRE = 5 # 每个worker执行多少次被杀掉 CELERYD_MAX_TASKS_PER_CHILD = 200 # 单个任务的最大执行时间 CELERY_TASK_TIME_LIMIT = 60 # celery任务执行结果的超时时间 CELERY_TASK_RESULT_EXPIRES = 1000
wins下启动worker报错
如果你在wins下执行了 celery worker -A task -l INFO,运行demo.py后,celery报如下错误
这是因为3.x之后的celery不支持wins导致的。我们只需要在安装一个第三方库eventlet就可以了
pip install eventlet
然后我们启动worker就不能以上面的方式启动了,需要加个 -P 参数
celery worker -A task -l INFO -P eventlet
然后执行demo.py,worker就不会报错了