zoukankan      html  css  js  c++  java
  • 基于python实现的三方组件----Celery

    一.基于python实现的三方组件----Celery

    1.作用

    用于异步周期任务的处理

    2.Celery的组成

    (1)任务 app
    (2)记录任务的缓存(通常用redis或rabbitMQ)
        任务记录 -broker
        任务返回记录-backend
    (3)Worker 员工
        主动执行任务
        主动反馈结果

    3.celery简单实例

    s1.py

    from celery import Celery
    import time
    
    #使用redis连接url格式 :redis://:password@hostname:port/db_number
    my_task=Celery("my_task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") #括号中task代表你对任务在内部的称呼
    
    @my_task.task
    def my_func1(a,b):
        time.sleep(10)
        return a+b
    
    @my_task.task
    def my_func2():
        time.sleep(10)
        return 2
    
    @my_task.task
    def my_func3():
        time.sleep(10)
        return 3

    命令行运行

    Linux:Linux - celery worker -A s1 -l INFO 
    Windows:celery worker -A s1 -l INFO -P eventlet
    #Windows下需要下载eventlet模块模块,否则celery4的版本不支持windows
    #l:日志输出
    #c:数量

    s2.py

    from s1 import my_func1,my_func2,my_func3
    pid=my_func1.delay(10,20)
    print(pid)
    pid=my_func2.delay()
    print(pid)
    pid=my_func3.delay()
    print(pid)

    s3.py

    from celery.result import AsyncResult
    from s1 import my_task
    #运行s2.py得到的pid
    res=AsyncResult(id='2b36d20f-da07-42fe-b203-1e56fbaafd5e',app=my_task)
    if res.successful():
        print(res.get())
    else:
        print("任务正在进行中")

    4.爬虫简单应用

    在caiji.py中

    from flask import Flask,request as requ,jsonify,render_template,send_file
    import pymongo
    import json
    import time
    import  urllib
    import requests
    import re
    from urllib import request
    import uuid
    from celery import Celery
    import time
    
    #使用redis连接url格式 :redis://:password@hostname:port/db_number
    my_task=Celery("my_task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") #括号中task代表你对任务在内部的称呼
    
    #获取各种分类的歌曲列表
    @my_task.task
    def getcontent():
            # content=requ.form.get("content")
            # print(content)
            headers={"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.92 Safari/537.36"}
            url="https://www.ximalaya.com/ertong/ertongwenxue"
            request=urllib.request.Request(url,headers=headers)
            response=urllib.request.urlopen(request)
    
            response_text=response.read().decode("utf-8")
            title_id_list=re.findall('"album-title line-2 lg.+?" title="(.+?)" href="/ertong/(d+?)/">',response_text)
    
            anthor_list=re.findall('"album-author.+?" title="(.+?)" href',response_text)
            response_list=[]
            i = 0
            for i in range(len(title_id_list)):
                response_dict={}
                response_dict={
                    "title":title_id_list[i][0],
                    "id":title_id_list[i][1],
                    "author":anthor_list[i]
                }
                response_list.append(response_dict)
    
    
            # print("返回",response_list)
            return response_list
    
    
    #获取music的二进制文件
    @my_task.task
    def getmusic(id):
        print(id)
        url="http://m.ximalaya.com/ertong/"+id+"/"
        response=requests.get(url)
        response.encoding="utf-8"
        path=re.findall('"isCopyright":.+?"src":"(.+?)","albumId"',response.text)[0]
        print("res",path)
        d_data = requests.get(path)
        get_str=str(uuid.uuid4())
        print(get_str)
        name="./music/"+get_str + ".mp3"
        with open(name,"wb") as f:
            f.write(d_data.content)
        return send_file(name)
    # getcontent()

    在results.py中

    from caiji import getcontent,getmusic
    res1=getcontent.delay()
    print(res1)
    for i in res1.get():
        res2 = getmusic.delay(i["id"])
        print(res2)

    5.定时任务(十秒钟后执行函数)

    在s4.py中

    from celery import Celery
    import time
    my_task=Celery("task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") #括号中代表你对任务在内部的称呼
    ​
    @my_task.task
    def my_func1(a,b):
        return 1

    在s5.py中

    import datetime
    import time
    from s4 import my_func1
    ​
    tp = time.time()
    utc_time = datetime.datetime.utcfromtimestamp(tp)
    add_time = datetime.timedelta(seconds=10)
    utc_time = utc_time + add_time
    res = my_func1.apply_async(args=(2,3),eta=utc_time)
    print(res)

    6.周期任务

    task_one.py

    from celery import Celery
    import time
    ​
    my_task=Celery("my_task",broker="redis://127.0.0.1:6379",backend="redis://127.0.0.1:6379") 
    @my_task.task
    def my_func1():
        time.sleep(10)
        return "十秒钟执行的"

    task_two.py

    import time
    from task_one import my_task
    @my_task.task
    def my_func2():
        time.sleep(5)
        return "五秒钟执行的"

    s6.py

    from celery import Celery
    from celery.schedules import crontab
    
    celery_task = Celery("task",
                         broker="redis://127.0.0.1:6379",
                         backend="redis://127.0.0.1:6379",
                         include=["task_one","task_two"])
    
    #我要要对beat任务生产做一个配置,这个配置的意思就是每10秒执行一次Celery_task.task_one任务参数是(10,10)
    celery_task.conf.beat_schedule={
        "each10s_task":{
            "task":"task_one.my_func1",
            "schedule":10, # 每10秒钟执行一次
            # "args":(10,20)
        },
        "each5s_task": {
            "task": "task_two.my_func2",
            "schedule":5, # 每5秒
            # "args": (50, 60)
        },
    
    }
    
    # celery beat -A Celery_task
    # celery worker -A Celery_task -l INFO -P eventlet

    celery beat -A Celery_task

    7.celery项目目录

    在selery.py中

    from celery import Celery
    my_task = Celery("task",
                     broker="redis://127.0.0.1:6379",
                     backend="redis://127.0.0.1:6379",
                     include=["Celery_task.task_one","Celery_task.task_two"])

    在task_one.py中

    from Celery_task.celery import my_task
    
    @my_task.task
    def func1():
        return 1
    
    @my_task.task
    def func3():
        return 3

    在task_two.py中

    from Celery_task.celery import my_task
    
    @my_task.task
    def func2():
        return 2

    celery worker -A Celery_task -l INFO -P eventlet

     

     

     

  • 相关阅读:
    webpack 关于跨域的配置
    如何使用css变量
    样式重置
    vue+element_ui上传文件,并传递额外参数(自动上传)
    LeetCode-46-全排列
    LeetCode-39-组合总数
    LeetCode-33-搜索旋转排序数组
    LeetCode-207-课程表
    LeetCode-15-三数之和
    LeetCode-盛最多水的容器
  • 原文地址:https://www.cnblogs.com/shanghongyun/p/10448749.html
Copyright © 2011-2022 走看看