zoukankan      html  css  js  c++  java
  • celery异步消息队列的使用

    1、准备工作

    1.1 流程图

     2、环境安装

      2.1、在Ubuntu中需要安装redis

    安装redis
    $sudo apt-get update
    $sudo apt-get install redis-server
    启动redis
    $redis-server
    连接redis
    $redis-cli 
    $redis-cli -h ip -a 6379 
    安装Python操作redis的包
    pip install redis 
    重启redis
    sudo service reids restart 
    安装redis

    redis默认绑定的ip为127.0.0.1其他电脑无法访问Ubuntu的redis

     重启redis服务 service redis restart

    查看绑定端口

     在wind上telnet ip  6379 成功说明成功

    2.2、安装celery

    pip install celery

    2、开始使用celery

    2、1基本应用

    在/home/zbwu103/celery 文件中创建一个tasks.py的任务文件

    #task.py 
    
    from celery import Celery
     
    app = Celery('tasks',
                 broker='redis://192.168.1.111',
                 backend='redis://192.168.1.111'
                 #redis://密码@ip
    )
     
    @app.task
    def add(x,y):
        print("running...",x,y)
        return x+y        
    View Code

    在home/zbwu103/celery的目录启动监听任务

    #打印日志的模式运行
    celery -A tasks worker --loglevel=info

    在开一个终端,到/home/zbwu103/celery用Python进入命令行运行

    from tasks import add 
    t = add.delay(4,5)
    
    #t.result.ready() 查看任务是否完成,完成返回True,未完成返回False
    #t.get()  返回完成之后的结果
    #t.task_id  返回任务的唯一ID号,可以通过ID查询到任务

    上面任务都是在终端上运行,如果终端关闭tasks也会终止。

    所以需要任务在后台运行

    celery multi stop w1 停止 w1

    2.2 、在项目中如何使用celery

     

    from __future__ import absolute_import, unicode_literals
    from celery import Celery
    
    app = Celery('my_proj',
                 broker='redis://192.168.1.111',
                 backend='redis://192.168.1.111',
                 include=['myp_roj.tasks'])
    
    app.conf.update(
        result_expires=3600,
    )
    if __name__ == '__main__':
        app.start()
    celery
    from __future__ import absolute_import, unicode_literals
    import subprocess
    from .celery import app
    
    @app.task
    def add(x,y):
        return x+y
    
    @app.task
    def run_cmd(cmd):
        obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        return obj.stdout.read().decode('utf-8')
    tasks.py

    在my_proj同级目录启动

     查看任务启动情况

    ps -ef |grep celery

    2.3 、celery 定时任务

    celery使用beat来执行celert beat 来实现定时任务

    worker定时任务

    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery('task',
                 broker='redis://192.168.1.111',
                 backend='redis://192.168.1.111')
    
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # Calls test('hello') every 10 seconds.
        sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    
        # Calls test('world') every 30 seconds
        sender.add_periodic_task(30.0, test.s('world'), expires=10)
    
        # Executes every Monday morning at 7:30 a.m.
        sender.add_periodic_task(
            crontab(hour=21, minute=26, day_of_week='Sum'),
            test.s('Happy Mondays!'),
        )
    
    
    @app.task
    def test(arg):
        print('runing test.....')
        print(arg)
    periodic_task.py

    启动定时任务

    celery -A periodic_task worker

    另外开一个任务调度区不断的检测你的任务计划

    celery -A periodic_task beat

    2.4、celery和django配置一起使用

    在setting同级的目录中新建一个celery.py的文件配置celery基本的配置

    from __future__ import absolute_import, unicode_literals
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryTest.settings')
    
    app = Celery('CeleryTest')
    
    # Using a string here means the worker don't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    app.autodiscover_tasks()
    
    
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
    celery.py

    在setting.py同级的目录配置__init__.py

    from __future__ import absolute_import, unicode_literals
     
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app
     
    __all__ = ['celery_app']
    __init__.py

    在APP的目录里面新建一个tasks.py的任务来填写任务

    #app01/tasks.py
    # Create your tasks here
    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    import time
    
    @shared_task
    def add(x, y):
        print("running task add,我是windows ")
        time.sleep(1)
        return x + y
    
    
    @shared_task
    def mul(x, y):
        return x * y
    
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)
    tasks.py

    从views中调用任务

    /app01/view.py
    from django.shortcuts import render,HttpResponse
    from  app01 import tasks
    from celery.result import AsyncResult
    
    def index(request):
    
        res = tasks.add.delay(5,999)
    
        print("res:",res)
        print(res.status)
        # import pdb
        # pdb.set_trace()
        return HttpResponse(res.task_id)
    
    
    def task_res(request):
        #通过ID获取结果
        result = AsyncResult(id="be4933c0-ed9b-4a04-ade8-79f4c57cfc74")
    
        #return HttpResponse(result.get())
        return HttpResponse(result.status)
    views.py

     2.5、在django中使用定时的任务

    在Ubuntu中安装django-celery-beat插件

    pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple django-celery-beat

    在setting中增加APP

    ’‘django_celery_beat",

    使用Python manage.py migrate 生成表结构

    直接可以在django的admin配置任务

    之后在Ubuntu中启动celery worker

    celery -A CeleryTest worker -l info 

    启动调度器来监控定时任务

    celery -A CeleryTest beat -l info -S django

    注意,经测试,每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到

    人生苦短,我用cnblog
  • 相关阅读:
    实验三:UML 建模工具的安装与使用
    结对编程 第二阶段
    实验二:结对编程 第一阶段
    结对编程之github使用自己的仓库
    软工 实验一 Git代码版本管理
    第七次作业
    第5次作业
    第四次作业
    第三次作业
    第二次作业
  • 原文地址:https://www.cnblogs.com/wuzhibinsuib/p/12953645.html
Copyright © 2011-2022 走看看