zoukankan      html  css  js  c++  java
  • celery-demo

    celeryconfig.py

    rabbitmq link

    from kombu import Queue, Exchange
    
    # This configuration file targets Celery 4.3. Settings reference:
    # http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-settings
    
    broker_url = "amqp://name:passpwd@ip/voiceprint"
    
    timezone = 'Asia/Shanghai' # time zone Celery is configured to use
    enable_utc = True # NOTE(review): original comment said "use the time set above"; per the Celery docs this flag converts message timestamps to UTC — confirm intent
    
    task_acks_late = True
    worker_prefetch_multiplier = 1
    # Fetch only one task at a time; see https://www.v2ex.com/t/343440
    
    include = ["deal.pqueue.tasks_v2", "deal.pqueue.periodic"] # modules to import when the worker starts
    
    # Queue configuration.
    #   Recent RabbitMQ versions support priorities: https://blog.csdn.net/qq_18863573/article/details/53540090
    task_queues = (
        Queue(name="task_process",exchange=Exchange("task_process"),routing_key="task_process",queue_arguments={"x-max-priority": 10}),
        Queue(name='single',exchange=Exchange('single'),routing_key='single'),
        Queue(name='task_score',exchange=Exchange('task_score'),routing_key='task_score'),
    )
    
    # A routing_key can be specified; the Exchange uses it to route messages to the different message queues.
    # task_routes assigns a queue to each task: when a task arrives, its name decides which queue (and so which worker) handles it.
    # Routing is the distribution of tasks to queues (which task goes into which queue); one queue is named per kind of task.
    task_routes = {
        "deal.pqueue.periodic.check_todo": {"queue": "single","routing_key": "single"},
        "deal.pqueue.tasks_v2.whole_fq_process": {"queue": "task_process","routing_key": "task_process"},
        "deal.pqueue.tasks_v2.whole_am_cu_process": {"queue": "task_process", "routing_key": "task_process"},
        "deal.pqueue.tasks_v2.score_with_all_repo": {"queue": "task_score","routing_key": "task_score"}
    }
    
    # beat (-B): schedule for periodically executed tasks. Official docs: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html
    # demo: https://blog.csdn.net/Shyllin/article/details/80940643
    # https://blog.csdn.net/preyta/article/details/54172961
    # Start the scheduler with: celery -A deal.pqueue -l info beat
    beat_schedule = {
        # NOTE(review): the entry name says "every-minute" but the schedule below is 60 * 10 — confirm which is intended.
        "process-fq-every-minute": {
            "task": "deal.pqueue.periodic.check_todo",
            "schedule": 60 * 10,  # numeric schedules are in seconds per the Celery docs, i.e. every 10 minutes
        }
    }
    

    periodic.py

    from os import path
    from .celery import app
    
    from deal.common.sql import session_scope
    from deal.common.models import RecordFile as RF
    from deal.common.sql_v2 import session_scope
    
    from .tasks import fq_batch_whole_process, am_cu_single_process
    from .tasks_v2 import (whole_fq_process, score_with_all_repo,whole_am_cu_process,)
    
    # htaudiotype: 0 = single-sided recording, 1 = mixed recording
    def check_fq(htaudiotype, fn): # e.g. check_fq(0, whole_fq_process)
        """Dispatch every pending FQ (anti-fraud) recording to the broker.

        Looks up RecordFile rows with person_type == 1, process_state == 10,
        downloaded == 1, the requested htaudiotype and a non-empty
        downloadtodir. Each match is sent to the "task_process" queue through
        ``fn`` with ``score_with_all_repo`` linked as the follow-up task that
        runs after ``fn`` succeeds, and the row's process_state is set to -1
        so it is not picked up again.

        Args:
            htaudiotype: Recording type to select (0 single-sided, 1 mixed).
            fn: Celery task used to process each file; it is called with the
                keyword arguments ``fp`` (full file path) and ``tel``.
        """
        with session_scope() as s:
            # Parenthesised so the chained filters form a single expression;
            # without a continuation the leading-dot lines are a SyntaxError.
            pending = (
                s.query(RF.downloadtodir, RF.filename, RF.telnumber)
                .filter(RF.person_type == 1)
                .filter(RF.process_state == 10)
                .filter(RF.downloaded == 1)
                .filter(RF.htaudiotype == htaudiotype)
                .filter(RF.downloadtodir != '')
                .all()
            )
            for row in pending:
                fp = path.join(row.downloadtodir, row.filename)
                kwargs = dict(fp=fp, tel=row.telnumber)
                # link: task to run with fn's result once fn completes successfully.
                fn.apply_async(kwargs=kwargs, queue="task_process", link=score_with_all_repo.s())
                # Mark the processing state of the dispatched (anti-fraud) file.
                s.query(RF).filter(RF.filename == row.filename).update({'process_state': -1})
    
    def _dispatch_am_cu(s, rows, str_type, **options):
        """Flag each row as in progress and queue it for AM/CU processing.

        Args:
            s: Open database session used for the registerflag update.
            rows: Query results exposing transcodefiletodir, filename, telnumber.
            str_type: 'am' or 'cu'; forwarded to whole_am_cu_process.
            **options: Extra keyword arguments for apply_async (e.g. priority).
        """
        items = [
            {'fp': path.join(row.transcodefiletodir, row.filename), 'tel': row.telnumber, 'str_type': str_type}
            for row in rows
        ]
        for item in items:
            # registerflag -1 represents "xvector extraction in progress".
            filename = path.basename(item['fp'])
            s.query(RF).filter(RF.filename == filename).update({'registerflag': -1})
            whole_am_cu_process.apply_async(
                kwargs=item, queue="task_process", link=score_with_all_repo.s(), **options
            )


    def check_am_cu():
        """Dispatch every unregistered AM and CU recording to the broker.

        AM rows (person_type == 2) must additionally have checkflag == 1; CU
        rows (person_type == 4) must have a non-empty piecesid. Both require
        process_state == 0, registerflag == 0 and non-empty transcodefiletodir
        and telnumber. AM tasks are published with priority 2; CU tasks use
        the default priority.

        Each file is queued exactly once. The original also issued a bare
        ``whole_am_cu_process.delay()`` per AM file, which enqueued an extra
        task without the required fp/tel/str_type arguments; that call has
        been removed.
        """
        # For AM
        with session_scope() as s:
            # Parenthesised so the chained filters form a single expression.
            am_rows = (
                s.query(RF.transcodefiletodir, RF.filename, RF.telnumber)
                .filter(RF.person_type == 2)
                .filter(RF.process_state == 0)
                .filter(RF.transcodefiletodir != '')
                .filter(RF.checkflag == 1)
                .filter(RF.telnumber != '')
                .filter(RF.registerflag == 0)
                .all()
            )
            _dispatch_am_cu(s, am_rows, 'am', priority=2)

        # For CU
        with session_scope() as s:
            cu_rows = (
                s.query(RF.transcodefiletodir, RF.filename, RF.telnumber)
                .filter(RF.person_type == 4)
                .filter(RF.process_state == 0)
                .filter(RF.transcodefiletodir != '')
                .filter(RF.piecesid != '')
                .filter(RF.telnumber != '')
                .filter(RF.registerflag == 0)
                .all()
            )
            _dispatch_am_cu(s, cu_rows, 'cu')
    
    
    @app.task
    def check_todo():
        """Periodic entry point: queue pending FQ files, then pending AM/CU files."""
        # FQ: only htaudiotype 0 is dispatched, using whole_fq_process.
        check_fq(htaudiotype=0, fn=whole_fq_process)
        # The htaudiotype 1 variant is currently disabled:
        # check_fq(1, whole_mix_fq_process)
        check_am_cu()
    

    tasks_v2.py

    from os import path
    from .celery import app
    from deal.audio.manager_v2 import MixFQDiaManager, FQDiaManager, AMCUDiaManager
    from deal.repos.match import Match
    
    
    @app.task
    def whole_fq_process(fp, tel):
        """All-in-one task for a single FQ recording.

        Pre-processes the audio, runs dia() on it and scores the result with
        score_with_ringtone().

        Args:
            fp: Path of the recording file.
            tel: Telephone number associated with the recording.

        Returns:
            'Pre_Failed' when the audio is not valid after pre-processing,
            the manager's err_msg when dia() or scoring reports an error,
            otherwise a list holding the second '-'-separated field of each
            utterance id (presumably the file id — TODO confirm).
        """
        # NOTE(review): FQAudio is not among the imports shown for this module — confirm where it is defined.
        recording = FQAudio(fp, tel=tel)
        recording.pre_process()
        if not recording.valid:
            return 'Pre_Failed'

        dia_manager = FQDiaManager(recording.fp, tel)
        dia_manager.dia()
        if dia_manager.err_msg:
            # The error message doubles as the task result.
            return dia_manager.err_msg

        scored_utts = dia_manager.score_with_ringtone()
        if dia_manager.err_msg:
            return dia_manager.err_msg

        return [utt.split('-')[1] for utt in scored_utts]
    
    
    # @app.task
    def whole_mix_fq_process(fp, tel):
        """All-in-one processing for a single mixed FQ recording.

        Same flow as whole_fq_process, but uses pre_process4mix() and
        MixFQDiaManager.

        Args:
            fp: Path of the recording file.
            tel: Telephone number associated with the recording.

        Returns:
            'Pre_Failed' when the audio is not valid after pre-processing,
            the manager's err_msg when dia() or scoring reports an error,
            otherwise a list holding the second '-'-separated field of each
            utterance id.
        """
        # NOTE(review): FQAudio is not among the imports shown for this module — confirm where it is defined.
        recording = FQAudio(fp, tel)
        recording.pre_process4mix()
        if not recording.valid:
            return 'Pre_Failed'

        dia_manager = MixFQDiaManager(recording.fp, tel)
        dia_manager.dia()
        if dia_manager.err_msg:
            return dia_manager.err_msg

        scored_utts = dia_manager.score_with_ringtone()
        if dia_manager.err_msg:
            return dia_manager.err_msg

        return [utt.split('-')[1] for utt in scored_utts]
        
        # if utts exist, score with all repository
    
    
    @app.task
    def whole_am_cu_process(fp, tel, str_type):
        """Run dia() for one AM or CU recording.

        Args:
            fp: Path of the recording file.
            tel: Telephone number associated with the recording.
            str_type: Either 'am' or 'cu'.

        Returns:
            The manager's err_msg when dia() reports an error, otherwise a
            list holding the second '-'-separated field of each utterance id
            (ids look like 'tel-fileid' according to the original comment).
        """
        dia_manager = AMCUDiaManager(fp, tel, str_type)
        usable_utts = dia_manager.dia()
        if dia_manager.err_msg:
            return dia_manager.err_msg

        return [utt.split('-')[1] for utt in usable_utts]
    
    
    @app.task
    def score_with_all_repo(fileids):
        """Score the given file ids against the 'ALLIB' repository.

        Used as the linked follow-up of the whole_*_process tasks, so it
        receives their return value: a list of file ids on success, or a
        string ('Pre_Failed' / an error message) on failure.

        Args:
            fileids: List of file ids to score. Any non-list value is treated
                as an upstream failure and ignored.

        Returns:
            None.
        """
        # isinstance is the idiomatic type check and also accepts list subclasses.
        if not isinstance(fileids, list):
            return
        matcher = Match(fileids, ['ALLIB'])
        matcher.parse_scores_for_alarm()
    
    

    简单地抽离了一个工作 demo,作为模板和笔记。

    关注公众号 海量干货等你
  • 相关阅读:
    磁盘IO工作机制
    java 的IO类库的基本架构
    诡异的NPE--三目运算自动类型转换
    WIN2008服务器不能复制粘贴怎么办
    用nrm一键切换npm源
    Linux常用命令大全(非常全!!!)
    整理 node-sass 安装失败的原因及解决办法
    win10完美去除小箭头
    JS中slice,splice,split的区别
    Win10环境下Redis和Redis desktop manager 安装
  • 原文地址:https://www.cnblogs.com/sowhat1412/p/12734294.html
Copyright © 2011-2022 走看看