  • Celery Notes

     
    1: Celery
    Purpose: Celery executes operations asynchronously. It can run time-consuming functions in the background, and it can also handle scheduled (periodic) jobs.
    Examples: sending SMS messages, push notifications, audio/video processing.
    A plain, synchronous function:
    import time
    
    
    def add(x, y):
        print("Entering function")
        time.sleep(5)
        return x + y
    
    if __name__ == '__main__':
        print("Program start")
        result = add(100, 200)
        print("Program end")
        print(result)
    
    
    # Output
    Program start
    Entering function
    Program end
    300

    The program waits five seconds in the middle before the final prints run. In a synchronous program, everything after this call has to wait, which is very inefficient!

    Now let's refactor this with Celery.

    In the celery_study directory, create app.py, task.py, and __init__.py.

    # task.py
    
    import time
    from celery import Celery
    
    app = Celery("my_task",broker="redis://127.0.0.1:6379/3",backend="redis://127.0.0.1:6379/4")
    
    
    @app.task
    def add(x, y):
        print("进入函数")
        time.sleep(5)
        return x + y
    # app.py
    from task import add
    
    if __name__ == '__main__':
        print("开始程序")
        result = add.delay(10,20)
        print("结束程序")
        print(result)

    Running app.py prints the following. There is no 5-second wait in the middle; it finishes instantly!

    Program start
    Program end
    7cc62995-84df-45ce-8067-5f321e4443e7

    What is this string, here 7cc62995-84df-45ce-8067-5f321e4443e7 (each run prints a different one, such as 054cf21c-9e0d-48ef-be4c-a27f4292b56d)? Shouldn't it have printed 30, the return value of add?

    This is because when app.py ran and hit delay(), the task was not executed but published, and published to the broker. Opening Redis database 3 after running app.py, we find two new keys: "celery" and "_kombu.binding.celery". Looking inside the "celery" list, we find "correlation_id": "054cf21c-9e0d-48ef-be4c-a27f4292b56d", the correlation id. In other words, the task was pushed into the queue and tagged with this id, and that id is exactly what delay() returned and what we printed.

    127.0.0.1:6379[3]> keys * 
    1) "celery"
    2) "_kombu.binding.celery"
    3) "history_2"
    127.0.0.1:6379[3]> type "_kombu.binding.celery"
    set
    127.0.0.1:6379[3]> smembers "_kombu.binding.celery"
    1) "celeryx06x16x06x16celery"
    127.0.0.1:6379[3]> type "celery"
    list
    127.0.0.1:6379[3]> llen "celery"
    (integer) 4
    127.0.0.1:6379[3]> lrange "celery" 0 4
    1) "{"content-type": "application/json", "body": "W1sxMCwgMjBdLCB7fSwgeyJlcnJiYWNrcyI6IG51bGwsICJjYWxsYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=", "headers": {"eta": null, "shadow": null, "task": "task.add", "kwargsrepr": "{}", "group": null, "parent_id": null, "root_id": "054cf21c-9e0d-48ef-be4c-a27f4292b56d", "lang": "py", "expires": null, "timelimit": [null, null], "id": "054cf21c-9e0d-48ef-be4c-a27f4292b56d", "origin": "gen17207@ubuntu", "argsrepr": "(10, 20)", "retries": 0}, "content-encoding": "utf-8", "properties": {"reply_to": "7ac5687b-8340-3b88-bfcb-10159171d35f", "priority": 0, "delivery_tag": "c36e6d83-12d6-4532-8722-2ad881b1dfba", "delivery_info": {"routing_key": "celery", "exchange": ""}, "correlation_id": "054cf21c-9e0d-48ef-be4c-a27f4292b56d", "delivery_mode": 2, "body_encoding": "base64"}}"
    2) "{"headers": {"retries": 0, "group": null, "expires": null, "argsrepr": "(10, 20)", "id": "8744e933-05a4-4acc-a82a-77f4c371edcb", "kwargsrepr": "{}", "timelimit": [null, null], "lang": "py", "root_id": "8744e933-05a4-4acc-a82a-77f4c371edcb", "shadow": null, "parent_id": null, "eta": null, "task": "task.add", "origin": "gen17195@ubuntu"}, "properties": {"delivery_tag": "48d8ba98-a961-425f-992a-444b339779e8", "delivery_info": {"exchange": "", "routing_key": "celery"}, "reply_to": "380360ff-eb4a-3afd-94c2-2590a3e825a4", "priority": 0, "delivery_mode": 2, "correlation_id": "8744e933-05a4-4acc-a82a-77f4c371edcb", "body_encoding": "base64"}, "body": "W1sxMCwgMjBdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsfV0=", "content-type": "application/json", "content-encoding": "utf-8"}"
    3) "{"properties": {"priority": 0, "correlation_id": "7cc62995-84df-45ce-8067-5f321e4443e7", "reply_to": "f15a7899-947d-3c2b-9c18-aa267588315c", "body_encoding": "base64", "delivery_mode": 2, "delivery_info": {"routing_key": "celery", "exchange": ""}, "delivery_tag": "52649bd9-7bc3-4587-8ca4-25035b04bc6a"}, "content-encoding": "utf-8", "headers": {"kwargsrepr": "{}", "eta": null, "task": "task.add", "parent_id": null, "timelimit": [null, null], "lang": "py", "origin": "gen17053@ubuntu", "id": "7cc62995-84df-45ce-8067-5f321e4443e7", "group": null, "expires": null, "argsrepr": "(10, 20)", "shadow": null, "retries": 0, "root_id": "7cc62995-84df-45ce-8067-5f321e4443e7"}, "body": "W1sxMCwgMjBdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=", "content-type": "application/json"}"
    4) "{"properties": {"priority": 0, "body_encoding": "base64", "reply_to": "2935bf4c-0d9c-3603-b8d2-e70735f5e378", "delivery_info": {"routing_key": "celery", "exchange": ""}, "delivery_mode": 2, "correlation_id": "d80331aa-4a7c-4921-8f29-de43c933737a", "delivery_tag": "002c77d7-f224-422d-ac1b-21997433a274"}, "headers": {"expires": null, "parent_id": null, "origin": "gen17023@ubuntu", "root_id": "d80331aa-4a7c-4921-8f29-de43c933737a", "lang": "py", "kwargsrepr": "{}", "task": "task.add", "argsrepr": "(10, 20)", "timelimit": [null, null], "id": "d80331aa-4a7c-4921-8f29-de43c933737a", "shadow": null, "eta": null, "group": null, "retries": 0}, "content-type": "application/json", "content-encoding": "utf-8", "body": "W1sxMCwgMjBdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0="}"

    At this point the messages are only sitting in the broker; nothing has consumed them yet. Who does the consuming? A worker: celery worker -A task -l INFO

    A note on the command: the argument after -A is the name of the module containing the Celery instance, here task, so run celery worker -A task -l INFO in the same directory as task.py.
     
     -------------- celery@ubuntu v4.3.0 (rhubarb)
    ---- **** ----- 
    --- * ***  * -- Linux-4.4.0-31-generic-x86_64-with-Ubuntu-16.04-xenial 2020-03-18 20:45:52
    -- * - **** --- 
    - ** ---------- [config]
    - ** ---------- .> app:         my_task:0x7fe31dd6b550
    - ** ---------- .> transport:   redis://127.0.0.1:6379/3
    - ** ---------- .> results:     redis://127.0.0.1:6379/4
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
                    
    
    [tasks]
      . task.add
    
    [2020-03-18 20:45:52,929: INFO/MainProcess] Connected to redis://127.0.0.1:6379/3
    [2020-03-18 20:45:52,941: INFO/MainProcess] mingle: searching for neighbors
    [2020-03-18 20:45:53,968: INFO/MainProcess] mingle: all alone
    [2020-03-18 20:45:53,996: INFO/MainProcess] celery@ubuntu ready.
    [2020-03-18 20:45:54,159: INFO/MainProcess] Received task: task.add[d80331aa-4a7c-4921-8f29-de43c933737a]  
    [2020-03-18 20:45:54,164: INFO/MainProcess] Received task: task.add[7cc62995-84df-45ce-8067-5f321e4443e7]  
    [2020-03-18 20:45:54,169: WARNING/ForkPoolWorker-7] Entering function
    [2020-03-18 20:45:54,169: WARNING/ForkPoolWorker-5] Entering function
    [2020-03-18 20:45:54,174: INFO/MainProcess] Received task: task.add[8744e933-05a4-4acc-a82a-77f4c371edcb]  
    [2020-03-18 20:45:54,179: INFO/MainProcess] Received task: task.add[054cf21c-9e0d-48ef-be4c-a27f4292b56d]  
    [2020-03-18 20:45:54,178: WARNING/ForkPoolWorker-1] Entering function
    [2020-03-18 20:45:54,185: WARNING/ForkPoolWorker-2] Entering function
    [2020-03-18 20:45:59,190: INFO/ForkPoolWorker-7] Task task.add[7cc62995-84df-45ce-8067-5f321e4443e7] succeeded in 5.022220275001018s: 30
    [2020-03-18 20:45:59,195: INFO/ForkPoolWorker-1] Task task.add[8744e933-05a4-4acc-a82a-77f4c371edcb] succeeded in 5.0176936099960585s: 30
    [2020-03-18 20:45:59,197: INFO/ForkPoolWorker-5] Task task.add[d80331aa-4a7c-4921-8f29-de43c933737a] succeeded in 5.028447696997318s: 30
    [2020-03-18 20:45:59,200: INFO/ForkPoolWorker-2] Task task.add[054cf21c-9e0d-48ef-be4c-a27f4292b56d] succeeded in 5.015312493997044s: 30

    I ran app.py four times in total, so the worker Received four tasks and consumed four times. What does it consume? The decorated function: add() executes four times, hence four "Entering function" prints and four return values of 30!

    Next, let's see what is stored in the backend.

    127.0.0.1:6379[3]> select 4
    OK
    127.0.0.1:6379[4]> keys *
    1) "cart_selected_2"
    2) "celery-task-meta-8744e933-05a4-4acc-a82a-77f4c371edcb"
    3) "celery-task-meta-d80331aa-4a7c-4921-8f29-de43c933737a"
    4) "celery-task-meta-7cc62995-84df-45ce-8067-5f321e4443e7"
    5) "celery-task-meta-054cf21c-9e0d-48ef-be4c-a27f4292b56d"
    6) "cart_2"
    127.0.0.1:6379[4]> type "celery-task-meta-054cf21c-9e0d-48ef-be4c-a27f4292b56d"
    string
    127.0.0.1:6379[4]> get "celery-task-meta-054cf21c-9e0d-48ef-be4c-a27f4292b56d"
    "{"task_id": "054cf21c-9e0d-48ef-be4c-a27f4292b56d", "traceback": null, "children": [], "date_done": "2020-03-18T12:45:59.188311", "result": 30, "status": "SUCCESS"}"
    127.0.0.1:6379[4]> 

    The worker's return values are stored in each task's "result": 30 field. From this we can conclude: the backend stores the results of consumption, while the broker stores the tasks waiting to be consumed.
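
    Since the backend keys results by task id, the producer can fetch them back later. A minimal sketch using the standard result API (get() blocks until the worker has stored the value, so a worker must be running):

    # run next to task.py, with a worker running
    from task import add
    
    result = add.delay(10, 20)
    print(result.get(timeout=10))  # blocks, then prints 30
    print(result.status)           # "SUCCESS"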

    Now run python3 app.py once more, and the already-running worker picks it up immediately:

    [2020-03-18 21:03:16,842: INFO/MainProcess] Received task: task.add[0dfe70b8-3a10-47a5-9664-f08a7834eaae]  
    [2020-03-18 21:03:16,845: WARNING/ForkPoolWorker-6] Entering function
    [2020-03-18 21:03:21,867: INFO/ForkPoolWorker-6] Task task.add[0dfe70b8-3a10-47a5-9664-f08a7834eaae] succeeded in 5.022131994999654s: 30
    Because the worker was already started, it sits listening at all times. When we ran app.py, the broker received the message, the worker fetched the task by its id, consumed it by executing add, and stored the return value in the backend:
    Received task: task.add[0dfe70b8-3a10-47a5-9664-f08a7834eaae]   the broker delivers the message
    [2020-03-18 21:03:16,845: WARNING/ForkPoolWorker-6] Entering function   the worker consumes it
    Task task.add[0dfe70b8-3a10-47a5-9664-f08a7834eaae] succeeded in 5.022131994999654s: 30   consumption finished, return value stored
     
    In a real project, Celery is a separate module with its own configuration file, so let's restructure the code above. The project layout is as follows:
    
    celery_study/
        app.py
        celery_app/
            __init__.py
            celery_config.py
            task1.py
            task2.py
     
    # __init__.py
    from celery import Celery
    
    app = Celery("demo")
    # load the configuration module through the Celery instance
    app.config_from_object("celery_app.celery_config")
    # celery_config.py
    BROKER_URL = "redis://127.0.0.1:6379/3"
    # the result backend setting must be named CELERY_RESULT_BACKEND;
    # a misnamed setting is ignored and results end up disabled
    CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/4"
    CELERY_TIMEZONE = "Asia/Shanghai"
    # import the task modules listed here
    CELERY_IMPORTS = (
        "celery_app.task1",
        "celery_app.task2"
    )
    # task1.py
    
    import time
    from celery_app import app
    
    
    @app.task
    def add(x, y):
        print("进入函数")
        time.sleep(5)
        return x + y
    # task2.py
    
    import time
    from celery_app import app
    
    
    @app.task
    def multiply(x, y):
        print("进入函数")
        time.sleep(5)
        return x * y
    # app.py
    from celery_app import task1
    from celery_app import task2
    
    if __name__ == '__main__':
        task1.add.delay(250, 250)
        task2.multiply.delay(100, 100)
        print("end")
    Start a worker to listen:

    celery worker -A celery_app -l INFO

     -------------- celery@ubuntu v4.3.0 (rhubarb)
    ---- **** ----- 
    --- * ***  * -- Linux-4.4.0-31-generic-x86_64-with-Ubuntu-16.04-xenial 2020-03-18 21:33:48
    -- * - **** --- 
    - ** ---------- [config]
    - ** ---------- .> app:         demo:0x7f90118e4550
    - ** ---------- .> transport:   redis://127.0.0.1:6379/3
    - ** ---------- .> results:     disabled://
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
                    
    
    [tasks]
      . celery_app.task1.add
      . celery_app.task2.multiply
    
    [2020-03-18 21:33:48,303: INFO/MainProcess] Connected to redis://127.0.0.1:6379/3
    [2020-03-18 21:33:48,316: INFO/MainProcess] mingle: searching for neighbors
    [2020-03-18 21:33:49,343: INFO/MainProcess] mingle: all alone
    [2020-03-18 21:33:49,369: INFO/MainProcess] celery@ubuntu ready.
    The worker is listening; it connected to the broker successfully. Notice the banner line results: disabled://. The original config misnamed the backend setting (BACKEND_RESULT_URL), so Celery ignored it and disabled the result backend; with CELERY_RESULT_BACKEND set as above, this line reads redis://127.0.0.1:6379/4 instead.
    Run python3 app.py to send the messages to the broker:
    [2020-03-18 21:41:51,382: INFO/MainProcess] Received task: celery_app.task1.add[192b3015-af33-4153-ab09-3febdd99ebfd]  
    [2020-03-18 21:41:51,388: WARNING/ForkPoolWorker-3] Entering function
    [2020-03-18 21:41:56,393: INFO/ForkPoolWorker-3] Task celery_app.task1.add[192b3015-af33-4153-ab09-3febdd99ebfd] succeeded in 5.004680051999458s: 500
    [2020-03-18 21:43:31,109: INFO/MainProcess] Received task: celery_app.task1.add[d8f89851-5b27-4518-ac73-2bc43495fa1e]  
    [2020-03-18 21:43:31,111: WARNING/ForkPoolWorker-5] Entering function
    [2020-03-18 21:43:36,116: INFO/ForkPoolWorker-5] Task celery_app.task1.add[d8f89851-5b27-4518-ac73-2bc43495fa1e] succeeded in 5.004891623000731s: 500
    [2020-03-18 21:45:20,340: INFO/MainProcess] Received task: celery_app.task1.add[ad0cdd51-e937-4f9b-bc7d-a8ece8047705]  
    [2020-03-18 21:45:20,341: WARNING/ForkPoolWorker-7] Entering function
    [2020-03-18 21:45:25,594: INFO/ForkPoolWorker-7] Task celery_app.task1.add[ad0cdd51-e937-4f9b-bc7d-a8ece8047705] succeeded in 5.252825808995112s: 500
    [2020-03-18 21:46:44,956: INFO/MainProcess] Received task: celery_app.task1.add[55fcb2c2-4242-43dd-9848-6352039965f7]  
    [2020-03-18 21:46:44,957: WARNING/ForkPoolWorker-1] Entering function
    [2020-03-18 21:46:44,959: INFO/MainProcess] Received task: celery_app.task2.multiply[2827587c-1f13-4099-be15-bb469ff86a27]  
    [2020-03-18 21:46:44,961: WARNING/ForkPoolWorker-3] Entering function
    [2020-03-18 21:46:49,963: INFO/ForkPoolWorker-3] Task celery_app.task2.multiply[2827587c-1f13-4099-be15-bb469ff86a27] succeeded in 5.002474594999512s: 10000
    [2020-03-18 21:46:49,963: INFO/ForkPoolWorker-1] Task celery_app.task1.add[55fcb2c2-4242-43dd-9848-6352039965f7] succeeded in 5.0065162960017915s: 500

    Why did task2.multiply run only once, in the last batch? In the first three runs I had written task2.multiply.apply_async(100, 100), which raised multiply() argument after ** must be a mapping, not int: apply_async expects its positional arguments as a tuple and its keyword arguments as a dict, not as bare values. Switching the call to task2.multiply.delay(100, 100) solved it.
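
    To make the two calling conventions concrete, here is a minimal sketch (delay, apply_async, and the countdown option are all standard Celery task-calling API):

    from celery_app import task2
    
    task2.multiply.delay(100, 100)                        # shortcut: plain *args, **kwargs
    task2.multiply.apply_async(args=(100, 100))           # explicit: args is a tuple, kwargs a dict
    task2.multiply.apply_async((100, 100), countdown=10)  # extra options, e.g. run 10 seconds later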

    Celery periodic tasks
    Similar to crontab: anything that needs to run on a schedule, like daily jobs, can be handled by a periodic task.
     
    # mytask.py
    from celery import Celery
    from datetime import timedelta
    
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    app = Celery('mytask', broker=broker, backend=backend)
    app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
    app.conf.CELERY_ENABLE_UTC = True
    
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        sender.add_periodic_task(timedelta(seconds=10), getRuminate.s('start'))
    
    @app.task
    def getRuminate(arg):
        if arg == "start":
            print(arg)
            demo()
    
    def demo():
        print("Running the scheduled task")
        return "Running the scheduled task"

    # Run in the same directory as mytask.py; start the beat scheduler first

    [root@VM_0_13_centos celery_demo]# celery beat -A mytask -l INFO

    celery beat v4.4.2 (cliffs) is starting.
    __    -    ... __   -        _
    LocalTime -> 2020-03-23 21:46:33
    Configuration ->
        . broker -> redis://127.0.0.1:6379/1
        . loader -> celery.loaders.app.AppLoader
        . scheduler -> celery.beat.PersistentScheduler
        . db -> celerybeat-schedule
        . logfile -> [stderr]@%INFO
        . maxinterval -> 5.00 minutes (300s)
    [2020-03-23 21:46:33,006: INFO/MainProcess] beat: Starting...
    [2020-03-23 21:46:33,061: INFO/MainProcess] Scheduler: Sending due task mytask.getRuminate('start') (mytask.getRuminate)
    [2020-03-23 21:46:43,043: INFO/MainProcess] Scheduler: Sending due task mytask.getRuminate('start') (mytask.getRuminate)
    [2020-03-23 21:46:53,043: INFO/MainProcess] Scheduler: Sending due task mytask.getRuminate('start') (mytask.getRuminate)
    [2020-03-23 21:47:03,044: INFO/MainProcess] Scheduler: Sending due task mytask.getRuminate('start') (mytask.getRuminate)
    [2020-03-23 21:47:13,044: INFO/MainProcess] Scheduler: Sending due task mytask.getRuminate('start') (mytask.getRuminate)

    Beat publishes a task every ten seconds. Now start a worker to consume them:

    celery worker -A mytask -l INFO

    [root@VM_0_13_centos celery_demo]# celery worker -A mytask -l INFO
    /usr/local/python3/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
    absolutely not recommended!
    
    Please specify a different user using the --uid option.
    
    User information: uid=0 euid=0 gid=0 egid=0
    
      uid=uid, euid=euid, gid=gid, egid=egid,
     
     -------------- celery@VM_0_13_centos v4.4.2 (cliffs)
    --- ***** ----- 
    -- ******* ---- Linux-3.10.0-862.el7.x86_64-x86_64-with-centos-7.5.1804-Core 2020-03-23 21:47:08
    - *** --- * --- 
    - ** ---------- [config]
    - ** ---------- .> app:         mytask:0x7fc80fb6e588
    - ** ---------- .> transport:   redis://127.0.0.1:6379/1
    - ** ---------- .> results:     redis://127.0.0.1:6379/2
    - *** --- * --- .> concurrency: 1 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
                    
    
    [tasks]
      . mytask.getRuminate
    
    [2020-03-23 21:47:08,139: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
    [2020-03-23 21:47:08,146: INFO/MainProcess] mingle: searching for neighbors
    [2020-03-23 21:47:09,161: INFO/MainProcess] mingle: all alone
    [2020-03-23 21:47:09,170: INFO/MainProcess] celery@VM_0_13_centos ready.
    [2020-03-23 21:47:09,209: INFO/MainProcess] Received task: mytask.getRuminate[851b5a5b-d978-4274-b1c5-32a09b827bb3]  
    [2020-03-23 21:47:09,210: WARNING/ForkPoolWorker-1] start
    [2020-03-23 21:47:09,211: WARNING/ForkPoolWorker-1] Running the scheduled task
    [2020-03-23 21:47:09,215: INFO/ForkPoolWorker-1] Task mytask.getRuminate[851b5a5b-d978-4274-b1c5-32a09b827bb3] succeeded in 0.00445167699990634s: None
    [2020-03-23 21:47:09,215: INFO/MainProcess] Received task: mytask.getRuminate[bdbecb99-0771-4912-bc2f-b2eb26a8c99b]  
    [2020-03-23 21:47:09,217: INFO/MainProcess] Received task: mytask.getRuminate[73887936-80d3-4ab4-86eb-020a90541e52]  
    [2020-03-23 21:47:09,217: WARNING/ForkPoolWorker-1] start
    [2020-03-23 21:47:09,217: WARNING/ForkPoolWorker-1] Running the scheduled task
    [2020-03-23 21:47:09,218: INFO/ForkPoolWorker-1] Task mytask.getRuminate[bdbecb99-0771-4912-bc2f-b2eb26a8c99b] succeeded in 0.0005744890002006287s: None
    [2020-03-23 21:47:09,219: INFO/MainProcess] Received task: mytask.getRuminate[3934bc2a-8a2a-4d5d-a22f-afeef0b4924c]  
    [2020-03-23 21:47:09,220: INFO/MainProcess] Received task: mytask.getRuminate[0b945bd6-2ca3-4eca-8113-505620cdcaae]  
    [2020-03-23 21:47:09,221: WARNING/ForkPoolWorker-1] start
    [2020-03-23 21:47:09,221: WARNING/ForkPoolWorker-1] Running the scheduled task
    [2020-03-23 21:47:09,221: INFO/ForkPoolWorker-1] Task mytask.getRuminate[73887936-80d3-4ab4-86eb-020a90541e52] succeeded in 0.0005343299999367446s: None
    [2020-03-23 21:47:09,222: WARNING/ForkPoolWorker-1] start
    [2020-03-23 21:47:09,222: WARNING/ForkPoolWorker-1] Running the scheduled task
    [2020-03-23 21:47:09,223: INFO/ForkPoolWorker-1] Task mytask.getRuminate[3934bc2a-8a2a-4d5d-a22f-afeef0b4924c] succeeded in 0.00047098000004552887s: None
    [2020-03-23 21:47:09,224: WARNING/ForkPoolWorker-1] start
    [2020-03-23 21:47:09,224: WARNING/ForkPoolWorker-1] Running the scheduled task
    [2020-03-23 21:47:09,224: INFO/ForkPoolWorker-1] Task mytask.getRuminate[0b945bd6-2ca3-4eca-8113-505620cdcaae] succeeded in 0.0005017210000914929s: None
    [2020-03-23 21:47:13,046: INFO/MainProcess] Received task: mytask.getRuminate[346bb012-fa87-4248-94a6-a623aa6f079d]  
    [2020-03-23 21:47:13,047: WARNING/ForkPoolWorker-1] start
    [2020-03-23 21:47:13,047: WARNING/ForkPoolWorker-1] Running the scheduled task
    [2020-03-23 21:47:13,048: INFO/ForkPoolWorker-1] Task mytask.getRuminate[346bb012-fa87-4248-94a6-a623aa6f079d] succeeded in 0.0007134340000902739s: None

    Let's build one more demo: append a timestamp to a file every ten seconds.

    # mytask.py
    from celery import Celery
    from datetime import timedelta
    import datetime
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    app = Celery('mytask', broker=broker, backend=backend)
    app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
    app.conf.CELERY_ENABLE_UTC = True
    
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # arguments are passed to the task as a tuple via .s()
        sender.add_periodic_task(timedelta(seconds=10), getRuminate.s(1, 2))
    
    @app.task
    def getRuminate(a, b):
        print(a, b)
        demo(a, b)
    
    def demo(a, b):
        with open("/workspace/celery_demo.txt", "a") as f:
            f.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + str(a) + str(b))
     
    # File output

    2020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:30122020-03-23 22:19:33122020-03-23 22:19:43122020-03-23 22:19:5312

    The first several 2020-03-23 22:19:3012 entries share one timestamp because tasks had been published while no worker was running; once the worker started, it drained the backlog instantly, so they were all written at the same moment. After restarting beat and starting the worker right away, the later entries (22:19:33, 22:19:43, 22:19:53) are exactly 10 seconds apart.

    Exercise: crawl a website on a schedule and store the data in a database.
     
     
    config  stores the database connection info and the logging configuration
    db      stores the database access helpers
    logs    stores the error logs
     
    mytask.py
    import requests
    from celery import Celery
    from datetime import timedelta
    import datetime
    from lxml import etree
    from dateutil.relativedelta import relativedelta
    from celery.schedules import crontab
    import os
    import re
    import configparser
    from celery_demo.logs import mylogger
    from celery_demo.db import mongodbwrapper
    
    
    broker = 'redis://127.0.0.1:6379/1'
    backend = 'redis://127.0.0.1:6379/2'
    app = Celery('mytask', broker=broker, backend=backend)
    app.conf.CELERY_TIMEZONE = 'Asia/Shanghai'
    app.conf.CELERY_ENABLE_UTC = True
    
    
    root_path = os.getcwd() + "/celery_demo"
    cf = configparser.ConfigParser()
    cf.read(root_path + "/config/config.conf")
    logger = mylogger.MyLogger(root_path + cf.get("logs", "LOG_PATH"), cf.get("logs", "LOG_NAME")).getlogger()
    monger_wrapper = mongodbwrapper.MongoWrapper(cf, logger)
    
    logger.info(monger_wrapper)
    
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        logger.info("进入了调度函数")
        sender.add_periodic_task(timedelta(seconds=50),  # 这里的时间后续会改成 crontab格式的,每天凌晨1点爬取数据
                                 spider.s("start"))
    
    @app.task
    def spider(arg):
        if arg == "start":
            print("spider is running")
            logger.info("spider is running")
            parse_detail()
    
    def parse_html(url):
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"}
        try:
            response = ""
            response = requests.get(url=url,headers=headers)
            if response.status_code != 200:
                logger.error("parse_html" + str(response.status_code))
            response = response.content
            html_str = response.decode("gb2312")
            e = etree.HTML(html_str)
            div_list = e.xpath(".//div[@id='main']/div[@id='original']/div[@id='news']/div[@class='NewRight_01']/div[@class='Rul_01']")
            if len(div_list) == 0:
                return ({"errmsg":"获取数据失败","status":0})
            li_list = div_list[0].xpath(".//li//a")
            if len(li_list) == 0:
                return ({"errmsg": "获取数据失败","status":0})
            temp_list = list()
            for li in li_list:
                temp_dict = dict()
                title_list = li.xpath(".//@title")
                if len(title_list) == 0:
                    return ({"errmsg": "获取数据失败", "status": 0})
                href_list = li.xpath(".//@href")
                if len(href_list) == 0:
                    return ({"errmsg": "获取数据失败", "status": 0})
                now = datetime.datetime.now()
                yesterday = now - relativedelta(days=1)
                yesterday_str = str(yesterday.year) + "年" + str(yesterday.month) + "月" + str(yesterday.day) + "日"
                if yesterday_str in title_list[0] and ("羊肉" in title_list[0] or "牛肉" in title_list[0] or "猪肉" in title_list[0]):
                    temp_dict["title"] = title_list[0]
                    temp_dict["href"] = href_list[0]
                    temp_list.append(temp_dict)
            logger.info(temp_list)
            return {"msg":temp_list,"status":1}
        except Exception as e:
            logger.error("parse_html" + str(e))
    
    def parse_detail():
        try:
            logger.info("即将进入解析函数")
            url = "xxx"
            info = parse_html(url)
            if "errmsg" in info.keys():
                return ({"errmsg": "获取数据失败", "status": 0})
            info_list = info.get("msg")
            temp_list= list()
            for info in info_list:
                temp_dict = dict()
                title = info.get("title")
                if "羊肉" in title:
                    title = "羊肉行情"
                elif "牛肉" in title:
                    title = "牛肉行情"
                elif "猪肉" in title:
                    title = "猪肉行情"
                href = info.get("href")
                href = href.split("/")
                detail_url = url + href[0]
                headers = {
                    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36"}
                response = requests.get(url=detail_url, headers=headers).content
                html_str = response.decode("gb2312",errors="ignore")
                e = etree.HTML(html_str)
                div_list = e.xpath(".//div[@class='newDetail_new']")
                if len(div_list) == 0:
                    logger.info({"errmsg": "获取数据失败", "status": 0})
                    return "error"
                p_list = e.xpath(".//div[@class='newDetail_new']//p")
                if len(p_list) == 0:
                    logger.info({"errmsg": "获取数据失败", "status": 0})
                    return "error"
                temp_data_list = list()
                for p in p_list:
                    content = p.text
                    if content:
                        data_info_dict = dict()
                        content = p.text
                        content_list = content.split(" ")
                        now = datetime.datetime.now()
                        yesterday = now - relativedelta(days=1)
                        yesterday_str = str(yesterday.year) + "年" + str(yesterday.month) + "月" + str(yesterday.day) + "日"
                        if yesterday_str in content_list:
                            price_str = content_list[3]
                            result = re.findall(r"d+.d|d+", price_str)
                            data_info_dict["price"] = float(result[0])
                            now = datetime.datetime.now()
                            yesterday = now - relativedelta(days=1)
                            data_info_dict["time"] = yesterday
                            data_info_dict["position"] = content_list[4]
                            temp_data_list.append(data_info_dict)
                temp_dict["title"] = title
                temp_dict["data_list"] = temp_data_list
                temp_list.append(temp_dict)
            save_data(temp_list)
        except Exception as e:
            logger.error(str(e))
    
    def save_data(data_dict):
        for data in data_dict:
            animal_type = ""
            collection = ""
            if data.get("title") == "羊肉行情":
                collection = "sheep_price"
                animal_type = "goat"
            elif data.get("title") == "牛肉行情":
                animal_type = "cow"
                collection = "cow_price"
            elif data.get("title") == "猪肉行情":
                collection = "pig_price"
                animal_type = "pig"
            insert_dict = dict()
            now = datetime.datetime.now()
            yesterday = now - relativedelta(days=1)
            insert_dict["time"] = yesterday
            insert_dict["data"] = data.get("data_list")
            insert_dict["animal_type"] = animal_type
            tmp_res = monger_wrapper.insert_one(collection,insert_dict)
            result = str(tmp_res.inserted_id)
            logger.info("插入数据成功,数据_id为" + result)

    Note that mytask imports its helpers from under the celery_demo package, so celery beat and celery worker must be run from the directory containing celery_demo; here that is /workspace.

    [root@VM_0_13_centos workspace]# ls
    celerybeat-schedule  celery_demo  celery_demo.txt  chinese.py
    [root@VM_0_13_centos workspace]# celery beat -A celery_demo.mytask -l INFO
    celery beat v4.4.2 (cliffs) is starting.
    __    -    ... __   -        _
    LocalTime -> 2020-03-28 19:54:55
    Configuration ->
        . broker -> redis://127.0.0.1:6379/1
        . loader -> celery.loaders.app.AppLoader
        . scheduler -> celery.beat.PersistentScheduler
        . db -> celerybeat-schedule
        . logfile -> [stderr]@%INFO
        . maxinterval -> 5.00 minutes (300s)
    [2020-03-28 19:54:55,705: INFO/MainProcess] beat: Starting...
    [2020-03-28 19:54:55,799: INFO/MainProcess] Scheduler: Sending due task celery_demo.mytask.spider('start') (celery_demo.mytask.spider)
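
    To consume these tasks, start a worker from the same /workspace directory, pointing -A at the same module as beat (this mirrors the worker commands used earlier):

    celery worker -A celery_demo.mytask -l INFO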
     
    # TODO