zoukankan      html  css  js  c++  java
  • Celery 大量任务 分发

    Celery是由Python开发的一个简单、灵活、可靠的处理大量任务的分发系统,它不仅支持实时处理也支持任务调度。

    • user:用户程序,用于告知celery去执行一个任务。
    • broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
    • worker:执行任务

    celery需要rabbitMQ、Redis、Amazon SQS、Zookeeper(测试中) 充当broker来进行消息的接收,并且也支持多个broker和worker来实现高可用和分布式。http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html

        Celery version 4.0 runs on
            Python ❨2.7, 3.4, 3.5❩
            PyPy ❨5.4, 5.5❩
        This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.
    
        If you’re running an older version of Python, you need to be running an older version of Celery:
    
            Python 2.6: Celery series 3.1 or earlier.
            Python 2.5: Celery series 3.0 or earlier.
            Python 2.4 was Celery series 2.2 or earlier.
    
        Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.
    版本和要求

    环境准备:

    • 安装rabbitMQ或Redis
          见:http://www.cnblogs.com/wupeiqi/articles/5132791.html
    • 安装celery
           pip3 install celery

    快速上手

    import time
    from celery import Celery
    
    app = Celery('tasks', broker='redis://192.168.10.48:6379', backend='redis://192.168.10.48:6379')
    
    
    @app.task
    def xxxxxx(x, y):
        time.sleep(10)
        return x + y
    s1.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from s1 import xxxxxx
    
    # 立即告知celery去执行xxxxxx任务,并传入两个参数
    result = xxxxxx.delay(4, 4)
    print(result.id)
    s2.py
    from celery.result import AsyncResult
    from s1 import app
    
    async = AsyncResult(id="f0b41e83-99cf-469f-9eff-74c8dd600002", app=app)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    s3.py

    执行 s1.py 创建worker(终端执行命令):

      注释: 要在项目目录里执行《 在windows是不支持这个命令得  要安装 pip3 install eventle》在执行得时候 

    celery worker -A s1 -l info -P eventlet   # 在windows 下执行的命令
    celery worker -A s1 -l info
    

    执行 s2.py ,创建一个任务并获取任务ID:

    python3 s2.py 

    执行 s3.py ,检查任务状态并获取结果:

    python3 s3.py

    多任务结构

    pro_cel
        ├── celery_tasks# celery相关文件夹
        │   ├── celery.py   # celery连接和配置相关文件
        │   └── tasks.py    #  所有任务函数
        ├── check_result.py # 检查结果
        └── send_task.py    # 触发任务
    
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from celery import Celery
    
    celery = Celery('xxxxxx',
                    broker='redis://192.168.0.111:6379',
                    backend='redis://192.168.0.111:6379',
                    include=['celery_tasks.tasks'])
    
    # 时区
    celery.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    celery.conf.enable_utc = False
    pro_cel/celery_tasks/celery
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import time
    from .celery import celery
    
    
    @celery.task
    def xxxxx(*args, **kwargs):
        time.sleep(5)
        return "任务结果"
    
    
    @celery.task
    def hhhhhh(*args, **kwargs):
        time.sleep(5)
        return "任务结果"
    pro_cel/celery_tasks/tasks.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from celery.result import AsyncResult
    from celery_tasks.celery import celery
    
    async = AsyncResult(id="ed88fa52-11ea-4873-b883-b6e0f00f3ef3", app=celery)
    
    if async.successful():
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')
    pro_cel/check_result.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import celery_tasks.tasks
    
    # 立即告知celery去执行xxxxxx任务,并传入两个参数
    result = celery_tasks.tasks.xxxxx.delay(4, 4)
    
    print(result.id)
    pro_cel/send_task.py

    更多配置:http://docs.celeryproject.org/en/latest/userguide/configuration.html

    定时任务

    1. 设定时间让celery执行一个任务

    import datetime
    from celery_tasks.tasks import xxxxx
    """
    from datetime import datetime
    
    v1 = datetime(2017, 4, 11, 3, 0, 0)
    print(v1)
    
    v2 = datetime.utcfromtimestamp(v1.timestamp())
    print(v2)
    
    """
    ctime = datetime.datetime.now()
    utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp())
    
    s10 = datetime.timedelta(seconds=10)
    ctime_x = utc_ctime + s10
    
    # 使用apply_async并设定时间
    result = xxxxx.apply_async(args=[1, 3], eta=ctime_x)
    print(result.id)
    

    2. 类似于contab的定时任务

    """
    celery beat -A proj
    celery worker -A proj -l info
    
    """
    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery('tasks', broker='amqp://47.98.134.86:5672', backend='amqp://47.98.134.86:5672', include=['proj.s1', ])
    app.conf.timezone = 'Asia/Shanghai'
    app.conf.enable_utc = False
    
    app.conf.beat_schedule = {
        # 'add-every-10-seconds': {
        #     'task': 'proj.s1.add1',
        #     'schedule': 10.0,
        #     'args': (16, 16)
        # },
        'add-every-12-seconds': {
            'task': 'proj.s1.add1',
            'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
            'args': (16, 16)
        },
    }

    注:如果想要定时执行类似于crontab的任务,需要定制 Scheduler来完成。

    Flask中应用Celery

    pro_flask_celery/
    ├── app.py
    ├── celery_tasks
        ├── celery.py   # 必须得有一个 celery.py的文件  这里放连接
        └── tasks.py
    
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from flask import Flask
    from celery.result import AsyncResult
    
    from celery_tasks import tasks
    from celery_tasks.celery import celery
    
    app = Flask(__name__)
    
    TASK_ID = None
    
    
    @app.route('/')
    def index():
        global TASK_ID
        result = tasks.xxxxx.delay()
        # result = tasks.task.apply_async(args=[1, 3], eta=datetime(2018, 5, 19, 1, 24, 0))
        TASK_ID = result.id
    
        return "任务已经提交"
    
    
    @app.route('/result')
    def result():
        global TASK_ID
        result = AsyncResult(id=TASK_ID, app=celery)
        if result.ready():
            return result.get()
        return "xxxx"
    
    
    if __name__ == '__main__':
        app.run()
    app.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from celery import Celery
    from celery.schedules import crontab
    
    celery = Celery('xxxxxx',
                    broker='redis://192.168.10.48:6379',
                    backend='redis://192.168.10.48:6379',
                    include=['celery_tasks.tasks'])
    
    # 时区
    celery.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    celery.conf.enable_utc = False
    celery_tasks/celery.py
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import time
    from .celery import celery
    
    
    @celery.task
    def hello(*args, **kwargs):
        print('执行hello')
        return "hello"
    
    
    @celery.task
    def xxxxx(*args, **kwargs):
        print('执行xxxxx')
        return "xxxxx"
    
    
    @celery.task
    def hhhhhh(*args, **kwargs):
        time.sleep(5)
        return "任务结果"
    celery_task/tasks.py

    春生Flask中应用Celery

    from flask import Flask,render_template,request,redirect
    import time
    from celery_tasks import tasks
    from celery.result import AsyncResult
    from celery_tasks.celery import cel
    
    
    app = Flask(__name__)
    
    
    GOODS = [
        # {'title':'商品名称','pirce':100,'ticket':'7ec48f84-7160-4c1d-bb78-9c9327f7a978'}
    ]
    
    @app.route('/index')
    def index():
        return render_template('index.html',goods = GOODS)
    
    @app.route('/add',methods=['GET','POST'])
    def add():
        if request.method == "GET":
            return render_template('add.html',goods = GOODS)
        title = request.form.get('title')
        price = request.form.get('price')
        # 处理业务逻辑
        # 耗时 1分钟
    
    
        # 立即交给broker去执行
        result = tasks.x1.delay(1,8)  # 去触发 函数  result.id 拿到一个 字符串凭证
    
        # 10s之后,broker才开始执行
        import datetime
        # 可以 t = "2018-8-8"
        ctime = datetime.datetime.now() # 获取当前时间
        utc_ctime = datetime.datetime.utcfromtimestamp(ctime.timestamp()) # 当前时间转换成UTC时间
        ctime_x = utc_ctime + datetime.timedelta(seconds=10)   # 时间  utc时间  seconds=10 ----就是当前时间的10秒后执行
    
        result = tasks.x1.apply_async(args=[1, 8], eta=ctime_x)   # apply_async 是
    
    
        GOODS.append({'title':title,'price':price,'ticket':result.id})
    
        return redirect('/index')
    
    @app.route('/detail')
    def detail():
        ticket = request.args.get('ticket')
        result = AsyncResult(id=ticket, app=cel)
        if result.successful():
            val = result.get()
            return "执行完成,结果:%s" %val
        else:
            return '正在处理中...'
    
    
    if __name__ == '__main__':
        app.run()
    app.py
    from celery import Celery
    from celery.schedules import crontab
    
    cel = Celery('tasks',  # 是一个名字
                 broker='redis://:beta@140.143.227.206:8888/0',  #  放任务
                 backend='redis://:beta@140.143.227.206:8888/0',  # 取结果
                include=['celery_tasks.tasks','celery_tasks.xxx']
                 )
    # 如果需要 每天都要执行的 任务之前 要 执行这个 celery beat -A celery_tasks
    cel.conf.beat_schedule = {
        # 'add-every-10-seconds': {
        #     'task': 'celery_tasks.tasks.x2',  # 找到 那个函数
        #     'args': (98, 10),  # 给 x2 传参数
        #     'schedule': 10.0,      # 每10秒执行下这个任务
        # },
        'add-every-12-seconds': {
            'task': 'celery_tasks.tasks.x2',
            'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),  #  month_of_year = 月  day_of_month = 日 hour = 时 minute = 分
            'args': (26, 16)
        },
    }
    celery.py
    import time
    from .celery import cel
    from celery import shared_task
    
    
    @shared_task
    def x1(x, y):
        time.sleep(10)
        return x + y
    
    
    @cel.task
    def x2(x, y):
        time.sleep(5)
        return x - y
    
    
    @cel.task
    def x3(x, y):
        time.sleep(2)
        return x * y
    tasks.py
  • 相关阅读:
    android自动登录
    【199】ArcGIS 添加自定义工具到工具箱
    【198】Synergy
    【197】PowerShell 通过 FTP 下载文件
    【196】Dell 移动工作站系统安装方法
    php如何同时连接多个数据库
    FreeRTOS学习笔记——任务间使用队列同步数据
    牛腩新闻发布系统之发布
    Linux散列表(二)——宏
    Excel导入数据库(三)——SqlBulkCopy
  • 原文地址:https://www.cnblogs.com/jiangchunsheng/p/9274735.html
Copyright © 2011-2022 走看看