```python
import time

def add(x, y):
    print("Entering function")
    time.sleep(5)
    return x + y

if __name__ == '__main__':
    print("Program start")
    result = add(100, 200)
    print("Program end")
    print(result)
```

Output:

```
Program start
Entering function
Program end
300
```
The call blocks for five seconds before the final prints run. In a synchronous program, everything downstream has to wait on this one slow function, which is clearly unreasonable!
Now let's refactor this with Celery.
Under a celery_study directory, create app.py, task.py, and __init__.py.
```python
# task.py
import time
from celery import Celery

# broker: where tasks are queued; backend: where results are stored
app = Celery("my_task",
             broker="redis://127.0.0.1:6379/3",
             backend="redis://127.0.0.1:6379/4")

@app.task
def add(x, y):
    print("Entering function")
    time.sleep(5)
    return x + y
```
```python
# app.py
from task import add

if __name__ == '__main__':
    print("Program start")
    result = add.delay(10, 20)   # enqueue the task instead of calling it
    print("Program end")
    print(result)                # an AsyncResult, not the return value
```
Running app.py prints the following, with no five-second wait in the middle; it completes instantly!
```
Program start
Program end
7cc62995-84df-45ce-8067-5f321e4443e7
```
What is this string, like 054cf21c-9e0d-48ef-be4c-a27f4292b56d? Shouldn't the program print 30, the return value of add()?
This is because when app.py ran and hit delay(), our task was published. Published where? Into the broker. Opening Redis database 3, we find that running app.py added two keys, "celery" and "_kombu.binding.celery". Inspecting the "celery" list, we find "correlation_id": "054cf21c-9e0d-48ef-be4c-a27f4292b56d", the correlation id. In other words, the task was pushed into the queue and tagged with this id (the id differs per run; I had run app.py several times).
```
127.0.0.1:6379[3]> keys *
1) "celery"
2) "_kombu.binding.celery"
3) "history_2"
127.0.0.1:6379[3]> type "_kombu.binding.celery"
set
127.0.0.1:6379[3]> smembers "_kombu.binding.celery"
1) "celery\x06\x16\x06\x16celery"
127.0.0.1:6379[3]> type "celery"
list
127.0.0.1:6379[3]> llen "celery"
(integer) 4
127.0.0.1:6379[3]> lrange "celery" 0 4
1) "{"content-type": "application/json", "body": "W1sxMCwgMjBdLCB7fSwgeyJlcnJiYWNrcyI6IG51bGwsICJjYWxsYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=", "headers": {"eta": null, "shadow": null, "task": "task.add", "kwargsrepr": "{}", "group": null, "parent_id": null, "root_id": "054cf21c-9e0d-48ef-be4c-a27f4292b56d", "lang": "py", "expires": null, "timelimit": [null, null], "id": "054cf21c-9e0d-48ef-be4c-a27f4292b56d", "origin": "gen17207@ubuntu", "argsrepr": "(10, 20)", "retries": 0}, "content-encoding": "utf-8", "properties": {"reply_to": "7ac5687b-8340-3b88-bfcb-10159171d35f", "priority": 0, "delivery_tag": "c36e6d83-12d6-4532-8722-2ad881b1dfba", "delivery_info": {"routing_key": "celery", "exchange": ""}, "correlation_id": "054cf21c-9e0d-48ef-be4c-a27f4292b56d", "delivery_mode": 2, "body_encoding": "base64"}}"
2) "{"headers": {"retries": 0, "group": null, "expires": null, "argsrepr": "(10, 20)", "id": "8744e933-05a4-4acc-a82a-77f4c371edcb", "kwargsrepr": "{}", "timelimit": [null, null], "lang": "py", "root_id": "8744e933-05a4-4acc-a82a-77f4c371edcb", "shadow": null, "parent_id": null, "eta": null, "task": "task.add", "origin": "gen17195@ubuntu"}, "properties": {"delivery_tag": "48d8ba98-a961-425f-992a-444b339779e8", "delivery_info": {"exchange": "", "routing_key": "celery"}, "reply_to": "380360ff-eb4a-3afd-94c2-2590a3e825a4", "priority": 0, "delivery_mode": 2, "correlation_id": "8744e933-05a4-4acc-a82a-77f4c371edcb", "body_encoding": "base64"}, "body": "W1sxMCwgMjBdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsfV0=", "content-type": "application/json", "content-encoding": "utf-8"}"
3) "{"properties": {"priority": 0, "correlation_id": "7cc62995-84df-45ce-8067-5f321e4443e7", "reply_to": "f15a7899-947d-3c2b-9c18-aa267588315c", "body_encoding": "base64", "delivery_mode": 2, "delivery_info": {"routing_key": "celery", "exchange": ""}, "delivery_tag": "52649bd9-7bc3-4587-8ca4-25035b04bc6a"}, "content-encoding": "utf-8", "headers": {"kwargsrepr": "{}", "eta": null, "task": "task.add", "parent_id": null, "timelimit": [null, null], "lang": "py", "origin": "gen17053@ubuntu", "id": "7cc62995-84df-45ce-8067-5f321e4443e7", "group": null, "expires": null, "argsrepr": "(10, 20)", "shadow": null, "retries": 0, "root_id": "7cc62995-84df-45ce-8067-5f321e4443e7"}, "body": "W1sxMCwgMjBdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=", "content-type": "application/json"}"
4) "{"properties": {"priority": 0, "body_encoding": "base64", "reply_to": "2935bf4c-0d9c-3603-b8d2-e70735f5e378", "delivery_info": {"routing_key": "celery", "exchange": ""}, "delivery_mode": 2, "correlation_id": "d80331aa-4a7c-4921-8f29-de43c933737a", "delivery_tag": "002c77d7-f224-422d-ac1b-21997433a274"}, "headers": {"expires": null, "parent_id": null, "origin": "gen17023@ubuntu", "root_id": "d80331aa-4a7c-4921-8f29-de43c933737a", "lang": "py", "kwargsrepr": "{}", "task": "task.add", "argsrepr": "(10, 20)", "timelimit": [null, null], "id": "d80331aa-4a7c-4921-8f29-de43c933737a", "shadow": null, "eta": null, "group": null, "retries": 0}, "content-type": "application/json", "content-encoding": "utf-8", "body": "W1sxMCwgMjBdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0="}"
```
At this point the tasks are only sitting in the message broker; nothing has consumed them yet. Who does the consuming? A worker. Start one with:

```
celery worker -A task -l INFO
```
```
 -------------- celery@ubuntu v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Linux-4.4.0-31-generic-x86_64-with-Ubuntu-16.04-xenial 2020-03-18 20:45:52
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         my_task:0x7fe31dd6b550
- ** ---------- .> transport:   redis://127.0.0.1:6379/3
- ** ---------- .> results:     redis://127.0.0.1:6379/4
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . task.add

[2020-03-18 20:45:52,929: INFO/MainProcess] Connected to redis://127.0.0.1:6379/3
[2020-03-18 20:45:52,941: INFO/MainProcess] mingle: searching for neighbors
[2020-03-18 20:45:53,968: INFO/MainProcess] mingle: all alone
[2020-03-18 20:45:53,996: INFO/MainProcess] celery@ubuntu ready.
[2020-03-18 20:45:54,159: INFO/MainProcess] Received task: task.add[d80331aa-4a7c-4921-8f29-de43c933737a]
[2020-03-18 20:45:54,164: INFO/MainProcess] Received task: task.add[7cc62995-84df-45ce-8067-5f321e4443e7]
[2020-03-18 20:45:54,169: WARNING/ForkPoolWorker-7] Entering function
[2020-03-18 20:45:54,169: WARNING/ForkPoolWorker-5] Entering function
[2020-03-18 20:45:54,174: INFO/MainProcess] Received task: task.add[8744e933-05a4-4acc-a82a-77f4c371edcb]
[2020-03-18 20:45:54,179: INFO/MainProcess] Received task: task.add[054cf21c-9e0d-48ef-be4c-a27f4292b56d]
[2020-03-18 20:45:54,178: WARNING/ForkPoolWorker-1] Entering function
[2020-03-18 20:45:54,185: WARNING/ForkPoolWorker-2] Entering function
[2020-03-18 20:45:59,190: INFO/ForkPoolWorker-7] Task task.add[7cc62995-84df-45ce-8067-5f321e4443e7] succeeded in 5.022220275001018s: 30
[2020-03-18 20:45:59,195: INFO/ForkPoolWorker-1] Task task.add[8744e933-05a4-4acc-a82a-77f4c371edcb] succeeded in 5.0176936099960585s: 30
[2020-03-18 20:45:59,197: INFO/ForkPoolWorker-5] Task task.add[d80331aa-4a7c-4921-8f29-de43c933737a] succeeded in 5.028447696997318s: 30
[2020-03-18 20:45:59,200: INFO/ForkPoolWorker-2] Task task.add[054cf21c-9e0d-48ef-be4c-a27f4292b56d] succeeded in 5.015312493997044s: 30
```
I had run app.py four times in total, so the worker logged Received four times and consumed four tasks. What does it consume? The decorated function: add() runs four times, hence four "Entering function" prints and four return values of 30!
Next, let's look at what the backend stores.
```
127.0.0.1:6379[3]> select 4
OK
127.0.0.1:6379[4]> keys *
1) "cart_selected_2"
2) "celery-task-meta-8744e933-05a4-4acc-a82a-77f4c371edcb"
3) "celery-task-meta-d80331aa-4a7c-4921-8f29-de43c933737a"
4) "celery-task-meta-7cc62995-84df-45ce-8067-5f321e4443e7"
5) "celery-task-meta-054cf21c-9e0d-48ef-be4c-a27f4292b56d"
6) "cart_2"
127.0.0.1:6379[4]> type "celery-task-meta-054cf21c-9e0d-48ef-be4c-a27f4292b56d"
string
127.0.0.1:6379[4]> get "celery-task-meta-054cf21c-9e0d-48ef-be4c-a27f4292b56d"
"{"task_id": "054cf21c-9e0d-48ef-be4c-a27f4292b56d", "traceback": null, "children": [], "date_done": "2020-03-18T12:45:59.188311", "result": 30, "status": "SUCCESS"}"
```
Each consumed task's return value is stored in its task-meta record, under "result": 30. So now we know: the backend stores the results of consumption, while the broker stores the messages waiting to be consumed.
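On the Python side, the AsyncResult returned by delay() is the handle to exactly that backend record. A minimal sketch, assuming task.py from above and a running worker:

```python
from task import app, add

result = add.delay(10, 20)       # returns immediately with an AsyncResult
print(result.id)                 # the task id, the same string we saw printed
print(result.get(timeout=10))    # blocks until the worker stores the value -> 30
print(result.status)             # "SUCCESS"

# A stored result can also be looked up later by its id:
from celery.result import AsyncResult
print(AsyncResult("054cf21c-9e0d-48ef-be4c-a27f4292b56d", app=app).result)
```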
Now, with the worker still running, we run python3 app.py again and see:
```
[2020-03-18 21:03:16,842: INFO/MainProcess] Received task: task.add[0dfe70b8-3a10-47a5-9664-f08a7834eaae]
[2020-03-18 21:03:16,845: WARNING/ForkPoolWorker-6] Entering function
[2020-03-18 21:03:21,867: INFO/ForkPoolWorker-6] Task task.add[0dfe70b8-3a10-47a5-9664-f08a7834eaae] succeeded in 5.022131994999654s: 30
```

The worker picks the new task up immediately.
So far all the tasks lived in a single task.py. For a real project, it is better to organize Celery as a package that loads its configuration from a module. Create a celery_app package with __init__.py, celery_config.py, task1.py, and task2.py:

```python
# __init__.py
from celery import Celery

app = Celery("demo")
# Load the configuration module via the Celery instance
app.config_from_object("celery_app.celery_config")
```
```python
# celery_config.py
BROKER_URL = "redis://127.0.0.1:6379/3"
# Note: the correct setting name is CELERY_RESULT_BACKEND; this misspelled
# key is silently ignored, which is why the worker banner below shows
# "results: disabled://".
BACKEND_RESULT_URL = "redis://127.0.0.1:6379/4"
CELERY_TIMEZONE = "Asia/Shanghai"

# Import the listed task modules
CELERY_IMPORTS = (
    "celery_app.task1",
    "celery_app.task2",
)
```
```python
# task1.py
import time
from celery_app import app

@app.task
def add(x, y):
    print("Entering function")
    time.sleep(5)
    return x + y
```
```python
# task2.py
import time
from celery_app import app

@app.task
def multiply(x, y):
    print("Entering function")
    time.sleep(5)
    return x * y
```
```python
# app.py
from celery_app import task1
from celery_app import task2

if __name__ == '__main__':
    task1.add.delay(250, 250)
    task2.multiply.delay(100, 100)
    print("end")
```
Start a worker against the package:

```
celery worker -A celery_app -l INFO
```
```
 -------------- celery@ubuntu v4.3.0 (rhubarb)
---- **** -----
--- * ***  * -- Linux-4.4.0-31-generic-x86_64-with-Ubuntu-16.04-xenial 2020-03-18 21:33:48
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         demo:0x7f90118e4550
- ** ---------- .> transport:   redis://127.0.0.1:6379/3
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . celery_app.task1.add
  . celery_app.task2.multiply

[2020-03-18 21:33:48,303: INFO/MainProcess] Connected to redis://127.0.0.1:6379/3
[2020-03-18 21:33:48,316: INFO/MainProcess] mingle: searching for neighbors
[2020-03-18 21:33:49,343: INFO/MainProcess] mingle: all alone
[2020-03-18 21:33:49,369: INFO/MainProcess] celery@ubuntu ready.
```
```
[2020-03-18 21:41:51,382: INFO/MainProcess] Received task: celery_app.task1.add[192b3015-af33-4153-ab09-3febdd99ebfd]
[2020-03-18 21:41:51,388: WARNING/ForkPoolWorker-3] Entering function
[2020-03-18 21:41:56,393: INFO/ForkPoolWorker-3] Task celery_app.task1.add[192b3015-af33-4153-ab09-3febdd99ebfd] succeeded in 5.004680051999458s: 500
[2020-03-18 21:43:31,109: INFO/MainProcess] Received task: celery_app.task1.add[d8f89851-5b27-4518-ac73-2bc43495fa1e]
[2020-03-18 21:43:31,111: WARNING/ForkPoolWorker-5] Entering function
[2020-03-18 21:43:36,116: INFO/ForkPoolWorker-5] Task celery_app.task1.add[d8f89851-5b27-4518-ac73-2bc43495fa1e] succeeded in 5.004891623000731s: 500
[2020-03-18 21:45:20,340: INFO/MainProcess] Received task: celery_app.task1.add[ad0cdd51-e937-4f9b-bc7d-a8ece8047705]
[2020-03-18 21:45:20,341: WARNING/ForkPoolWorker-7] Entering function
[2020-03-18 21:45:25,594: INFO/ForkPoolWorker-7] Task celery_app.task1.add[ad0cdd51-e937-4f9b-bc7d-a8ece8047705] succeeded in 5.252825808995112s: 500
[2020-03-18 21:46:44,956: INFO/MainProcess] Received task: celery_app.task1.add[55fcb2c2-4242-43dd-9848-6352039965f7]
[2020-03-18 21:46:44,957: WARNING/ForkPoolWorker-1] Entering function
[2020-03-18 21:46:44,959: INFO/MainProcess] Received task: celery_app.task2.multiply[2827587c-1f13-4099-be15-bb469ff86a27]
[2020-03-18 21:46:44,961: WARNING/ForkPoolWorker-3] Entering function
[2020-03-18 21:46:49,963: INFO/ForkPoolWorker-3] Task celery_app.task2.multiply[2827587c-1f13-4099-be15-bb469ff86a27] succeeded in 5.002474594999512s: 10000
[2020-03-18 21:46:49,963: INFO/ForkPoolWorker-1] Task celery_app.task1.add[55fcb2c2-4242-43dd-9848-6352039965f7] succeeded in 5.0065162960017915s: 500
```
task2.multiply only executed once (on the last run) while task1.add ran four times: the first three attempts used task2.multiply.apply_async(100, 100), which failed with multiply() argument after ** must be a mapping, not int. apply_async's second positional parameter is the task's kwargs and must be a dict. Replacing the call with task2.multiply.delay(100, 100) fixed it. An example of the two conventions follows below.
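For reference, a sketch against the celery_app tasks above: delay() takes the task's own arguments directly, while apply_async() takes args as a tuple and kwargs as a mapping, plus execution options.

```python
# Equivalent ways to enqueue multiply(100, 100):
task2.multiply.delay(100, 100)                        # shorthand
task2.multiply.apply_async(args=(100, 100))           # explicit args tuple
task2.multiply.apply_async((100, 100), countdown=5)   # with an option: run 5s later

# The broken call apply_async(100, 100) binds the second 100 to kwargs,
# and kwargs must be a mapping, hence the TypeError.
```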
Next, periodic tasks. Celery beat can publish a task on a fixed schedule; register the schedule with add_periodic_task inside the on_after_configure hook. mytask.py:

```python
# mytask.py
from celery import Celery
from datetime import timedelta

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('mytask', broker=broker, backend=backend)
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
app.conf.CELERY_ENABLE_UTC = True

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Publish getRuminate('start') every 10 seconds
    sender.add_periodic_task(timedelta(seconds=10), getRuminate.s('start'))

@app.task
def getRuminate(arg):
    print("hehe")
    if arg == "start":
        print(arg)
        demo()

def demo():
    print("Running scheduled task")
    return "Running scheduled task"
```
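Here getRuminate.s('start') builds a signature: the task plus pre-bound arguments, which is what add_periodic_task hands to beat to publish. A quick illustration of the same mechanism outside the scheduler (a sketch):

```python
sig = getRuminate.s('start')   # Signature object: task + bound args
sig.delay()                    # equivalent to getRuminate.delay('start')
```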
Run the following in the same directory as mytask.py; start the beat scheduler first.
```
[root@VM_0_13_centos celery_demo]# celery beat -A mytask -l INFO
celery beat v4.4.2 (cliffs) is starting.
__    -    ... __   -        _
LocalTime -> 2020-03-23 21:46:33
Configuration ->
    . broker -> redis://127.0.0.1:6379/1
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2020-03-23 21:46:33,006: INFO/MainProcess] beat: Starting...
[2020-03-23 21:46:33,061: INFO/MainProcess] Scheduler: Sending due task mytask.getRuminate('start') (mytask.getRuminate)
[2020-03-23 21:46:43,043: INFO/MainProcess] Scheduler: Sending due task mytask.getRuminate('start') (mytask.getRuminate)
[2020-03-23 21:46:53,043: INFO/MainProcess] Scheduler: Sending due task mytask.getRuminate('start') (mytask.getRuminate)
[2020-03-23 21:47:03,044: INFO/MainProcess] Scheduler: Sending due task mytask.getRuminate('start') (mytask.getRuminate)
[2020-03-23 21:47:13,044: INFO/MainProcess] Scheduler: Sending due task mytask.getRuminate('start') (mytask.getRuminate)
```
Beat publishes a task every ten seconds. Now start a worker to consume them:
```
[root@VM_0_13_centos celery_demo]# celery worker -A mytask -l INFO
/usr/local/python3/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@VM_0_13_centos v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core 2020-03-23 21:47:08
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         mytask:0x7fc80fb6e588
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . mytask.getRuminate

[2020-03-23 21:47:08,139: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2020-03-23 21:47:08,146: INFO/MainProcess] mingle: searching for neighbors
[2020-03-23 21:47:09,161: INFO/MainProcess] mingle: all alone
[2020-03-23 21:47:09,170: INFO/MainProcess] celery@VM_0_13_centos ready.
[2020-03-23 21:47:09,209: INFO/MainProcess] Received task: mytask.getRuminate[851b5a5b-d978-4274-b1c5-32a09b827bb3]
[2020-03-23 21:47:09,210: WARNING/ForkPoolWorker-1] start
[2020-03-23 21:47:09,211: WARNING/ForkPoolWorker-1] Running scheduled task
[2020-03-23 21:47:09,215: INFO/ForkPoolWorker-1] Task mytask.getRuminate[851b5a5b-d978-4274-b1c5-32a09b827bb3] succeeded in 0.00445167699990634s: None
[2020-03-23 21:47:09,215: INFO/MainProcess] Received task: mytask.getRuminate[bdbecb99-0771-4912-bc2f-b2eb26a8c99b]
[2020-03-23 21:47:09,217: INFO/MainProcess] Received task: mytask.getRuminate[73887936-80d3-4ab4-86eb-020a90541e52]
[2020-03-23 21:47:09,217: WARNING/ForkPoolWorker-1] start
[2020-03-23 21:47:09,217: WARNING/ForkPoolWorker-1] Running scheduled task
[2020-03-23 21:47:09,218: INFO/ForkPoolWorker-1] Task mytask.getRuminate[bdbecb99-0771-4912-bc2f-b2eb26a8c99b] succeeded in 0.0005744890002006287s: None
[2020-03-23 21:47:09,219: INFO/MainProcess] Received task: mytask.getRuminate[3934bc2a-8a2a-4d5d-a22f-afeef0b4924c]
[2020-03-23 21:47:09,220: INFO/MainProcess] Received task: mytask.getRuminate[0b945bd6-2ca3-4eca-8113-505620cdcaae]
[2020-03-23 21:47:09,221: WARNING/ForkPoolWorker-1] start
[2020-03-23 21:47:09,221: WARNING/ForkPoolWorker-1] Running scheduled task
[2020-03-23 21:47:09,221: INFO/ForkPoolWorker-1] Task mytask.getRuminate[73887936-80d3-4ab4-86eb-020a90541e52] succeeded in 0.0005343299999367446s: None
[2020-03-23 21:47:09,222: WARNING/ForkPoolWorker-1] start
[2020-03-23 21:47:09,222: WARNING/ForkPoolWorker-1] Running scheduled task
[2020-03-23 21:47:09,223: INFO/ForkPoolWorker-1] Task mytask.getRuminate[3934bc2a-8a2a-4d5d-a22f-afeef0b4924c] succeeded in 0.00047098000004552887s: None
[2020-03-23 21:47:09,224: WARNING/ForkPoolWorker-1] start
[2020-03-23 21:47:09,224: WARNING/ForkPoolWorker-1] Running scheduled task
[2020-03-23 21:47:09,224: INFO/ForkPoolWorker-1] Task mytask.getRuminate[0b945bd6-2ca3-4eca-8113-505620cdcaae] succeeded in 0.0005017210000914929s: None
[2020-03-23 21:47:13,046: INFO/MainProcess] Received task: mytask.getRuminate[346bb012-fa87-4248-94a6-a623aa6f079d]
[2020-03-23 21:47:13,047: WARNING/ForkPoolWorker-1] start
[2020-03-23 21:47:13,047: WARNING/ForkPoolWorker-1] Running scheduled task
[2020-03-23 21:47:13,048: INFO/ForkPoolWorker-1] Task mytask.getRuminate[346bb012-fa87-4248-94a6-a623aa6f079d] succeeded in 0.0007134340000902739s: None
```
Let's build on the demo: every ten seconds, write a timestamp to a file.
```python
# mytask.py
from celery import Celery
from datetime import timedelta
import datetime

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('mytask', broker=broker, backend=backend)
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
app.conf.CELERY_ENABLE_UTC = True

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Arguments are bound as a tuple via .s()
    sender.add_periodic_task(timedelta(seconds=10), getRuminate.s(1, 2))

@app.task
def getRuminate(a, b):
    print(a, b)
    demo(a, b)

def demo(a, b):
    with open("/workspace/celery_demo.txt", "a") as f:
        f.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + str(a) + str(b))
```
celery_demo.txt now contains:

```
2020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:33122020-03-23 22:19:43122020-03-23 22:19:5312
```
The first batch of identical 2020-03-23 22:19:3012 entries exists because tasks had been published while no worker was running; once the worker started, it drained the backlog instantly, so they all share one timestamp. After restarting beat and starting the worker right away, consecutive entries are exactly 10 seconds apart (22:19:33, 22:19:43, 22:19:53).
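The entries also run together because demo() never writes a separator; a small fix (sketch) is to append a newline after each record:

```python
def demo(a, b):
    with open("/workspace/celery_demo.txt", "a") as f:
        # Append a newline so each scheduled run gets its own line
        f.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                + str(a) + str(b) + "\n")
```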
Finally, a more realistic demo: a scheduled spider that scrapes yesterday's livestock price data from a Chinese news site and stores it in MongoDB (mylogger and mongodbwrapper are the project's own helper modules). mytask.py:

```python
# mytask.py
import requests
from celery import Celery
from datetime import timedelta
import datetime
from lxml import etree
from dateutil.relativedelta import relativedelta
from celery.schedules import crontab
import os
import re
import configparser
from celery_demo.logs import mylogger
from celery_demo.db import mongodbwrapper

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('mytask', broker=broker, backend=backend)
app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
app.conf.CELERY_ENABLE_UTC = True

root_path = os.getcwd() + "/celery_demo"
cf = configparser.ConfigParser()
cf.read(root_path + "/config/config.conf")
logger = mylogger.MyLogger(root_path + cf.get("logs", "LOG_PATH"),
                           cf.get("logs", "LOG_NAME")).getlogger()
monger_wrapper = mongodbwrapper.MongoWrapper(cf, logger)
logger.info(monger_wrapper)


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    logger.info("Entered the scheduler setup")
    # This interval will later be changed to a crontab schedule
    # that crawls the data at 1 a.m. every day
    sender.add_periodic_task(timedelta(seconds=50), spider.s("start"))


@app.task
def spider(arg):
    if arg == "start":
        print("spider is running")
        logger.info("spider is running")
        parse_detail()


def parse_html(url):
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"}
    try:
        response = requests.get(url=url, headers=headers)
        if response.status_code != 200:
            logger.error("parse_html" + str(response.status_code))
        html_str = response.content.decode("gb2312")
        e = etree.HTML(html_str)
        div_list = e.xpath(".//div[@id='main']/div[@id='original']/div[@id='news']/div[@class='NewRight_01']/div[@class='Rul_01']")
        if len(div_list) == 0:
            return {"errmsg": "failed to fetch data", "status": 0}
        li_list = div_list[0].xpath(".//li//a")
        if len(li_list) == 0:
            return {"errmsg": "failed to fetch data", "status": 0}
        temp_list = list()
        for li in li_list:
            temp_dict = dict()
            title_list = li.xpath(".//@title")
            if len(title_list) == 0:
                return {"errmsg": "failed to fetch data", "status": 0}
            href_list = li.xpath(".//@href")
            if len(href_list) == 0:
                return {"errmsg": "failed to fetch data", "status": 0}
            now = datetime.datetime.now()
            yesterday = now - relativedelta(days=1)
            # Chinese date string like "2020年3月27日", used to match the scraped titles
            yesterday_str = str(yesterday.year) + "年" + str(yesterday.month) + "月" + str(yesterday.day) + "日"
            # Keep only yesterday's sheep (羊), cattle (牛), or pig (猪) price articles
            if yesterday_str in title_list[0] and ("羊" in title_list[0] or "牛" in title_list[0] or "猪" in title_list[0]):
                temp_dict["title"] = title_list[0]
                temp_dict["href"] = href_list[0]
                temp_list.append(temp_dict)
        logger.info(temp_list)
        return {"msg": temp_list, "status": 1}
    except Exception as e:
        logger.error("parse_html" + str(e))


def parse_detail():
    try:
        logger.info("About to enter the parse function")
        url = "xxx"
        info = parse_html(url)
        if "errmsg" in info.keys():
            return {"errmsg": "failed to fetch data", "status": 0}
        info_list = info.get("msg")
        temp_list = list()
        for info in info_list:
            temp_dict = dict()
            title = info.get("title")
            if "羊肉" in title:      # mutton
                title = "羊肉行情"
            elif "牛肉" in title:    # beef
                title = "牛肉行情"
            elif "猪肉" in title:    # pork
                title = "猪肉行情"
            href = info.get("href")
            href = href.split("/")
            detail_url = url + href[0]
            headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"}
            response = requests.get(url=detail_url, headers=headers).content
            html_str = response.decode("gb2312", errors="ignore")
            e = etree.HTML(html_str)
            div_list = e.xpath(".//div[@class='newDetail_new']")
            if len(div_list) == 0:
                logger.info({"errmsg": "failed to fetch data", "status": 0})
                return "error"
            p_list = e.xpath(".//div[@class='newDetail_new']//p")
            if len(p_list) == 0:
                logger.info({"errmsg": "failed to fetch data", "status": 0})
                return "error"
            temp_data_list = list()
            for p in p_list:
                content = p.text
                if content:
                    data_info_dict = dict()
                    content_list = content.split(" ")
                    now = datetime.datetime.now()
                    yesterday = now - relativedelta(days=1)
                    yesterday_str = str(yesterday.year) + "年" + str(yesterday.month) + "月" + str(yesterday.day) + "日"
                    if yesterday_str in content_list:
                        price_str = content_list[3]
                        # Extract the price: one decimal place or an integer
                        result = re.findall(r"\d+\.\d|\d+", price_str)
                        data_info_dict["price"] = float(result[0])
                        data_info_dict["time"] = yesterday
                        data_info_dict["position"] = content_list[4]
                        temp_data_list.append(data_info_dict)
            temp_dict["title"] = title
            temp_dict["data_list"] = temp_data_list
            temp_list.append(temp_dict)
        save_data(temp_list)
    except Exception as e:
        logger.error(str(e))


def save_data(data_dict):
    for data in data_dict:
        animal_type = ""
        collection = ""
        if data.get("title") == "羊肉行情":
            collection = "sheep_price"
            animal_type = "goat"
        elif data.get("title") == "牛肉行情":
            animal_type = "cow"
            collection = "cow_price"
        elif data.get("title") == "猪肉行情":
            collection = "pig_price"
            animal_type = "pig"
        insert_dict = dict()
        now = datetime.datetime.now()
        yesterday = now - relativedelta(days=1)
        insert_dict["time"] = yesterday
        insert_dict["data"] = data.get("data_list")
        insert_dict["animal_type"] = animal_type
        tmp_res = monger_wrapper.insert_one(collection, insert_dict)
        result = str(tmp_res.inserted_id)
        logger.info("Insert succeeded, document _id: " + result)
```
Note that mytask.py imports from under the celery_demo package (celery_demo.logs, celery_demo.db), so celery beat and celery worker must be launched from the directory that contains celery_demo, which here is /workspace.
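For reference, the layout this implies (an assumption reconstructed from the imports, the config paths, and the ls output below, not spelled out in the original):

```
/workspace
├── celerybeat-schedule        # created by beat
├── celery_demo.txt
├── chinese.py                 # unrelated file
└── celery_demo/
    ├── __init__.py
    ├── mytask.py
    ├── config/config.conf
    ├── logs/mylogger.py
    └── db/mongodbwrapper.py
```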
```
[root@VM_0_13_centos workspace]# ls
celerybeat-schedule  celery_demo  celery_demo.txt  chinese.py
[root@VM_0_13_centos workspace]# celery beat -A celery_demo.mytask -l INFO
celery beat v4.4.2 (cliffs) is starting.
__    -    ... __   -        _
LocalTime -> 2020-03-28 19:54:55
Configuration ->
    . broker -> redis://127.0.0.1:6379/1
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2020-03-28 19:54:55,705: INFO/MainProcess] beat: Starting...
[2020-03-28 19:54:55,799: INFO/MainProcess] Scheduler: Sending due task celery_demo.mytask.spider('start') (celery_demo.mytask.spider)
```