  • Celery: Advanced Examples

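    Overview

    There are three concepts here: task, worker, and broker.
    As the names suggest, a task is a job your boss hands you, and a worker is one of the people on your team who does the work.

    So what is a broker?

    When your boss hands you a job, you need to write it down somewhere. That could be a notebook you carry around, a text file or Excel sheet on your computer, or any time-management tool you like.

    The broker is where Celery records tasks.
    You, as the task manager, take the work (tasks) assigned by your boss (the front-end program) and record it in your notebook (the broker). Then the programmers on your team (the workers) come to the notebook (the broker) and pick up the work (tasks).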
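
The notebook analogy can be modelled with the standard library alone. The sketch below is not Celery code: `run_demo`, the in-process `queue.Queue` standing in for the broker, and the worker threads are all assumptions made for illustration. It only shows the shape of the flow: tasks are written to a shared queue and workers pull them off.

```python
import queue
import threading


def add(x, y):
    return x + y


def run_demo(jobs, worker_count=2):
    """Push jobs onto a queue (the 'broker') and let worker threads drain it."""
    broker = queue.Queue()   # the notebook where tasks are written down
    results = {}             # job id -> return value
    lock = threading.Lock()

    def worker():
        while True:
            item = broker.get()
            if item is None:  # sentinel: no more work for this worker
                return
            job_id, func, args = item
            value = func(*args)
            with lock:
                results[job_id] = value

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    # The "boss" records each job in the notebook.
    for job_id, (func, args) in enumerate(jobs):
        broker.put((job_id, func, args))
    # One sentinel per worker so every thread stops.
    for _ in threads:
        broker.put(None)
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_demo([(add, (1, 3)), (add, (2, 5))]))
```

A real broker such as RabbitMQ or Redis plays the role of `broker` here, but lives in a separate process so that producers and workers can run on different machines.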

    1. RabbitMQ as the broker

    #tasks.py

    from celery import Celery
    
    app = Celery('tasks', broker='amqp://admin:admin@localhost:5672')
    
    @app.task
    def add(x, y):
        return x + y

    Start the worker

    celery -A tasks worker --loglevel=info

    Run

    >>> from tasks import add
    >>> add(1, 3)
    4
    >>> add.delay(1,3)
    <AsyncResult: 07614cef-f314-4c7b-a33f-92c080cadb83>
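    >>>

    Note: delay() runs the task asynchronously by pushing it onto the message queue. Calling add(1, 3) directly runs it in the current process and does not touch the message queue.

    The file is named tasks.py, and the code creates the app with app = Celery('tasks', broker=...). The first argument to Celery is the name of the app's main module, here tasks, which is also the module passed to -A when starting the worker: celery -A tasks worker --loglevel=info

    For comparison

    Note: to send a task to a specific queue, use apply_async, because delay() passes all of its arguments to the task itself: add.apply_async((1, 3), queue='queue_add1')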
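
In Celery, delay(*args, **kwargs) is a shortcut for apply_async(args, kwargs), so execution options such as queue have to go through apply_async. The sketch below is a hypothetical stand-in (`SketchTask` is not a Celery class) that records what would be sent, to show where a `queue=` keyword ends up in each case.

```python
class SketchTask:
    """Hypothetical stand-in for a Celery task; it only records what would be sent."""

    default_queue = "celery"  # name of Celery's default queue

    def __init__(self, func):
        self.func = func

    def apply_async(self, args=(), kwargs=None, queue=None):
        # Execution options such as `queue` are kept apart from the task arguments.
        return {
            "args": tuple(args),
            "kwargs": dict(kwargs or {}),
            "queue": queue or self.default_queue,
        }

    def delay(self, *args, **kwargs):
        # Everything passed to delay() becomes a task argument.
        return self.apply_async(args, kwargs)


add = SketchTask(lambda x, y: x + y)

routed = add.apply_async((1, 3), queue="queue_add1")
misrouted = add.delay(1, 3, queue="queue_add1")
```

Here `routed["queue"]` is the requested queue, while `misrouted` stays on the default queue and carries `queue` as an unexpected keyword argument for the task function.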

    #test_2.py

    from celery import Celery
    
    app = Celery('proj', broker='amqp://admin:admin@localhost:5672', include='test_2')
    
    @app.task
    def add(x, y):
        return x + y

    2. Starting a worker with python + file name

    Example 1:

    #test.py

    from celery import Celery
    import time
    app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672')
    
    @app.task
    def add(x, y):
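        print("------>")
        time.sleep(5)
        print("<--------------")
        return x + y

    if __name__ == "__main__":
        app.start()

    Start the worker

    python test.py worker

    By default Celery starts as many worker processes as there are CPU cores. To set the number explicitly, use the -c option, for example:

    python test.py worker -c 2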

    Example 2:

    #test.py

    from celery import Celery
    import time
    app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672')
    
    @app.task
    def add(x, y):
        print("------>")
        time.sleep(2)
        print("<--------------")
        return x + y

    if __name__ == "__main__":
        app.start()

    #eg.py

    from test import *
    import time

    # Queue three tasks and keep their AsyncResult handles.
    rev = []
    for i in range(3):
        rev.append(add.delay(1, 3))

    print("len rev:", len(rev))
    # Poll until every result is ready.
    while True:
        tag = 1
        for key in rev:
            if not key.ready():
                tag = 0
                time.sleep(1)
                print("sleep 1")
        if tag:
            break
    print("_____________________>")
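
The polling loop in eg.py sleeps once for every unfinished result and never gives up. A variant that sleeps once per pass and stops at a deadline might look like the sketch below. The names `wait_all` and `slow_add` are hypothetical, and `concurrent.futures.Future` objects stand in for Celery's AsyncResult so the sketch runs without a broker.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def wait_all(results, is_ready, interval=0.05, timeout=5.0):
    """Poll until every result is ready; return False if the deadline passes first."""
    deadline = time.monotonic() + timeout
    while not all(is_ready(r) for r in results):
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
    return True


def slow_add(x, y):
    time.sleep(0.1)  # pretend the task takes a while
    return x + y


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(slow_add, 1, 3) for _ in range(3)]
        finished = wait_all(futures, is_ready=lambda f: f.done())
        print(finished, [f.result() for f in futures])
```

With Celery results, the same helper would be called with `is_ready=lambda r: r.ready()` on the list returned by the add.delay(...) calls.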

    3. Redis as the broker

    #test_redis.py

    from celery import Celery
    import time
    #app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
    app = Celery('test_redis', backend='redis', broker='redis://100.69.201.116:7000')

    @app.task
    def add(x, y):
        print("------>")
        time.sleep(5)
        print("<--------------")
        return x + y

    if __name__ == "__main__":
        app.start()

    Start the worker

    python test_redis.py worker -c 2

    Test

    from celery import group
    from test_redis import *
    g = group(add.s(2, 3)).apply_async()
    g = group(add.s(2, 3)).apply_async()
    g = group(add.s(2, 3)).apply_async()
    g = group(add.s(2, 3)).apply_async()
    g = group(add.s(2, 3)).apply_async()
    # g refers only to the last group, so a single result is printed.
    for ret in g.get():
        print(ret)
    print("end-----------------------------------")

    Result

    5
    end-----------------------------------

    4. Two queues (Redis)

    #test_redis.py

    from celery import Celery
    import time
    #app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
    app = Celery('test_redis', backend='redis', broker='redis://100.69.201.116:7000')

    @app.task
    def add(x, y):
        print("------>")
        time.sleep(5)
        print("<--------------")
        return x + y

    if __name__ == "__main__":
        app.start()

    #test_redis_2.py

    from celery import Celery
    import time
    #app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
    app = Celery('test_redis_2', backend='redis', broker='redis://100.69.201.116:7001')

    @app.task
    def add_2(x, y):
        print("=======>")
        time.sleep(5)
        print("<=================")
        return x + y

    if __name__ == "__main__":
        app.start()

    Test

    from celery import group
    from test_redis import *
    from test_redis_2 import *
    ll = [(1,2), (3,4), (5,6)]
    g = group(add.s(key[0], key[1]) for key in ll).apply_async()
    for ret in g.get():
        print(ret)
    print("end redis_1 -----------------------------------")

    ll = [(1,2), (3,4), (5,6)]
    g = group(add_2.s(key[0], key[1]) for key in ll).apply_async()
    for ret in g.get():
        print(":", ret)
    print("end redis_2 -----------------------------------")

    Result

    3
    7
    11
    end redis_1 -----------------------------------
    : 3
    : 7
    : 11
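    end redis_2 -----------------------------------

    5. Two queues (same RabbitMQ server)

    Note: the queues need to be set up in advance. In the examples below the apps are separated by the path at the end of the broker URL, which selects the RabbitMQ virtual host, so those virtual hosts must already exist.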
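
To see which virtual host a broker URL points at, the path can be split off with the standard library. This is a sketch that approximates the usual rule (the vhost is whatever follows the first slash after host and port); `vhost_of` is a hypothetical helper, and the parsing done by Celery's transport layer may differ in details.

```python
from urllib.parse import unquote, urlsplit


def vhost_of(broker_url):
    """Return the text after the first '/' that follows host:port in the URL."""
    path = urlsplit(broker_url).path
    if path.startswith("/"):
        path = path[1:]  # drop only the separator slash
    return unquote(path)


if __name__ == "__main__":
    for url in (
        "amqp://admin:admin@localhost:5672//",
        "amqp://admin:admin@localhost:5672//hwzh",
        "amqp://admin:admin@localhost:5672",
    ):
        print(url, "->", repr(vhost_of(url)))
```

Under this rule a URL ending in `//` names the vhost `/`, and one ending in `//hwzh` names `/hwzh`. An empty result means the URL sets no vhost, so the transport falls back to its own default.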

    ##Example 1

    #test.py

    from celery import Celery
    import time
    app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672//')

    @app.task
    def add(x, y):
        print("------>")
        time.sleep(5)
        print("<--------------")
        return x + y

    if __name__ == "__main__":
        app.start()

    #test_2.py

    from celery import Celery
    import time
    app = Celery('test_2', backend='amqp', broker='amqp://admin:admin@localhost:5672//hwzh')

    @app.task
    def add_2(x, y):
        print("=====>")
        time.sleep(5)
        print("<==========")
        return x + y

    if __name__ == "__main__":
        app.start()

    Test

    from celery import group
    from test import *
    from test_2 import *

    ll = [(1,2), (3,4), (7,8)]
    g = group(add.s(key[0], key[1]) for key in ll).apply_async()
    for ret in g.get():
        print(ret)

    ll = [(1,2), (3,4), (7,8)]
    g = group(add_2.s(key[0], key[1]) for key in ll).apply_async()
    for ret in g.get():
        print(ret)

    Result

    3
    7
    15
    3
    7
    15

    ##Example 2

    #test.py

    from celery import Celery
    import time
    app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672//mq4')

    @app.task
    def add(x, y):
        print("------>")
        time.sleep(2)
        print("<--------------")
        return x + y

    # Note: inside this module the task name sum shadows the built-in sum().
    @app.task
    def sum(x, y):
        print("------>")
        time.sleep(2)
        print("<--------------")
        return x + y

    if __name__ == "__main__":
        app.start()

    #eg2.py

    from test import *
    import time

    # Queue three add tasks and three sum tasks.
    rev = []
    for i in range(3):
        rev.append(add.delay(1, 3))

    for i in range(3):
        rev.append(sum.delay(1, 3))

    print("len rev:", len(rev))
    # Poll until every result is ready.
    while True:
        tag = 1
        for key in rev:
            if not key.ready():
                tag = 0
                time.sleep(1)
                print("sleep 1")
        if tag:
            break
    print("_____________________>")

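    6. Saving results

    #tasks_1.py

    from celery import Celery

    # backend='amqp' stores results in RabbitMQ. Celery 5 removed this backend; 'rpc://' is its replacement.
    app = Celery('tasks', backend='amqp', broker='amqp://admin:admin@localhost')

    @app.task
    def add(x, y):
        return x + y

    Start the worker

    celery -A tasks_1 worker --loglevel=info

    Unlike the earlier example, the worker banner now lists a result backend: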

    - ** ---------- [config]
    - ** ---------- .> app: tasks:0x7f8057931810
    - ** ---------- .> transport: amqp://admin:**@localhost:5672//
    - ** ---------- .> results: amqp

    Run

    >>> from tasks_1 import add
    >>> result = add.delay(1, 3)
    >>> result.ready()
    True
    >>> result.get()
    4

    7. Multiple queues

    from celery import Celery
    from kombu import Exchange, Queue
    BROKER_URL = 'amqp://admin:admin@localhost//'
    app = Celery('tasks', backend='amqp', broker=BROKER_URL)
    # Route each task, by its registered name, to its own queue.
    app.conf.update(
        CELERY_ROUTES={
            "add1": {"queue": "queue_add1"},
            "add2": {"queue": "queue_add2"},
            "add3": {"queue": "queue_add3"},
            "add4": {"queue": "queue_add4"},
        },
    )

    # Explicit names keep the registered task names in line with the route keys above.
    @app.task(name="add1")
    def add1(x, y):
        return x + y

    @app.task(name="add2")
    def add2(x, y):
        return x + y

    @app.task(name="add3")
    def add3(x, y):
        return x + y

    @app.task(name="add4")
    def add4(x, y):
        return x + y
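
Celery 4 and later also accept lowercase setting names, for example task_routes in place of CELERY_ROUTES and broker_url in place of BROKER_URL. A minimal sketch of the same routing written as a configuration module follows; the file name celeryconfig.py is an assumption, and the module holds plain Python values only.

```python
# celeryconfig.py (hypothetical file name)

# Lowercase equivalent of BROKER_URL.
broker_url = "amqp://admin:admin@localhost//"

# Lowercase equivalent of CELERY_ROUTES: task name -> routing options.
task_routes = {
    f"add{i}": {"queue": f"queue_add{i}"}
    for i in range(1, 5)
}
```

An app would pick this up with app.config_from_object("celeryconfig"). Old-style and new-style names should not be mixed in one configuration.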

    8. Message routing

    File: tasks.py

    from celery import Celery, platforms
    import time
    import os

    app = Celery('proj', broker='amqp://admin:admin@ip:5672',
                 include=['tasks']
                 )
    # Route each task, by its full name, to its own queue.
    app.conf.update(
        CELERY_ROUTES={
            'tasks.fun_1': {
                'queue': "q_1"
            },
            'tasks.fun_2': {
                'queue': "q_2"
            }
        }
    )
    # Allow the worker to run as root.
    platforms.C_FORCE_ROOT = True

    @app.task
    def fun_1(n):
        print("(((((((((((((((func_1", n)
        return 1

    @app.task
    def fun_2(n):
        print(n, ")))))))))))))))")
        return 2

    if __name__ == "__main__":
        app.start()

    Start one worker per queue

    python tasks.py worker -c 2 -Q q_1
    python tasks.py worker -c 2 -Q q_2

    There are two message queues, q_1 and q_2. Example calls:

    >>> from tasks import *
    >>> fun_1(1)
    (((((((((((((((func_1 1
    1
    >>> fun_1.delay(1)
    <AsyncResult: 528a2ad1-bc16-4bdc-beff-cd166fe3e885>
    >>> fun_2.delay(2)
    <AsyncResult: ee5881eb-b384-4a39-ba00-08aa8ee53504>
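
The route table is keyed by the full task name (module plus function), which is why the keys read tasks.fun_1 and not fun_1. The lookup can be pictured as the sketch below. `queue_for` is a hypothetical helper that does exact-name matching only; Celery's own router supports more (for instance glob patterns in the keys), which this sketch ignores.

```python
ROUTES = {
    "tasks.fun_1": {"queue": "q_1"},
    "tasks.fun_2": {"queue": "q_2"},
}
DEFAULT_QUEUE = "celery"  # name of Celery's default queue


def queue_for(task_name, routes=ROUTES):
    """Return the queue a task name maps to, or the default queue if unrouted."""
    return routes.get(task_name, {}).get("queue", DEFAULT_QUEUE)


if __name__ == "__main__":
    for name in ("tasks.fun_1", "tasks.fun_2", "fun_1"):
        print(name, "->", queue_for(name))
```

A task whose name has no entry lands on the default queue, and a worker started with only -Q q_1 or -Q q_2 would never consume it.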

    9. Starting multiple processes inside a worker

    #tasks.py

    from celery import Celery
    import time
    import multiprocessing as mp

    app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks")

    def test_func(i):
        print("beg...:", i)
        time.sleep(5)
        print("....end:", i)
        return i * 5

    @app.task
    def fun_1(n):
        curr_proc = mp.current_process()
        # Clear the daemon flag just long enough to create the pool.
        curr_proc.daemon = False
        p = mp.Pool(mp.cpu_count())
        curr_proc.daemon = True
        for i in range(n):
            p.apply_async(test_func, args=(i,))
        p.close()
        p.join()
        return 1

    if __name__ == "__main__":
        app.start()

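    Explanation

    Starting child processes directly from a task does not work, because the worker process running the task is a daemon process (curr_proc.daemon=True), and daemon processes may not have children. So before creating the pool, the task explicitly marks the current process as non-daemon (curr_proc.daemon=False), and once the pool has been created it sets the flag back to daemon.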
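
The restriction itself comes from the multiprocessing module and can be reproduced without Celery. The sketch below starts a daemon process that tries to start a child of its own and reports what happened. The helper names are hypothetical, and the example assumes a platform where the fork start method is available (Linux or macOS).

```python
import multiprocessing as mp


def _try_start_child(ctx, out):
    """Runs inside a daemon process and reports whether it may start a child."""
    try:
        child = ctx.Process(target=print, args=("child ran",))
        child.start()
        child.join()
        out.put("child started")
    except AssertionError as exc:
        # multiprocessing refuses to start children from a daemon process.
        out.put(str(exc))


def daemon_child_attempt():
    """Start a daemon process, let it try to start a child, return its report."""
    ctx = mp.get_context("fork")  # assumption: fork is available
    out = ctx.Queue()
    proc = ctx.Process(target=_try_start_child, args=(ctx, out), daemon=True)
    proc.start()
    message = out.get(timeout=10)
    proc.join()
    return message


if __name__ == "__main__":
    print(daemon_child_attempt())
```

Toggling the daemon flag, as fun_1 does, sidesteps this check. It is a workaround and does not change how the worker itself manages that process.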

    #tasks_callback.py

    from celery import Celery
    import time
    import multiprocessing as mp

    app = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks_callback")
    rev = []
    def test_func(i):
        print("beg...:", i)
        time.sleep(5)
        print("....end:", i)
        return i * 5

    # Runs in the process that owns the pool and collects each return value.
    def callback_log(rev_val):
        rev.append(rev_val)

    @app.task
    def fun_1(n):
        print("before rev:", rev)
        curr_proc = mp.current_process()
        curr_proc.daemon = False
        p = mp.Pool(mp.cpu_count())
        curr_proc.daemon = True
        for i in range(n):
            p.apply_async(test_func, args=(i,), callback=callback_log)
        p.close()
        p.join()
        print("after rev:", rev)
        return 1

    if __name__ == "__main__":
        app.start()

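    10. Common configuration options

    1. CELERYD_PREFETCH_MULTIPLIER

    The number of messages a worker prefetches at a time. For example, with CELERYD_PREFETCH_MULTIPLIER=2 and a worker that has one task in the STARTED state, at most 2 more tasks can sit in the RECEIVED state on that worker (if any are waiting). This avoids the awkward situation where, with many messages queued, the first worker grabs them all and workers started later get none.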
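
Celery documents the prefetch limit as the multiplier times the number of concurrent worker processes. The sketch below is a simplified model of how that cap leaves messages for workers that start later. `prefetch_limit` and `reserve` are hypothetical helpers, and the model ignores acknowledgement timing and tasks that are already executing.

```python
def prefetch_limit(multiplier, concurrency):
    """Messages a worker may reserve in advance: multiplier x pool processes."""
    return multiplier * concurrency


def reserve(queued, workers):
    """Hand queued messages to workers in start order, capped by each limit.

    workers is a list of (name, multiplier, concurrency) tuples.
    Returns (messages taken per worker, messages still waiting in the queue).
    """
    remaining = queued
    taken = {}
    for name, multiplier, concurrency in workers:
        limit = prefetch_limit(multiplier, concurrency)
        taken[name] = min(limit, remaining)
        remaining -= taken[name]
    return taken, remaining


if __name__ == "__main__":
    # Ten queued messages, two workers with -c 2 and a multiplier of 2.
    print(reserve(10, [("w1", 2, 2), ("w2", 2, 2)]))
```

With a small multiplier, the first worker takes only its capped share, so the rest stay in the queue for the second worker or for one started afterwards.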

    Reference code

    from celery import Celery, platforms
    import time
    import os

    app = Celery('proj', broker='amqp://admin:admin@localhost:5672',
                 include=['tasks']
                 )
    app.conf.update(
        CELERYD_PREFETCH_MULTIPLIER=2,
        CELERY_ROUTES={
            'tasks.fun_1': {
                'queue': "q_1"
            },
            'tasks.fun_2': {
                'queue': "q_2"
            }
        }
    )
    # Allow the worker to run as root.
    platforms.C_FORCE_ROOT = True

    @app.task
    def fun_1(n):
        print("(((((((((((((((func_1", n)
        time.sleep(20)
        return 1

    @app.task
    def fun_2(n):
        print(n, ")))))))))))))))")
        return 2

    Call

    >>> from tasks import *
    >>> fun_1.delay(3)
    <AsyncResult: 609f2216-6785-409e-9f6f-85ae3fcce084>
    >>> fun_1.delay(3)
    <AsyncResult: 0230b8bd-b237-40ef-bc73-88929f8f8290>
    >>> fun_1.delay(3)
    <AsyncResult: 8fce172a-93c9-41f8-8c08-377a4363389c>
    >>> fun_1.delay(3)
  • Original post: https://www.cnblogs.com/zknublx/p/9149887.html