  • celery -1

    An asynchronous, distributed task queue.

    Language: Python

    Docs: http://docs.celeryproject.org/en/master/getting-started/introduction.html

    Install: pip install celery, using Redis (or RabbitMQ) as the broker

    (venv) l@l:~/pycharm/py3$ celery
    usage: celery <command> [options] 
    
    Show help screen and exit.
    
    positional arguments:
      args
    
    optional arguments:
      -h, --help            show this help message and exit
      --version             show program's version number and exit
    
    Global Options:
      -A APP, --app APP
      -b BROKER, --broker BROKER
      --loader LOADER
      --config CONFIG
      --workdir WORKDIR
      --no-color, -C
      --quiet, -q
    
    ---- -- - - ---- Commands- -------------- --- ------------
    
    + Main: 
    |    celery worker
    |    celery events
    |    celery beat
    |    celery shell
    |    celery multi
    |    celery amqp
    
    + Remote Control: 
    |    celery status
     
    |    celery inspect --help
    |    celery inspect active 
    |    celery inspect active_queues 
    |    celery inspect clock 
    |    celery inspect conf [include_defaults=False]
    |    celery inspect memdump [n_samples=10]
    |    celery inspect memsample 
    |    celery inspect objgraph [object_type=Request] [num=200 [max_depth=10]]
    |    celery inspect ping 
    |    celery inspect query_task [id1 [id2 [... [idN]]]]
    |    celery inspect registered [attr1 [attr2 [... [attrN]]]]
    |    celery inspect report 
    |    celery inspect reserved 
    |    celery inspect revoked 
    |    celery inspect scheduled 
    |    celery inspect stats 
     
    |    celery control --help
    |    celery control add_consumer <queue> [exchange [type [routing_key]]]
    |    celery control autoscale [max [min]]
    |    celery control cancel_consumer <queue>
    |    celery control disable_events 
    |    celery control election 
    |    celery control enable_events 
    |    celery control heartbeat 
    |    celery control pool_grow [N=1]
    |    celery control pool_restart 
    |    celery control pool_shrink [N=1]
    |    celery control rate_limit <task_name> <rate_limit (e.g., 5/s | 5/m | 5/h)>
    |    celery control revoke [id1 [id2 [... [idN]]]]
    |    celery control shutdown 
    |    celery control terminate <signal> [id1 [id2 [... [idN]]]]
    |    celery control time_limit <task_name> <soft_secs> [hard_secs]
    
    + Utils: 
    |    celery purge
    |    celery list
    |    celery call
    |    celery result
    |    celery migrate
    |    celery graph
    |    celery upgrade
    
    + Debugging: 
    |    celery report
    |    celery logtool
    ---- -- - - --------- -- - -------------- --- ------------
    
    Type 'celery <command> --help' for help using a specific command.
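    The Remote Control commands above also have programmatic equivalents on the app object; a minimal sketch, assuming the lx.py module defined later in this post and at least one running worker:

    # Programmatic counterparts of `celery inspect` / `celery control`
    # (a sketch; assumes the `app` instance from lx.py below and a running worker).
    from lx import app

    insp = app.control.inspect()
    print(insp.ping())         # like `celery inspect ping`
    print(insp.active())       # like `celery inspect active`
    print(insp.registered())   # like `celery inspect registered`

    # like `celery control rate_limit lx.add_2 10/m`
    app.control.rate_limit('lx.add_2', '10/m')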

    First, set a password for Redis. (In the session below the password lyb is already in effect, so every command fails with NOAUTH until auth is issued:)

    127.0.0.1:6379> CONFIG SET requirepass lyb
    (error) NOAUTH Authentication required.
    127.0.0.1:6379> exit
    l@l:~/one$ redis-cli
    127.0.0.1:6379> keys *
    (error) NOAUTH Authentication required.
    127.0.0.1:6379> ping
    (error) NOAUTH Authentication required.
    127.0.0.1:6379> auth 'lyb'
    OK
    127.0.0.1:6379> ping
    PONG

    Usage:

    # lx.py  -- the worker side: it executes tasks, which must be defined ahead of time, e.g. add_2
    #           (the module name must match the -A argument and the import used below)

    from celery import Celery

    app = Celery('tasks',                                 # app name
                 broker='redis://:lyb@localhost:6379/0',  # redis://:password@hostname:port/db_number
                 backend='redis://:lyb@localhost:6379/0') # backend that stores the results

    @app.task
    def add_2(x, y):      # the task the worker executes
        print('x + y :')
        return x + y
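    The interactive session further below also calls lx.mul, which this file does not show; a minimal sketch of such a task, assuming it lives in the same lx.py module:

    # Hypothetical second task in lx.py, matching the lx.mul.delay(6, 6) call below
    @app.task
    def mul(x, y):
        print('x * y :')
        return x * y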

    Start the worker. Several workers can be started; they compete for tasks from the queue.

    (venv) l@l:~/celerylx$ celery -A lx worker -l debug

    For reference, the worker's app.__dict__:

    {
    'steps': defaultdict(<class 'set'>, {'consumer': set(), 'worker': set()}), 
    '_using_v1_reduce': None, 
    '_config_source': None, 
    'on_after_fork': <Signal: app.on_after_fork providing_args=set()>, 
    'set_as_current': True, 
    'fixups': {'celery.fixups.django:fixup'}, 
    '_preconf_set_by_auto': {'result_backend', 'broker_url'}, 
    'log': <celery.app.log.Logging object at 0x7f28a180f048>, 
    'on_after_configure': <Signal: app.on_after_configure providing_args={'source'}>, 
    'annotations': (), 
    'control_cls': 'celery.app.control:Control', 
    'namespace': None, 
    'user_options': defaultdict(<class 'set'>, {'preload': set(), 'worker': set()}), 
    'autofinalize': True, 
    'log_cls': 'celery.app.log:Logging', 
    '_pending_defaults': deque([]), 
    '_fixups': [None], 
    'backend': <celery.backends.redis.RedisBackend object at 0x7f28a14466a0>, 
    'Worker': <class 'celery.apps.worker.Worker'>, 
    'strict_typing': True, 
    '_tasks': {'celery.accumulate': <@task: celery.accumulate of tasks at 0x7f28a18d55c0>, 'celery.chord': <@task: celery.chord of tasks at 0x7f28a18d55c0>, 'celery.map': <@task: celery.map of tasks at 0x7f28a18d55c0>, 'celery.starmap': <@task: celery.starmap of tasks at 0x7f28a18d55c0>, 'celery.group': <@task: celery.group of tasks at 0x7f28a18d55c0>, 'celery.chord_unlock': <@task: celery.chord_unlock of tasks at 0x7f28a18d55c0>, 'celery.chunks': <@task: celery.chunks of tasks at 0x7f28a18d55c0>, 'celery.backend_cleanup': <@task: celery.backend_cleanup of tasks at 0x7f28a18d55c0>, 'lx.add_2': <@task: lx.add_2 of tasks at 0x7f28a18d55c0>, 'celery.chain': <@task: celery.chain of tasks at 0x7f28a18d55c0>}, 
    'clock': <LamportClock: 0>, 
    'task_cls': 'celery.app.task:Task', 
    'loader_cls': 'celery.loaders.app:AppLoader', 
    '_pending_periodic_tasks': deque([]), 
    'on_configure': <Signal: app.on_configure providing_args=set()>, 
    'finalized': True, 
    'GroupResult': <class 'celery.result.GroupResult'>, 
    '_preconf': {'result_backend': 'redis://:lyb@localhost:6379/0', 'broker_url': 'redis://:lyb@localhost:6379/0'}, 
    'registry_cls': <class 'celery.app.registry.TaskRegistry'>, 
    'amqp': <celery.app.amqp.AMQP object at 0x7f28a2772208>, 
    'on_after_finalize': <Signal: app.on_after_finalize providing_args=set()>, 
    '_finalize_mutex': <unlocked _thread.lock object at 0x7f28a1927fa8>, 
    '_pending': deque([]), 
    'main': 'tasks', 
    'loader': <celery.loaders.app.AppLoader object at 0x7f28a18e6278>, 
    'events_cls': 'celery.app.events:Events', 
    'events': <celery.app.events.Events object at 0x7f28a140f2e8>, 
    'configured': True, 
    '_conf': Settings({'include': ('lx', 'celery.app.builtins'), 'result_backend': 'redis://:lyb@localhost:6379/0', 'broker_url': 'redis://:lyb@localhost:6379/0'}, {}, {'broker_transport_options': {}, 'result_exchange': 'celeryresults', 'database_engine_options': None, 'couchbase_backend_settings': None, 'broker_password': None, 'worker_enable_remote_control': True, 'cassandra_keyspace': None, 'broker_write_url': None, 'broker_vhost': None, 'cassandra_write_consistency': None, 'broker_port': None, 'security_cert_store': None, 'broker_read_url': None, 'cache_backend': None, 'worker_hijack_root_logger': True, 'task_store_errors_even_if_ignored': False, 'worker_task_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s', 'cassandra_servers': None, 'beat_sync_every': 0, 'worker_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s', 'imports': (), 'task_queue_max_priority': None, 'elasticsearch_timeout': None, 'worker_pool': 'prefork', 'broker_connection_retry': True, 'worker_max_memory_per_child': None, 'task_queue_ha_policy': None, 'worker_direct': False, 'task_default_routing_key': None, 'beat_schedule_filename': 'celerybeat-schedule', 'task_acks_late': False, 'task_track_started': False, 'redis_max_connections': None, 'result_backend': 'redis://:lyb@localhost:6379/0', 'task_default_queue': 'celery', 'redis_backend_use_ssl': None, 'cassandra_auth_kwargs': None, 'cassandra_auth_provider': None, 'task_eager_propagates': False, 'task_compression': None, 'broker_use_ssl': False, 'result_persistent': None, 'worker_redirect_stdouts_level': 'WARNING', 'task_soft_time_limit': None, 'timezone': None, 'elasticsearch_retry_on_timeout': None, 'result_cache_max': -1, 'elasticsearch_max_retries': None, 'cassandra_entry_ttl': None, 'cassandra_table': None, 'database_short_lived_sessions': False, 'task_queues': None, 'task_default_rate_limit': None, 'task_publish_retry_policy': {'max_retries': 3, 'interval_max': 1, 'interval_start': 0, 'interval_step': 0.2}, 'task_create_missing_queues': True, 'worker_timer_precision': 1.0, 'broker_connection_timeout': 4, 'include': (), 'task_routes': None, 'task_always_eager': False, 'cassandra_read_consistency': None, 'control_queue_expires': 10.0, 'result_compression': None, 'beat_max_loop_interval': 0, 'worker_max_tasks_per_child': None, 'task_default_delivery_mode': 2, 'cassandra_port': None, 'task_remote_tracebacks': False, 'result_serializer': 'json', 'task_default_exchange_type': 'direct', 'result_exchange_type': 'direct', 'redis_socket_connect_timeout': None, 'control_queue_ttl': 300.0, 'database_table_names': None, 'worker_prefetch_multiplier': 4, 'broker_heartbeat': 120, 'accept_content': ['json'], 'broker_failover_strategy': None, 'event_queue_expires': 60.0, 'worker_pool_restarts': False, 'event_queue_ttl': 5.0, 'worker_autoscaler': 'celery.worker.autoscale:Autoscaler', 'task_serializer': 'json', 'worker_concurrency': 0, 'worker_redirect_stdouts': True, 'redis_port': None, 'worker_log_color': None, 'worker_disable_rate_limits': False, 'task_annotations': None, 'event_queue_prefix': 'celeryev', 'redis_socket_timeout': 120.0, 'worker_consumer': 'celery.worker.consumer:Consumer', 'cache_backend_options': {}, 'broker_user': None, 'worker_timer': None, 'redis_password': None, 'result_expires': datetime.timedelta(1), 'task_reject_on_worker_lost': None, 'enable_utc': True, 'task_ignore_result': False, 'broker_host': None, 'worker_send_task_events': False, 'broker_connection_max_retries': 100, 
'riak_backend_settings': None, 'task_default_exchange': None, 'broker_pool_limit': 10, 'database_url': None, 'worker_agent': None, 'task_publish_retry': True, 'event_serializer': 'json', 'beat_scheduler': 'celery.beat:PersistentScheduler', 'broker_url': 'redis://:lyb@localhost:6379/0', 'redis_db': None, 'security_certificate': None, 'worker_lost_wait': 10.0, 'mongodb_backend_settings': None, 'beat_schedule': {}, 'broker_transport': None, 'task_time_limit': None, 'worker_pool_putlocks': True, 'task_send_sent_event': False, 'worker_state_db': None, 'security_key': None, 'task_protocol': 2, 'broker_heartbeat_checkrate': 3.0, 'redis_host': None, 'broker_login_method': None}), 
    'Task': <class 'celery.app.task.Task'>, 
    'AsyncResult': <class 'celery.result.AsyncResult'>, 
    'control': <celery.app.control.Control object at 0x7f28a1435780>, 
    'amqp_cls': 'celery.app.amqp:AMQP', 
    'tasks': {'celery.accumulate': <@task: celery.accumulate of tasks at 0x7f28a18d55c0>, 
    'celery.chord': <@task: celery.chord of tasks at 0x7f28a18d55c0>, 
    'celery.map': <@task: celery.map of tasks at 0x7f28a18d55c0>, 
    'celery.starmap': <@task: celery.starmap of tasks at 0x7f28a18d55c0>, 
    'celery.group': <@task: celery.group of tasks at 0x7f28a18d55c0>, 
    'celery.chord_unlock': <@task: celery.chord_unlock of tasks at 0x7f28a18d55c0>, 
    'celery.chunks': <@task: celery.chunks of tasks at 0x7f28a18d55c0>, 
    'celery.backend_cleanup': <@task: celery.backend_cleanup of tasks at 0x7f28a18d55c0>, 
    'lx.add_2': <@task: lx.add_2 of tasks at 0x7f28a18d55c0>, 
    'celery.chain': <@task: celery.chain of tasks at 0x7f28a18d55c0>}}

    Calling the tasks from the client side (the session assumes lx.py also defines the mul task sketched above):

    >>> import lx
    >>> a = lx.add_2.delay(4,7)  # returns an AsyncResult (carries the task id)
    >>> b = lx.mul.delay(6,6)
    >>> a
    <AsyncResult: 0c2b4d9e-c99d-4456-8569-2c993317fe21>
    >>> b
    <AsyncResult: 085c826e-3b26-4d3e-8f95-afd071057797>
    
    >>> a.get()  # fetch the result (blocks until the task finishes)
    11
    >>> b.get()
    36
    >>> a.ready()  # check whether the task has finished
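    get() blocks until the worker finishes; a minimal sketch of checking a result without blocking, using the same AsyncResult API (the sleep interval and timeout are just example values):

    import time
    import lx

    res = lx.add_2.delay(4, 7)

    # Poll instead of blocking forever
    while not res.ready():
        time.sleep(0.5)
    print(res.status)           # 'SUCCESS' once the task has finished
    print(res.get(timeout=10))  # raises celery.exceptions.TimeoutError if it takes too long
    print(res.result)           # the stored return value, without waiting again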

    Using Celery with Django: http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#django-first-steps

    1. In the project's main package (next to settings.py), create celery.py:

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
    
    app = Celery('mysite_celery')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()    # auto-discover tasks.py in every registered Django app
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
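    To check the wiring, the debug_task above can be triggered from a Django shell; a minimal sketch, assuming the project package is mysite and a worker is already running:

    # python manage.py shell
    from mysite.celery import debug_task

    r = debug_task.delay()    # the worker process prints the request info
    print(r.get(timeout=10))  # debug_task returns None, so this prints None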

    2. In the project package's __init__.py:

    from __future__ import absolute_import, unicode_literals
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
    
    __all__ = ['celery_app']

    3. In the app (here app01), create tasks.py and define the tasks:

    # Create your tasks here
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task  # shared_task keeps the tasks reusable beyond this single project/app instance
    
    
    @shared_task
    def add(x, y):
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
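    These tasks can also be composed; a minimal sketch using celery.group together with xsum, assuming a worker is running and the app is named app01:

    from celery import group
    from app01.tasks import add, xsum

    # Run several add() calls in parallel on the worker(s)
    job = group(add.s(i, i) for i in range(5))
    partials = job.apply_async().get()      # e.g. [0, 2, 4, 6, 8]

    # Feed the partial results into xsum as another task
    total = xsum.delay(partials).get()      # 20
    print(partials, total)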

    4. In settings.py:

    # for celery
    CELERY_BROKER_URL = 'redis://:lyb@localhost:6379/0'    # note: no trailing comma, or the value becomes a tuple
    CELERY_RESULT_BACKEND = 'redis://:lyb@localhost:6379/0'
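    Because of namespace='CELERY' in step 1, any other Celery option can also live in settings.py as long as it carries the CELERY_ prefix; a few optional examples, not required for this demo:

    # Optional extras, picked up via the CELERY_ prefix
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_TIMEZONE = 'UTC'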

    5. In views.py:

    from django.shortcuts import render,HttpResponse,redirect
    from . import tasks
    
    def index(request):
        t1 = tasks.add.delay(1,2)   # hand the task to the worker
        res = t1.get()              # block until the result is ready
        return HttpResponse(res)

    6. Start the worker:

    (venv) l@l:~/$ celery -A mysite worker -l info

    7. Start the project and request the view; the response contains the result.

    Improvement

    In the view, take the task_id and look the result up with AsyncResult:

    from django.shortcuts import render,HttpResponse,redirect
    from . import tasks
    from celery.result import AsyncResult
    
    def index(request):
        t1 = tasks.add.delay(1,2)
        task_id = t1.task_id                    # the id can be stored and queried later
        
        print(AsyncResult(id=task_id).get())    # look the result up by id
        return HttpResponse(t1)
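    Both views above still block on the result inside the request; a sketch of a fully non-blocking variant (not from the original post), where one view starts the task and a second view polls by id:

    from django.http import HttpResponse, JsonResponse
    from celery.result import AsyncResult
    from . import tasks

    def start(request):
        t = tasks.add.delay(1, 2)
        return HttpResponse(t.task_id)      # hand the id back to the client

    def check(request, task_id):
        res = AsyncResult(task_id)
        data = {'state': res.state}
        if res.ready():
            data['result'] = res.result
        return JsonResponse(data)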