zoukankan      html  css  js  c++  java
  • celery实际应用例子

    Celery例子

    目录

     setting

    '''
    celery配置
    '''
    task_acks_late = True
    worker_prefetch_multiplier = 1
    # 限制最大使用内存,限制celery执行10个任务,就销毁重建
    worker_max_memory_per_child = 150000
    task_reject_on_worker_lost = True
    broker_pool_limit = 300
    timezone = "Asia/Shanghai"
    broker_url = 'amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0'
    
    # 优先级参数必须加
    celery_acks_late = True
    celeryd_prefetch_multiplier = 1

    task文件

    import time
    from celery import Celery
    from celery1 import celery_setting
    from kombu import Exchange, Queue

    app = Celery('celery1.my_task')
    app.config_from_object(celery_setting)
    app.conf.update(
    broker_url="amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0".format(vhost="test")
    )

    # 1) x-max-length 提供一个非负整数值来设置最大消息条数。
    # 2) x-max-length-bytes 提供一个非负整数值,设置最大字节长度。如果设置了两个参数,那么两个参数都将适用;无论先达到哪个限制,都将强制执行。
    # 3) x-overflow 提供字符串值来设置。可能的值是:
    # drop-head (默认值):从队列前面丢弃或 dead-letter 消息,保存后n条消息
    # reject-publish:最近发布的消息将被丢弃,即保存前n条消息。
    # 4) x-max-priority 提供一个非负整数值来设置最大优先级。

    app.conf.task_queues = [
    Queue('priority_test_1', Exchange('default', type='direct'), routing_key='default', durable=False,
    queue_arguments={'x-max-priority': 10, "x-max-length": 10, "x-max-length-bytes": 1000,
    "x-overflow": "drop-head", "durable": False}),
    Queue('priority_test_2', Exchange('default', type='direct'), routing_key='default', durable=False,
    queue_arguments={'x-max-priority': 10, "x-max-length": 10, "x-max-length-bytes": 1000,
    "x-overflow": "drop-head"}),

    ]


    @app.task(bind=True,
    queue='priority_test_1', # 指定队列名
    max_retries=10, # 最大重试
    default_retry_delay=600, # 重试间隔时间
    autoretry_for=(TypeError, KeyError) # 指定重试错误
    )
    def priority_test_1(self, data):
    # print(self)
    print(data)


    # celery worker -A celery1.my_task -n priority_test_1 -Q priority_test_1 -c 1 -l info -P gevent

    @app.task(bind=True,
    queue='priority_test_2',
    )
    def priority_test_2(self, data):
    try:
    print(data["1"])
    time.sleep(2)
    except (TypeError, KeyError) as exc:
    raise self.retry(exc=exc, countdown=60 * 5, max_retries=5)

    # celery worker -A celery1.my_task -n priority_test_2 -Q priority_test_2 -c 1 -l info -P gevent

    添加任务文件

    from Test.celery1.my_task import priority_test_1, priority_test_2

    for i in range(10):
    print(i)
    priority_test_1.s({"name": f"{j}"}).apply_async(priority=i)
    priority_test_2.s({"name": f"{j}"}).apply_async(priority=i)
  • 相关阅读:
    effective c++ 笔记 (30-31)
    设计模式 笔记 装饰模式 Decorator
    设计模式 笔记 组合模式 Composite
    设计模式 笔记 外观模式 Facade
    effective c++ 笔记 (26-29)
    设计模式 笔记 桥接模式 Bridge
    设计模式 笔记 适配器模式 Adapter
    算法导论 第二章作业
    设计模式 创建型模式实践
    设计模式 笔记 单例模式 Singleton
  • 原文地址:https://www.cnblogs.com/clbao/p/12877990.html
Copyright © 2011-2022 走看看