zoukankan      html  css  js  c++  java
  • celery使用demo

    # 环境: centos 6.9, python3, celery(4.2.1)
    # 运行:基于python3的虚拟环境

    1.准备工作

    pip install celery
    yum install rabbitmq-server

    2.创建脚本文件

      my_friends.py

    第一步:
        先定义我们自己的 Celery 对象。 
        该对象定义了任务的具体内容、任务队列的服务地址、以及保存任务执行结果的地址等重要信息
    from celery import Celery
    import time
    app = Celery('my_friends', backend='rpc://', broker='amqp://localhost')
    @app.task
    def my_friends(userId, newsId):
        print('start to my_friends task at {0}, userID:{1} newsID:{2}'.format(time.ctime(), userId, newsId))
        time.sleep(2)
        print('Task my_friends succeed at {0}'.format(time.ctime()))
        return True
    第二步:
            启动Celery Worker 服务进程,在定义完 Celery 对象后,我们可以创建对应的任务消费者:Celery worker 进程,后续的任务处理请求都是由这个 Celery worker 进程来最终执行的。
    #启动命令
    celery -A my_friends worker --loglevel=info
    User information: uid=0 euid=0 gid=0 egid=0
    
      uid=uid, euid=euid, gid=gid, egid=egid,
     
     -------------- celery@10-254-242-93 v4.2.1 (windowlicker)
    ---- **** ----- 
    --- * ***  * -- Linux-3.10.0-693.21.1.el7.x86_64-x86_64-with-centos-7.4.1708-Core 2018-09-01 16:12:02
    -- * - **** --- 
    - ** ---------- [config]
    - ** ---------- .> app:         my_friends:0x7f66fcfd7898
    - ** ---------- .> transport:   amqp://guest:**@localhost:5672//
    - ** ---------- .> results:     rpc://
    - *** --- * --- .> concurrency: 1 (prefork)
    -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
    --- ***** ----- 
     -------------- [queues]
                    .> celery           exchange=celery(direct) key=celery
                    
    
    [tasks]
      . my_friends.my_friends
    
    [2018-09-01 16:12:02,400: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
    [2018-09-01 16:12:02,414: INFO/MainProcess] mingle: searching for neighbors
    [2018-09-01 16:12:03,435: INFO/MainProcess] mingle: all alone
    [2018-09-01 16:12:03,448: INFO/MainProcess] celery@10-254-258-93 ready.

    启动完成后看到如上结果

    接下来用python代码调用该任务

    from my_friends import my_friends
    import time
    def notify(userId, messageId):
        # 调用任务
        result = my_friends.delay(userId, messageId)
        while not result.ready(): # 判断任务是否完成
            time.sleep(1)
            print result.get(timeout=10)  # 判断是否超时
    if __name__ == '__main__':
        notify('001', '001')

    运行情况:

    [2018-09-01 16:20:33,690: WARNING/ForkPoolWorker-1] Start to my_friends task at Sat Sep  1 16:20:33 2018, userID:001 newsID:001
    [2018-09-01 16:20:35,692: WARNING/ForkPoolWorker-1] Task my_friends succeed at Sat Sep  1 16:20:35 2018
    [2018-09-01 16:20:35,693: INFO/ForkPoolWorker-1] Task my_friends.my_friends[7209fa78-593c-4ea0-8f46-b40d025a5214] succeeded in 2.0032830736599863s: True

    继续:

      celery周期性触发任务

      我们将 select_populate_book 这个任务定义为每 10 秒执行一次。 

      config.py

    from datetime import timedelta
    CELERYBEAT_SCHEDULE = {
        # SCHEDULE名称
        'select_populate_book': {
            # 对应的任务
            'task': 'favorite_book.select_populate_book',
            # schedule周期
            'schedule': timedelta(seconds=10),
        },
    }

      创建celery 对象(favorite_book.py)

    from celery import Celery
    import time
    app = Celery('select_populate_book', backend='rpc://', broker='amqp://localhost')
    app.config_from_object('config')  # 加载配置文件
    @app.task
    def select_populate_book():
        # 定义任务的具体实现
        print('start to select_populate_book task at {0}'.format(time.ctime()))
        time.sleep(2)
        print('Task select_populate_book succeed at   {0}'.format(time.ctime()))
        return True
    # 启动celery worker
    celery -A favorite_book worker --loglevel=info
    # 我们可以启动celery beat,来根据配置文件定时的触发任务
    celery -A favorite_book beat

  • 相关阅读:
    Photon3Unity3D.dll 解析三——OperationRequest、OperationResponse
    关于VS2010的一些操作
    Photon3Unity3D.dll 解析二——EventData
    Photon3Unity3D.dll 解析一
    关于U3D中的移动和旋转
    U3D的一些常用基础脚本
    U3D模拟仿真实现
    构建基于TCP的应用层通信模型
    TCP协议的三次握手
    Python生成随机字符串
  • 原文地址:https://www.cnblogs.com/songxiaohua/p/9570772.html
Copyright © 2011-2022 走看看