zoukankan      html  css  js  c++  java
  • Celery

    什么是Clelery

    Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统

    专注于实时处理的异步任务队列

    同时也支持任务调度

    Celery架构

    Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

    消息中间件

    Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等

    任务执行单元

    Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中

    任务结果存储

    Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等

    使用场景

    异步任务:将耗时操作任务提交给Celeryu去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

    定时任务:定时执行某件事情,比如每天数据统计

    Celery的安装配置

    pip install celery

    消息中间件:RabbitMQ/Redis

    app=Celery('任务名',backend='xxx',broker='xxx')

    Celery执行异步任务

    创建py文件:celery_app_task.py

    from celery import Celery
    # broker='redis://127.0.0.1:6379/2' 不加密码
    backend='redis://:123456@127.0.0.1:6379/0' #最后的0是指定的库
    broker='redis://:123456@127.0.0.1:6379/1'
    app = Celery('test',broker=broker,backend=backend) #一定要指定一个名字,每次实例化都要起一个名字
    
    #任务其实是一个函数
    #需要用到一个装饰器,表示该任务是被celery管理的,并且可以用celery执行的
    @app.task()
    def add(x,y):
        import time
        time.sleep(2)
        return x+y

    现在需要把任务提交到消息队列里,是另一个程序需要新建一个add_task.py

    #提交任务到消息队列中
    #只是把任务提交到消息队列中,并没有执行
    ret = celery_task_s1.add.delay(4,5)
    print(ret)
    #c94054bb-9aac-425f-a4e2-9b315b2a2fe1返回了字符串,这就是任务的标识号,那这个去数据库查

    创建py文件:run.py,执行任务,或者使用命令执行:celery worker -A celery_app_task -l info

    注:windows下:celery worker -A celery_app_task -l info -P eventlet

    在命令行输入命令

    然后数据库就有结果了

    然后查看任务执行结果创建celery_result.py

    from celery.result import AsyncResult
    from celery_task_s1 import app
    
    async = AsyncResult(id="c94054bb-9aac-425f-a4e2-9b315b2a2fe1", app=app)
    
    if async.successful():
        #取出return值
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

    这就是分布式

    总结流程

    -1 指定broker(消息中间件),指定backend(结果存储)
    -2 实例化产生一个Celery对象 app=Celery('名字',broker,backend)
    -3 加装饰器绑定任务,在函数(add)上加装饰器app.task
    -4 其他程序提交任务,先导入add,add.delay(参,参数),会将该函数提交到消息中间件,但是并不会执行,有个返回值,直接print会打印出任务的id,以后用id去查询任务是否执行完成
    -5 启动worker去执行任务:celery worker -A celery_task_s1 -l info   windows下:celery worker -A celery_task_s1 -l info -P eventlet
    -6 查看结果:根据id去查询   

    多任务结构

    pro_cel
        ├── celery_task# celery相关文件夹
        │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
        │   └── tasks1.py    #  所有任务函数
        │    └── tasks2.py    #  所有任务函数
        ├── check_result.py # 检查结果
        └── send_task.py    # 触发任务

    celery.py

    from celery import Celery
    
    backend='redis://:123456@127.0.0.1:6379/1' #最后的0是指定的库
    broker='redis://:123456@127.0.0.1:6379/0'
    app = Celery('test',broker=broker,backend=backend,
                 include=['celery_task.order_task',
                          'celery_task.user_task'
                          ])
    #一定要指定一个名字,每次实例化都要起一个名字
    # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
    #时区
    # app.conf.timezone = 'Asia/Shanghai'
    #是否使用UTC
    # app.conf.enable_utf = False

    order_task.py

    from celery_task.celery import app
    
    import time
    @app.task
    def order_add(x,y):
        time.sleep(1)
        return x+y

    user_task.py

    from celery_task.celery import app
    
    import time
    @app.task
    def user_add(x,y):
        time.sleep(1)
        return x+y

    add_task.py

    from celery_task.order_task import order_add
    from celery_task.user_task import user_add
    
    ret = order_add.delay(5,6)
    print(ret)
    ret2 =user_add.delay(10,6)
    print(ret2)

    先执行add_task拿到唯一id存到redis里的backend结果存储然后再到命令行执行

    -启动worker,celery_task是包的名字
    celery worker -A celery_task -l info -P eventle

    查询结果celery_result.py

    from celery.result import AsyncResult
    from celery_task.celery import app
    
    async = AsyncResult(id="48da55ab-6109-4913-8f43-0d6a6ad4a174", app=app)
    
    if async.successful():
        #取出return值
        result = async.get()
        print(result)
        # result.forget() # 将结果删除
    elif async.failed():
        print('执行失败')
    elif async.status == 'PENDING':
        print('任务等待中被执行')
    elif async.status == 'RETRY':
        print('任务异常后正在重试')
    elif async.status == 'STARTED':
        print('任务已经开始被执行')

    Celery执行定时任务

    设定时间让celery执行一个任务

    比如几点几分几秒执行某个任务,添加任务的时候

    方式一:

    v1 = datetime(2019, 7, 13, 9, 48, 56)
    print(v1)
    v2 = datetime.utcfromtimestamp(v1.timestamp())
    print(v2)
    #取出我要执行任务的时间对象,调用apply_async方法,args是参数,eta是执行的时间
    result = celery_task_s1.add.apply_async(args=[1, 3], eta=v2)
    print(result.id)

    方式二:第二种取时间的方法,获取当前之间,定时执行

    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    #取10秒之后的时间对象
    time_delay = timedelta(seconds=3)
    task_time = utc_ctime + time_delay
    
    # 使用apply_async并设定时间
    result = celery_task_s1.add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)

     类似于contab的定时任务

    from celery import Celery
    from datetime import timedelta
    from celery.schedules import crontab
    backend='redis://:123456@127.0.0.1:6379/1' #最后的0是指定的库
    broker='redis://:123456@127.0.0.1:6379/0'
    app = Celery('test',broker=broker,backend=backend,
                 include=['celery_task.order_task',
                          'celery_task.user_task'
                          ])
    #一定要指定一个名字,每次实例化都要起一个名字
    # 包含以下两个任务文件,去相应的py文件中找任务,对多个任务做分类
    #时区
    app.conf.timezone = 'Asia/Shanghai'
    #是否使用UTC
    app.conf.enable_utf = False
    -----******-----
    app.conf.beat_schedule = {
        # 名字随意命名
        'add-every-10-seconds': {
            # 执行tasks1下的test_celery函数
            'task': 'celery_task.order_task.order_add',
            # 每隔2秒执行一次
            # 'schedule': 1.0,
            # 'schedule': crontab(minute="*/1"),
            'schedule': timedelta(seconds=2),
            # 传递参数
            'args': (5,6)
        },
        # 'add-every-12-seconds': {
        #     'task': 'celery_task.order_task.order_add',
        #     # 每年4月11号,8点42分执行
        #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        #     # 'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
        #     'args': (16, 16)
        # },
    }

    启动一个beat:celery beat -A celery_task -l info

    启动work执行:celery worker -A celery_task -l info -P eventlet

    Django中使用Celery

    安装包

    celery==3.1.25
    django-celery==3.1.20

    由于django-celery不好用所以直接把多任务结构中的celery_task文件复制到django目录下

    -******在celery的任务函数中不能直接调用django的环境,需要手动添加
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "untitled15.settings")
    import django
    django.setup()

    很少有人使用django的celery

  • 相关阅读:
    安装JDK,如何配置PATH,如何配置CLASSPATH
    存储过程笔记
    用BeanFactoryAware接口,用BEAN的名称来获取BEAN对象
    静态工厂实例代码
    setTimeout 和 setInterval 的区别
    Spring Autowire自动装配
    动态工厂模式代码实例
    JS处理回车事件
    不错的Spring学习笔记(转)
    单例模式要点
  • 原文地址:https://www.cnblogs.com/zhengyuli/p/11178747.html
Copyright © 2011-2022 走看看