zoukankan      html  css  js  c++  java
  • celery实际应用例子

    Celery例子

    目录

     setting

    '''
    celery配置
    '''
    task_acks_late = True
    worker_prefetch_multiplier = 1
    # 限制最大使用内存,限制celery执行10个任务,就销毁重建
    worker_max_memory_per_child = 150000
    task_reject_on_worker_lost = True
    broker_pool_limit = 300
    timezone = "Asia/Shanghai"
    broker_url = 'amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0'
    
    # 优先级参数必须加
    celery_acks_late = True
    celeryd_prefetch_multiplier = 1

    task文件

    import time
    from celery import Celery
    from celery1 import celery_setting
    from kombu import Exchange, Queue

    app = Celery('celery1.my_task')
    app.config_from_object(celery_setting)
    app.conf.update(
    broker_url="amqp://guest:guest@localhost:5672/{vhost}?heartbeat=0".format(vhost="test")
    )

    # 1) x-max-length 提供一个非负整数值来设置最大消息条数。
    # 2) x-max-length-bytes 提供一个非负整数值,设置最大字节长度。如果设置了两个参数,那么两个参数都将适用;无论先达到哪个限制,都将强制执行。
    # 3) x-overflow 提供字符串值来设置。可能的值是:
    # drop-head (默认值):从队列前面丢弃或 dead-letter 消息,保存后n条消息
    # reject-publish:最近发布的消息将被丢弃,即保存前n条消息。
    # 4) x-max-priority 提供一个非负整数值来设置最大优先级。

    app.conf.task_queues = [
    Queue('priority_test_1', Exchange('default', type='direct'), routing_key='default', durable=False,
    queue_arguments={'x-max-priority': 10, "x-max-length": 10, "x-max-length-bytes": 1000,
    "x-overflow": "drop-head", "durable": False}),
    Queue('priority_test_2', Exchange('default', type='direct'), routing_key='default', durable=False,
    queue_arguments={'x-max-priority': 10, "x-max-length": 10, "x-max-length-bytes": 1000,
    "x-overflow": "drop-head"}),

    ]


    @app.task(bind=True,
    queue='priority_test_1', # 指定队列名
    max_retries=10, # 最大重试
    default_retry_delay=600, # 重试间隔时间
    autoretry_for=(TypeError, KeyError) # 指定重试错误
    )
    def priority_test_1(self, data):
    # print(self)
    print(data)


    # celery worker -A celery1.my_task -n priority_test_1 -Q priority_test_1 -c 1 -l info -P gevent

    @app.task(bind=True,
    queue='priority_test_2',
    )
    def priority_test_2(self, data):
    try:
    print(data["1"])
    time.sleep(2)
    except (TypeError, KeyError) as exc:
    raise self.retry(exc=exc, countdown=60 * 5, max_retries=5)

    # celery worker -A celery1.my_task -n priority_test_2 -Q priority_test_2 -c 1 -l info -P gevent

    添加任务文件

    from Test.celery1.my_task import priority_test_1, priority_test_2

    for i in range(10):
    print(i)
    priority_test_1.s({"name": f"{j}"}).apply_async(priority=i)
    priority_test_2.s({"name": f"{j}"}).apply_async(priority=i)
  • 相关阅读:
    VisualVM 分析full GC问题记录
    HTTPS协议、TLS协议、证书认证过程解析
    java.lang基础数据类型boolean、char、byte、short、int、long、float、double (JDK1.8)
    java.lang.StringBuilder和java.lang.StringBuffer (JDK1.8)
    MVC中自带的异步((Ajax.BeginForm)无效
    百度富文本编辑器UEDITOR
    只有在配置文件或 Page 指令中将 enableSessionState 设置为 true 时,才能使用会话状态。还请确保在应用程序配置的 // 节中包括 System.Web.SessionSta
    【知识碎片】CSS 篇
    js 将json字符串转换为json对象的方法解析
    【知识碎片】Asp.Net 篇
  • 原文地址:https://www.cnblogs.com/clbao/p/12877990.html
Copyright © 2011-2022 走看看