zoukankan      html  css  js  c++  java
  • celery框架

    celery框架:

    介绍:

        Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
    
      Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
    
    	celery 组成: broker   |   worker    |   backend
    
        消息中间件:Celery本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis等等。
    
      任务执行单元:Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。
    
      任务结果存储:Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis等。
    
        
    使用场景:
    	Celery多用来执行异步任务,将耗时的操作交由Celery去异步执行,比如发送邮件、短信、消息推送、音视频处理等。
        还可以执行定时任务,定时执行某件事情,比如Redis中的数据每天凌晨两点保存至mysql数据库,实现Redis的持久化。
        延时任务:解决延迟任务
        
    
    注意: RabbitMQ : 异步的消息队列 (线上使用)
    

    celery + redis :

    环境搭建:

    配置:
    	pip install celery
        pip install django-redis
        # Windows中还需要安装以下模块,用于任务执行单元
        pip install eventlet
        
    redis-- settings.py:
        CACHES = {
        "default": {
            "BACKEND": "django_redis.cache.RedisCache",
            "LOCATION": "redis://127.0.0.1:6379",
            "OPTIONS": {
                "CLIENT_CLASS": "django_redis.client.DefaultClient",
                "CONNECTION_POOL_KWARGS": {"max_connections": 100}
                # "PASSWORD": "123",
            }
        }
    }
    
    
    

    任务结构:

    创建文件目录结构:
    	pro_cel
        ├── celery_task# celery相关文件夹
        │   ├── celery.py   # celery连接和配置相关文件,必须叫这个名字
        │   └── tasks1.py    #  所有任务函数
        │   └── tasks2.py    #  所有任务函数
        ├── check_result.py # 检查结果
        └── send_task.py    # 触发任务
        
     注意,检查结果与触发任务的模块不能写在celery_task模块中,不然会报导入celery的错误。
    
    celery框架工作流程
        1)创建Celery框架对象app,配置broker和backend,得到的app就是worker
        2)给worker对应的app添加可处理的任务函数,用include配置给worker的app
        3)完成提供的任务的定时配置app.conf.beat_schedule
        4)启动celery服务,运行worker,执行任务
        5)启动beat服务,运行beat,添加任务
    

    任务实现:

    基本使用:
    	1. 创建环境: celery 环境搭建
    	2.创建app  + 任务
        	(创建: broker  + app  + backend)
    	3.执行任务:
        	# 非windows
            celery worker -A celery_task -l info
            # windows:
            pip3 install eventlet
            celery worker -A celery_task -l info -P eventlet
       4.添加任务:手动添加,要自定义添加任务的脚本,右键执行脚本
    
       5.获取结果:手动获取,要自定义获取任务的脚本,右键执行脚本
    
    eg:
        from celery import Celery
        broker = 'redis://127.0.0.1:6379/1'
        backend = 'redis://127.0.0.1:6379/2'
        app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
        
    task.py:
        from .celery import app
        @app.task
        def add(n, m)
        	pass
    
    

    定时任务

    celery.py:

    from celery import Celery
    from celery.schedules import crontab
    from datetime import timedelta
    
    # 消息中间件,密码是你redis的密码
    # broker='redis://:123456@127.0.0.1:6379/2' 密码123456
    broker = 'redis://127.0.0.1:6379/0'  # 无密码
    # 任务结果存储
    backend = 'redis://127.0.0.1:6379/1'
    
    # 生成celery对象,'task'相当于key,用于区分celery对象(任意名)
    # include参数需要指定任务模块
    app = Celery('task', broker=broker, backend=backend, include=[
        'celery_task.add_task',
        'celery_task.send_email'
    ])
    
    # 时区 (可设置任意一个)
    app.conf.timezone = 'Asia/Shanghai'
    # 是否使用UTC
    app.conf.enable_utc = False
    
    # 定时执行
    app.conf.beat_schedule = {
        # 名字随意命名
        'add-every-5-seconds': {
            # 执行add_task下的addy函数
            'task': 'celery_task.add_task.add',
            # 每10秒执行一次
            'schedule': timedelta(seconds=10),
            # add函数传递的参数
            'args': (1, 2)
        },
        'add-every-10-seconds': {
            'task': 'celery_task.add_task.add',
            # crontab不传的参数默认就是每的意思,比如这里是每年每月每日每天每小时的5分执行该任务
            'schedule': crontab(minute=5),
            'args': (1, 2)
        }
    }
    

    send_msg.py:

    #项目配置
    
    # EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
    EMAIL_HOST = 'smtp.qq.com'  # 如果是 163 改成 smtp.163.com
    EMAIL_PORT = 465
    EMAIL_HOST_USER = '1504703554@qq.com'  # 发送邮件的邮箱帐号
    EMAIL_HOST_PASSWORD = '授权码'  # 授权码,各邮箱的设置中启用smtp服务时获取
    DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
    # 这样收到的邮件,收件人处就会这样显示
    # DEFAULT_FROM_EMAIL = '2333<'1504703554@qq.com>'
    EMAIL_USE_SSL = True   # 使用ssl
    # EMAIL_USE_TLS = False # 使用tls
    # EMAIL_USE_SSL 和 EMAIL_USE_TLS 是互斥的,即只能有一个为 True
    
    
    import os
    if __name__ == "celery_task.send_email":
        # 因为需要用到django中的内容,所以需要配置django环境
        os.environ.setdefault("DJANGO_SETTINGS_MODULE", "do_celery.settings")
        import django
        django.setup()
        # 导入celery对象app
        from celery_task.celery import app
        from app01 import models
        # 导入django自带的发送邮件模块
        from django.core.mail import send_mail
        import threading
    
    
    	@app.task
        def send_email1(id):  # 此时可以直接传邮箱,还能减少一次数据库的IO操作
            # 此处的id由用户注册的视图函数中传入
            user_obj = models.UserInfo.objects.filter(pk=id).first()
            email = user_obj.email
            # 启用线程发送邮件,此处最好加线程池
            t = threading.Thread(target=send_mail, args=(
                "激活邮件,点击激活账号",  # 邮件标题
                '点击该邮件激活你的账号,否则无法登陆',  # 给html_message参数传值后,该参数信息失效
                settings.EMAIL_HOST_USER,  # 用于发送邮件的邮箱地址
                [email],  # 接收邮件的邮件地址,可以写多个
                ),
                # html_message中定义的字符串即HTML格式的信息,可以在一个html文件中写好复制出来放在该字符串中
                kwargs={'html_message': "<a href='http://127.0.0.1:8000/active_user/?id=%s'>点击激活gogogo</a>" % id}
            )
            t.start()
    

    check_result.py:

    from celery.result import AsyncResult
    from celery_task.celery import app
    
    def check_result(task_id):
        async1 = AsyncResult(id=task_id, app=app)
    
        if async1.successful():
            result = async1.get()
            print(result)
            return result
            # result.forget() # 将结果删除
            # async.revoke(terminate=True)  # 无论现在是什么时候,都要终止
            # async.revoke(terminate=False) # 如果任务还没有开始执行呢,那么就可以终止。
    	 elif async1.failed():
            print('执行失败')
            return '执行失败'
        elif async1.status == 'PENDING':
            print('任务等待中被执行')
            return '任务等待中被执行'
        elif async1.status == 'RETRY':
            print('任务异常后正在重试')
            return '任务异常后正在重试'
        elif async1.status == 'STARTED':
            print('任务已经开始被执行')
            return '任务已经开始被执行'
    

    执行:

    启用任务执行单元worker(以windows为例):
    
    celery worker -A celery_task -l info  -P  eventlet
    
    app.conf.beat_schdule定时任务时,还需要启用beat,用于定时朝消息队列提交任务:
    
    celery beat -A celery_task -l info
    

    延时任务:

    from celery_app_task import add
    from datetime import datetime
    
    # 方式一
    # v1 = datetime(2019, 2, 13, 18, 19, 56)
    # print(v1)
    # v2 = datetime.utcfromtimestamp(v1.timestamp())
    # print(v2)
    # result = add.apply_async(args=[1, 3], eta=v2)
    # print(result.id)
    
    # 方式二
    ctime = datetime.now()
    # 默认用utc时间
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=10)
    task_time = utc_ctime + time_delay
    
    # 使用apply_async并设定时间,这里是10秒后执行任务
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    

    django + celery:

    from datetime import timedelta
    from celery import Celery 
    from celery.schedules import crontab
    
    cel = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/1', include=[
        'celery_task.tasks1',
        'celery_task.tasks2',
    ])
    cel.conf.timezone = 'Asia/Shanghai'
    cel.conf.enable_utc = False
    
    cel.conf.beat_schedule = {
        # 名字随意命名
        'add-every-10-seconds': {
            # 执行tasks1下的test_celery函数
            'task': 'celery_task.tasks1.test_celery',
            # 每隔2秒执行一次
            # 'schedule': crontab(minute="*/1"),
            'schedule': timedelta(seconds=2),
            # 传递参数
            'args': ('test',)
        },
    }
    
    执行:
        # 启动一个beat
        celery beat -A celery_task -l info
    
        # 启动work执行
        celery worker -A celery_task -l info -P  eventlet
    
    
  • 相关阅读:
    【Codechef】Chef and Bike(二维多项式插值)
    USACO 完结的一些感想
    USACO 6.5 Checker Challenge
    USACO 6.5 The Clocks
    USACO 6.5 Betsy's Tour (插头dp)
    USACO 6.5 Closed Fences
    USACO 6.4 Electric Fences
    USACO 6.5 All Latin Squares
    USACO 6.4 The Primes
    USACO 6.4 Wisconsin Squares
  • 原文地址:https://www.cnblogs.com/shaozheng/p/12177855.html
Copyright © 2011-2022 走看看