zoukankan      html  css  js  c++  java
  • celery使用

    1、定义一个celery 应用  和  其他的任务函数, 放在文件celery_app_task.py中

    #celery_app_task.py
    import celery
    import time
    # broker='redis://127.0.0.1:6379/2' 不加密码
    backend='redis://121.36.209.194:6379/1'  #结果存储在1中 打开reids可视化即可明白
    broker='redis://121.36.209.194:6379/2'   #消息中间件,消息中间件放在2中
    
    cel=celery.Celery('test',backend=backend,broker=broker)
    #定义要让celery执行的任务,其实就是一个函数,用@cel.task装饰一下
    @cel.task
    def add(x,y):
        return x+y

    2、定义一个add_task.py文件, 通过celery将任务发送到broker消息队列

    #add_task.py
    
    import  celery_app_task
    
    rs = celery_app_task.add.delay(1,9)
    print(rs)

    运行add_task.py共两次, 某一次的执行结果如下:

    "D:Program Files (x86)python36python.exe" D:/test0429/add_task.py
    032f289b-5a3b-40de-b5de-f58e39f75771
    
    Process finished with exit code 0

    查看redis中第2个数据库中,是否存放了两个消息队列

    root@ecs-s6-medium-2-linux-20191230105810:~# ps aux|grep celery
    root      3799  0.1  1.7  98688 34832 pts/3    S+   19:10   0:01 /usr/bin/python3 /usr/local/bin/celery -A celery_app_task worker -l info
    root      3803  0.0  1.4  97972 29760 pts/3    S+   19:10   0:00 /usr/bin/python3 /usr/local/bin/celery -A celery_app_task worker -l info
    root      4155  0.0  0.0  14428  1000 pts/5    S+   19:24   0:00 grep --color=auto celery
    root@ecs-s6-medium-2-linux-20191230105810:~#
    root@ecs-s6-medium-2-linux-20191230105810:~#
    root@ecs-s6-medium-2-linux-20191230105810:~# kill -9 3799 3803
    root@ecs-s6-medium-2-linux-20191230105810:~# ps aux|grep celery
    root      4157  0.0  0.0  14428  1036 pts/5    S+   19:24   0:00 grep --color=auto celery
    root@ecs-s6-medium-2-linux-20191230105810:~#
    root@ecs-s6-medium-2-linux-20191230105810:~#
    root@ecs-s6-medium-2-linux-20191230105810:~# docker ps
    CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                    NAMES
    da8316ec988d        redis               "docker-entrypoint.s…"   17 minutes ago      Up 17 minutes       0.0.0.0:6379->6379/tcp   cool_galois
    root@ecs-s6-medium-2-linux-20191230105810:~# docker exec -it da8316ec988d  /bin/bash
    root@da8316ec988d:/data# redis-cli
    127.0.0.1:6379> keys *
    (empty list or set)
    127.0.0.1:6379> select 2
    OK
    127.0.0.1:6379[2]> keys *
    1) "_kombu.binding.celeryev"
    2) "_kombu.binding.celery"
    3) "_kombu.binding.celery.pidbox"
    4) "celery"
    127.0.0.1:6379[2]> type celery
    list
    127.0.0.1:6379[2]> lrange celery 0 -1
    1) "{"body": "W1sxLCA5XSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "celery_app_task.add", "id": "9ecfa5ac-09e3-4a8b-9b57-2182e05e141d", "shadow": null, "eta": null, "expires": null, "group": null, "retries": 0, "timelimit": [null, null], "root_id": "9ecfa5ac-09e3-4a8b-9b57-2182e05e141d", "parent_id": null, "argsrepr": "(1, 9)", "kwargsrepr": "{}", "origin": "gen7748@wzg"}, "properties": {"correlation_id": "9ecfa5ac-09e3-4a8b-9b57-2182e05e141d", "reply_to": "7c55c123-b8e2-3010-af48-13bac103e22c", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "82573b1d-5e6f-40c5-a48e-79efaf117b7d"}}"
    127.0.0.1:6379[2]>
    127.0.0.1:6379[2]>
    127.0.0.1:6379[2]> lrange celery 0 -1
    1) "{"body": "W1sxLCA5XSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "celery_app_task.add", "id": "032f289b-5a3b-40de-b5de-f58e39f75771", "shadow": null, "eta": null, "expires": null, "group": null, "retries": 0, "timelimit": [null, null], "root_id": "032f289b-5a3b-40de-b5de-f58e39f75771", "parent_id": null, "argsrepr": "(1, 9)", "kwargsrepr": "{}", "origin": "gen5980@wzg"}, "properties": {"correlation_id": "032f289b-5a3b-40de-b5de-f58e39f75771", "reply_to": "65d63650-97c5-39b2-ade5-f4554772ca6b", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "72429ecb-2e01-4531-997e-534d67892829"}}"
    2) "{"body": "W1sxLCA5XSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "celery_app_task.add", "id": "9ecfa5ac-09e3-4a8b-9b57-2182e05e141d", "shadow": null, "eta": null, "expires": null, "group": null, "retries": 0, "timelimit": [null, null], "root_id": "9ecfa5ac-09e3-4a8b-9b57-2182e05e141d", "parent_id": null, "argsrepr": "(1, 9)", "kwargsrepr": "{}", "origin": "gen7748@wzg"}, "properties": {"correlation_id": "9ecfa5ac-09e3-4a8b-9b57-2182e05e141d", "reply_to": "7c55c123-b8e2-3010-af48-13bac103e22c", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "82573b1d-5e6f-40c5-a48e-79efaf117b7d"}}"
    127.0.0.1:6379[2]>

    3、在任意一个linux服务器上也存放一个celery_app_task.py脚本, 名字必须跟celery添加任务到消息队列时倒入的文件名称一致

    即,跟下面import的文件名一致

    import  celery_app_task
    
    rs = celery_app_task.add.delay(1,9)
    print(rs)

    然后执行celery 启动worker的命令:

    root@ecs-s6-medium-2-linux-20191230105810:/home# celery -A celery_app_task worker -l info
    /usr/local/lib/python3.6/dist-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
    absolutely not recommended!
    
    Please specify a different user using the --uid option.
    
    User information: uid=0 euid=0 gid=0 egid=0
    
      uid=uid, euid=euid, gid=gid, egid=egid,
    
     -------------- celery@ecs-s6-medium-2-linux-20191230105810 v4.4.2 (cliffs)
    --- ***** -----
    -- ******* ---- Linux-4.15.0-65-generic-x86_64-with-Ubuntu-18.04-bionic 2020-04-30 19:27:11
    - *** --- * ---
    - ** ---------- [config]
    - ** ---------- .> app:         test:0x7fd1c4f49780
    - ** ---------- .> transport:   redis://121.36.209.194:6379/2
    - ** ---------- .> results:     redis://121.36.209.194:6379/1
    - *** --- * --- .> concurrency: 1 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** -----
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
    
    
    [tasks]
      . celery_app_task.add
    
    [2020-04-30 19:27:11,613: INFO/MainProcess] Connected to redis://121.36.209.194:6379/2
    [2020-04-30 19:27:11,640: INFO/MainProcess] mingle: searching for neighbors
    [2020-04-30 19:27:12,692: INFO/MainProcess] mingle: all alone
    [2020-04-30 19:27:12,749: INFO/MainProcess] celery@ecs-s6-medium-2-linux-20191230105810 ready.
    [2020-04-30 19:27:12,828: INFO/MainProcess] Received task: celery_app_task.add[9ecfa5ac-09e3-4a8b-9b57-2182e05e141d]
    [2020-04-30 19:27:12,837: INFO/MainProcess] Received task: celery_app_task.add[032f289b-5a3b-40de-b5de-f58e39f75771]
    [2020-04-30 19:27:12,842: INFO/ForkPoolWorker-1] Task celery_app_task.add[9ecfa5ac-09e3-4a8b-9b57-2182e05e141d] succeeded in 0.011989665000328387s: 10
    [2020-04-30 19:27:12,846: INFO/ForkPoolWorker-1] Task celery_app_task.add[032f289b-5a3b-40de-b5de-f58e39f75771] succeeded in 0.0030052060001253267s: 10

    woker启动后,两个任务被执行了; redis中的两个任务也变成空了;

    4、在python脚本中查看执行的结果

    #result.py
    from celery.result import AsyncResult
    from celery_app_task import cel
    
    async = AsyncResult(id="2f0207eb-b6aa-42cc-84d2-2450ececbb18", app=cel)
    
    if async.successful(): #如果执行成功,获取到结果
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

    运行结果如下:

    "D:Program Files (x86)python36python.exe" D:/test0429/result.py
    10
    
    Process finished with exit code 0
  • 相关阅读:
    什么是数据挖掘?
    Oracle 泵导入导出
    如何创建一个 mongo 数据库并为它添加一个认证用户?
    如何提高 windows 的使用效率?--巧用运行命令
    在 vs2017 中使用 C# 7 新特性。
    什么是按引用传递和按值传递?
    Vue、Vuex+Cookie 实现自动登陆 。
    Web.config 灵活配置
    远程终端
    js框架总结
  • 原文地址:https://www.cnblogs.com/harryTree/p/12810836.html
Copyright © 2011-2022 走看看