zoukankan      html  css  js  c++  java
  • celery应用

    celery---分布式任务队列

    Celery是一个简单,灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护该系统所需的工具。

    Celery是一个基于python开发的模块,可以帮助我们对任务进行分发和处理;

    img

    1、环境搭建:

    pip3 install celery==4.4
    

    2、Broker选择:

    Celery需要一种解决消息的发送和接受的方式,我们把这种用来存储消息的的中间装置叫做message broker, 也可叫做消息中间人。 作为中间人,我们有几种方案可选择:

    RabbitMQ
    RabbitMQ是一个功能完备,稳定的并且易于安装的broker. 它是生产环境中最优的选择。

    Redis
    Redis也是一款功能完备的broker可选项,但是其更可能因意外中断或者电源故障导致数据丢失的情况。 关于是有那个Redis作为Broker,可访下面网址: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis

    3、创建应用

    • c1.py 文件
    import time
    from celery import Celery
    
    app = Celery("tasks",broker="redis://49.222.54.28:6379",backend="redis://49.222.54.28:6379")
    
    @app.task
    def x1(x,y):
        time.sleep(10)
        return x + y
    
    @app.task
    def x2(x,y):
        time.sleep(10)
        return x - y
    
    • c2.py 文件
    from c1 import x1
    
    result = x1.delay(4,4)
    print(result)
    print(result.id)
    
    • c3.py 文件
    from celery.result import AsyncResult
    from c1 import app
    
    result_object = AsyncResult(id="68cd648b-da09-4fef-9efe-d9e894a6a7ee",app=app)
    data = result_object.get()
    print(result_object.status)
    print(data)
    

    运行代码(windows环境):

    在终端下执行如下命令:

    celery worker -A c1 -l info 
    
    说明:
    -A 后边跟的是任务文件名
    -l 是打印出日志
    
    

    python c2.py
    

    往任务队列中放任务,这时window环境的终端就会报一个错误信息:

    这个错误只有window环境下才会出现,linux不会;这时我们需要安装一个模块:

    pip install eventlet
    

    当我们在启动worker的时候命令就应该这样写:

    celery worker -A c1 -l info -P eventlet
    

    创建完任务会有一个id值:

    通过这个id值我们就能获取到执行完任务的结果:

    4、在django中使用celery

    第一步:【项目/项目/settings.py 】添加配置

    CELERY_BROKER_URL = 'redis://192.168.16.85:6379'
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_RESULT_BACKEND = 'redis://192.168.16.85:6379'
    CELERY_TASK_SERIALIZER = 'json'
    

    第二步:【项目/项目/celery.py】在项目同名目录创建 celery.py

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import os
    from celery import Celery
    
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demos.settings')
    
    app = Celery('demos')
    
    # Using a string here means the worker doesn't have to serialize
    # the configuration object to child processes.
    # - namespace='CELERY' means all celery-related configuration keys
    #   should have a `CELERY_` prefix.
    app.config_from_object('django.conf:settings', namespace='CELERY')
    
    # Load task modules from all registered Django app configs.
    # 去每个已注册app中读取 tasks.py 文件
    app.autodiscover_tasks()
    

    第三步,【项目/app名称/tasks.py】:写上具体的任务

    from celery import shared_task
    
    @shared_task
    def add(x, y):
        return x + y
    
    @shared_task
    def mul(x, y):
        return x * y
    

    第四步,【项目/项目/__init__.py】:启动Django程序加载celery.py文件

    from NewWEApi.celery import app as celery_app
    
    __all__ = ('celery_app',)     
    

    启动worker

    进入项目目录
    
    celery worker -A demos -l info -P eventlet
    
    说明:
    -A 后边加的是项目名称
    -P 后边是windows环境下需要添加的模块
    

    编写视图函数,调用celery去创建任务

    url:

    url(r'^create/tasks/$', task.create_task),
    url(r'^get/result/$', task.get_result),
    

    view:

    from django.shortcuts import HttpResponse
    from celery.result import AsyncResult
    
    from api.tasks import add
    from NewWEApi import celery_app
    
    def create_task(request):
        result = add.delay(1,4)
        return HttpResponse(result.id)
    
    
    def get_result(request):
        nid = request.GET.get("nid")
        result_object = AsyncResult(id=nid,app=celery_app)
        data = result_object.get()
        return HttpResponse(data)
    

    最后启动Django程序

    celery定时任务

    def create_task(request):
        # 获取本地时间
        ctime = datetime.datetime.now()
        # 将本地时间转换成utc时间
        utc_time = datetime.datetime.utcfromtimestamp(ctime.timestamp())
        target_time = utc_time + datetime.timedelta(seconds=10)
        result = add.apply_async(args=[2,5],eta=target_time)
        return HttpResponse(result.id)
    

    移除任务队列中的任务

    result_object.forget()  # 移除任务队列中的任务
    result_object.revoke() 	# 取消任务
    
  • 相关阅读:
    软件工程作业-结对实验
    软件工程实践作业2
    UNIX线程之间的关系
    c中计时的几种方法
    调试器工作原理(3):调试信息
    调试器工作原理(2):实现断点
    调试器工作原理(1):基础篇
    linux的终端,网络虚拟终端,伪终端(转)
    asterisk webrtc使用SIPML5初体验
    初次使用nodejs的问题
  • 原文地址:https://www.cnblogs.com/zhufanyu/p/12268234.html
Copyright © 2011-2022 走看看