zoukankan      html  css  js  c++  java
  • Celery分布式任务队列原理,用法

    提交一个耗时的请求, 立即返回ID,
    拿着这个ID去查询任务处理进程

    自己想到的方案:
    前端请求接口a,(接口a立即开始处理任务,把任务的处理到哪一步放在redis中)
    前端请求接口b,(接口b立即返回redis中进行到哪一步)

    前端先调用接口a(此时接口a阻塞)
    前端继续轮询接口b获取a进行到哪一步了
    当接口a成功返回结果,停止轮询


    Alex讲的方案

    前端请求接口a,接口立马生成一个任务id,然后启动一个进程执行这个任务,然后把id返回给前端(为什么不是线程?如果启动子线程执行任务,主线程同样会卡主,一样阻塞,如果是守护线程,主线程执行完,守护线程也结束了,所以这两种线程都不行),
    把任务执行过程写在redis/数据库/文件里 whatever,
    前端可以请求接口b,接口b拿着这个任务id去redis里找到结果并返回就OK了。(假设存在redis里)

    这个方案看似没有问题,实际上有个坑,就是不能支持高并发,为什么呢?
    Alex的原话是:你提供的这个任务管理可能有多个用户都用它,对不对?比如一个用户,执行100台并发,20个用户同时,每个人执行100台,就是2000个并发,你觉得你的这个任务服务器撑得住吗?肯定不行的,是不是?
    你现在是直接在任务服务器上调的脚本(调脚本就视为一个耗时的任务),随着并发的用户越多,机器会越来越慢,第21个用户可能都打不开网站了。这就是个坑


    一开始就不该把任务执行的模块,放到任务分发的服务器上,
    一开始就该分清楚,谁负责任务分发,谁负责任务执行。
    意思是:一台服务器a分发任务(任务按先后顺序放在队列里),后面四台服务器b,c,d,e从a上从队列里拿任务去执行

    Celery工作原理:

    自己的领悟:首先由五台机器,view是1台机器,后面有4台worker,拿Excel表入库的任务来说

    用户将Excel文件发给view,view要将文件放入队列中,4台worker从队列中拿excel文件,那么必须要在4台worker上启动某个程序

    拿到文件解析入库,成功后将信息写到队列中(是不是也可以在执行的过程中将执行的进度写入队列里?),可以和excel文件的队列一样,也可以不一样。

    Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列)

    但是`放产品`这个动作还是需要客户端来做,例如:tasks.add.delay(1,2)   或者使用beat: celery -A proj beat -l info

    celery在windows上不好使。要用linux,

    启动一台linux服务器,IP:192.168.1.10

    `pip3 install celery` 

    `yum install redis`

    定义tasks.py: 

    from celery import Celery
    import time

    # worker上能执行的任务是提前定义好的,这个任务就是用app来定义,app包括哪些任务呢,就是@app.task,broker是生产者队列,backend是结果队列

    app = Celery('tasks', broker='redis://localhost', backend='redis://localhost')

    @app.task
    def add(x, y):
    time.sleep(5)
    return x + y

    `redis-server`

    `celery -A tasks worker-l debug`    # tasks是tasks.py省略扩展名,worker是角色,-l日志级别

     重新启动一个终端:

    [root@localhost ~]# python3
    Python 3.6.8 (default, Apr 2 2020, 13:34:55)
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import tasks
    >>> tasks.add(3,4)
    7

    >>> t= tasks.add.delay(3,4)
    >>> t   # 返回的是AsyncResult对象
    <AsyncResult: 4ffaa775-0823-470b-b245-f3465dd921f8>

    >>> t.ready()
    False
    >>> t.ready()
    False
    >>> t.ready()
    True
    >>> t.get()
    7
    >>>t.get(timeout=1)    # 给任务设置等待时间,提高用户体验

    >>> t.get()    #如果任务出错
    Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 228, in get
    on_message=on_message,
    File "/usr/local/lib/python3.6/site-packages/celery/backends/asynchronous.py", line 202, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
    File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 333, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
    File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 326, in throw
    self.on_ready.throw(*args, **kwargs)
    File "/usr/local/lib/python3.6/site-packages/vine/promises.py", line 244, in throw
    reraise(type(exc), exc, tb)
    File "/usr/local/lib/python3.6/site-packages/vine/five.py", line 195, in reraise
    raise value
    NameError: name 'aa' is not defined
    >>> t.get(propagate=False)  # 出错只取结果
    NameError("name 'aa' is not defined",)
    >>> tt.traceback   # 错误结果
    Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
    NameError: name 'tt' is not defined
    >>>

    celery接收到任务:

    启动一个celery的worker,有5个进程,一个主进程3575 , 和4个工作进程,都是由主进程启动的。工作进程用来执行任务

     工作进程应该可以设置, `celery -A proj worker -l info --concurrency=4 `

    所以,执行`celery -A proj worker -l debug`命令2次,启动2个worker,就是10个celery进程,每个worker5个进程(一个主进程和四个工作进程)

    如果在项目中使用Celery?

    在项目中使用Celery,之前是配置和任务写在一起,现在把配置和任务分开

    proj/__init__.py

        /celery.py
        /tasks.py         
                  /periodic_task.py   #定时任务
     
    from __future__ import absolute_import, unicode_literals
    #导入安装的celery(所以上面要写绝对导入),而不是自己导自己(from .celery)
    from celery import Celery  
     
    app = Celery('proj',
                 broker='redis://',
                 backend='redis://',
                 include=['proj.tasks', 'proj.periodic_task'])  # 这个celery管理了哪些task文件可以有多个,其中有个定时任务periodic_task
     
    # Optional configuration, see the application user guide.
    # update方法更新配置,也可以直接写在上面初始化Celery里面
    app.conf.update(
        result_expires=3600,  # 任务结果一小时内没人取就丢弃
    )
     
    if __name__ == '__main__':
        app.start()
    celery.py
    from __future__ import absolute_import, unicode_literals
    from .celery import app
    
    
    @app.task
    def add(x, y):
        return x + y
    
    
    @app.task
    def mul(x, y):
        return x * y
    
    
    @app.task
    def xsum(numbers):
        return sum(numbers)
    tasks.py
    from __future__ import absolute_import, unicode_literals
    from .celery import app
    from celery.schedules import crontab
     
    #该装饰器的作用是celery一连上就执行被装饰的函数
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # 每10秒执行test('hello')
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
     
        # 每30秒执行test('world')
        sender.add_periodic_task(30.0, test.s('world'), expires=10)
     
        # 每周一上午7:30执行
        sender.add_periodic_task(
            crontab(hour=7, minute=30, day_of_week=1),
            test.s('Happy Mondays!'),
        )
     
    @app.task
    def test(arg):
        print(arg)
    periodic_task.py

    worker启动:`celery -A proj worker -l debug`

    发起任务: 

    from proj import  tasks,tasks1    #要cd到proj的上一层目录

    t1 = tasks.add.delay(2,4)

    t2 = tasks1.get_rand.delay(1,20)

    启动定时任务调度器beat:

    `celery -A proj.periodic_task beat -l debug`

    celery单独启动一个进程来定时发起这些任务,就是beat, 注意, 这里是发起任务,不是执行,这个进程只会不断的去检查你的任务计划, 每发现有任务需要执行了,就发起一个任务调用消息,交给celery worker去执行

    上面是通过调用函数添加定时任务,也可以像写配置文件 一样的形式添加, 下面是每30s执行的任务

    app.conf.beat_schedule = {
        'add-every-30-seconds': {
            'task': 'proj.tasks.add',
            'schedule': 30.0,
            'args': (16, 16)
        },
    }
    app.conf.timezone = 'UTC'

    复杂的,例如:每周1的早上7.30执行tasks.add任务

    from celery.schedules import crontab
     
    app.conf.beat_schedule = {
        # Executes every Monday morning at 7:30 a.m.
        'add-every-monday-morning': {
            'task': 'proj.tasks.add',
            'schedule': crontab(hour=7, minute=30, day_of_week=1),
            'args': (16, 16),
        },
    }

    可以在1台服务器上启动4个worker (后台运行)

    [root@localhost ~]# celery multi start w1 -A proj -l debug
    celery multi v4.4.5 (cliffs)
    > Starting nodes...
        > w1@localhost.localdomain: OK
    [root@localhost ~]# celery multi start w2 -A proj -l debug
    celery multi v4.4.5 (cliffs)
    > Starting nodes...
        > w2@localhost.localdomain: OK
    [root@localhost ~]# celery multi start w3 -A proj -l debug
    celery multi v4.4.5 (cliffs)
    > Starting nodes...
        > w3@localhost.localdomain: OK
    [root@localhost ~]# celery multi start w4 -A proj -l debug
    celery multi v4.4.5 (cliffs)
    > Starting nodes...
        > w4@localhost.localdomain: OK
    [root@localhost ~]# 

    可以在多台服务器上运行worker,只需要修改redis配置就行。他们都从同一个redis领任务

    重启 w1 w2 :`celery multi restart w1 w2 -A proj`

    停止w1 w2:`celery multi stop w1 w2 -A proj`

  • 相关阅读:
    机器学习基础1--线性回归
    联想RD350板载RAID110i,安装CentOS 7 不识别RAID设备
    CentOS 7磁盘寻找不到,卡在sulogin,造成的开机失败问题--Error getting authority...
    gitlab同步插件gitlab-mirrors报错<已解决,未找到原因>
    马哥Linux SysAdmin学习笔记(四)
    马哥Linux SysAdmin学习笔记(三)
    马哥Linux SysAdmin学习笔记(二)
    马哥Linux SysAdmin学习笔记(一)
    php-round()四舍六入
    Linux权限问题(2)-unzip引发的权限问题
  • 原文地址:https://www.cnblogs.com/staff/p/13099934.html
Copyright © 2011-2022 走看看