zoukankan      html  css  js  c++  java
  • Celery 服务搭建

    整个项目工程如下

    __init__.py

    """
    注意点:python3.7 需要执行 pip install --upgrade https://github.com/celery/celery/tarball/master
    否则会报 from . import async, base
    
    SyntaxError: invalid syntax
    
    celery -A __init__  worker --concurrency=5 -l INFO -Q celery,save_redis
    celery -A __init__ worker -l info -Q save_mongo
    cd  /Users/admin/PycharmProjects/function_test/adminSyS/mq&&celery -A __init__  worker --concurrency=5
    
    
    celery 启动  --autoscale=10,3 当worker不足时自动加3-10个
    celery -A __init__  worker --concurrency=5 -l INFO -Q celery,save_redis2,save_redis --autoscale=10,3
    
    supervisor 配置
    [program:celery]
    directory = /data/app/adminSyS/mq
    command = /data/app/adminSyS/venv/bin/celery -A __init__  worker --concurrency=5 -l INFO -Q celery,save_redis
    autostart = true
    autorestart = true
    startsecs = 5
    startretries = 3
    
    监控:
        文档 https://flower-docs-cn.readthedocs.io/zh/latest/install.html
        pip install flower
     celery flower --broker=redis://:z1234567@47.93.235.228:6379/5 --port=5555
     测试 http://47.93.235.228:9114
    """
    import os
    import sys
    sys.path.append(os.path.abspath(os.path.dirname(__file__)) + "/..")
    from celery import Celery
    
    app = Celery(include=["mq.tasks"])
    app.config_from_object("celeryconfig")  # 指定配置文件
    View Code

    celeryconfig.py

    import os
    import sys

    # Put the parent of this file's directory on sys.path before further imports.
    sys.path.append(os.path.abspath(os.path.dirname(__file__)) + "/..")
    from kombu import Queue, Exchange

    # NOTE(review): broker/backend credentials are hard-coded below; consider
    # loading them from the environment.
    BROKER_URL = "redis://:redis@127.0.0.1:6379/5"  # message broker
    CELERY_RESULT_BACKEND = "redis://:redis@127.0.0.1:6379/6"  # where task results are stored
    CELERY_TASK_SERIALIZER = "json"  # serialization format for task messages
    CELERY_RESULT_SERIALIZER = "json"  # serialization format for task results
    CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # stored results expire after 24 hours (seconds)
    CELERY_ACCEPT_CONTENT = ["json", "msgpack"]  # content types the worker accepts
    BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 86400}   # seconds before an unacknowledged task is redelivered
    CELERYD_MAX_TASKS_PER_CHILD = 400  # recycle a worker process after this many tasks to guard against memory leaks

    # Declared task queues: default, save_redis and save_redis2.
    # NOTE(review): no default-queue setting is visible here, so tasks without
    # a route presumably go to Celery's built-in "celery" queue rather than
    # "default" -- confirm.
    CELERY_QUEUES = (
        Queue("default", Exchange("default"), routing_key="default"),
        Queue("save_redis", Exchange("save_redis"), routing_key="save_redis"),
        Queue("save_redis2", Exchange("save_redis2"), routing_key="save_redis2")
    )

    # Route each task, keyed by its fully qualified name (mq.tasks.taskA,
    # mq.tasks.taskB), to its own queue.
    CELERY_ROUTES = {
        'mq.tasks.taskA': {"queue": "save_redis", "routing_key": "save_redis"},
        'mq.tasks.taskB': {"queue": "save_redis2", "routing_key": "save_redis2"}
    }
    View Code

    tasks.py

    from mq import app
    import redis
    import time
    
    # Redis client (localhost:6379, db 7) that the tasks below write their sums to.
    # NOTE(review): the password is hard-coded; consider reading it from configuration.
    rds = redis.Redis(
        host="localhost",
        port=6379,
        db=7,
        password="redis")
    
    
    # Task definitions (executed by the Celery worker)
    
    @app.task
    def taskA(x, y):
        """Add ``x`` and ``y`` after a 10-second pause and cache the sum in Redis.

        The sum is stored under the ``taskA`` key with a 3600-second expiry
        and is also returned as the task result.
        """
        time.sleep(10)
        total = x + y
        rds.setex(name="taskA", time=3600, value=total)
        return total
    
    
    @app.task
    def taskB(x, y, c):
        """Add ``x`` and ``y`` after a 10-second pause and cache the sum in Redis.

        The sum is stored under the key ``taskB<c>`` (``c`` appended to the
        prefix) with a 3600-second expiry and is also returned as the task
        result.
        """
        time.sleep(10)
        total = x + y
        redis_key = f"taskB{c}"
        rds.setex(name=redis_key, time=3600, value=total)
        return total
    View Code

    test.py

    import sys
    import os
    
    sys.path.append(os.path.abspath(os.path.dirname(__file__)) + "/..")
    from mq.tasks import taskA, taskB
    
    # Single-call example (disabled):
    # re1 = taskA.delay(100, 200)
    # print(re1)
    # print(re1.status)

    # Caller side: enqueue ten taskB calls and print each returned result
    # handle followed by its current status.
    for n in range(10):
        pending = taskB.delay(100, n, n)
        print(pending)
        print(pending.status)
    View Code

    celery worker 启动:-Q 后面的 save_redis2,save_redis 是队列名(不是函数名);--autoscale=10,3 表示进程池按负载在最少 3 个、最多 10 个进程之间自动伸缩

    celery -A __init__  worker --concurrency=5 -l INFO -Q celery,save_redis2,save_redis --autoscale=10,3

     #启动方式

     ../venv/bin/celery -A __init__ worker --concurrency=5 -l INFO -Q celery,save_redis2,save_redis --autoscale=10,3



    # 启动方式打印log
    command=/home/op/saiyan_game_center/venv/bin/celery -A __init__ worker --concurrency=8 -l INFO -Q upload_box_task --autoscale=5,3 --logfile=/home/op/saiyan_game_center/logs/log.log

    celery web监控

    文档:https://flower-docs-cn.readthedocs.io/zh/latest/install.html

    安装:pip install flower

    启动:celery flower --broker=代理url --port=5555

    celery flower --broker=redis://:redis@127.0.0.1:6379/5 --port=5555


     

  • 相关阅读:
    go语言关于线程与通道channal
    linux 搭建SVN服务端
    使用mbedtls的使用说明和AES加密方法(原来的PolarSSL)
    清理 Xcode 10 记录
    Windows下修改iTunes备份路径
    Winform窗口自适应
    修改类模板文件
    HashTable
    修改App.config的键和值
    博客园动画效果
  • 原文地址:https://www.cnblogs.com/zhaoyingjie/p/11382770.html
Copyright © 2011-2022 走看看